API 集成指南 – 聊天/完成/工作流端点、身份验证、流媒体、Webhooks
为了将使用 Dify 构建的应用程序集成到业务系统中,通过 API 进行协作至关重要。在本文中,我们将对 Dify 提供的各种 API 端点的结构、身份验证方法、请求/响应格式、流支持和 webhook 协作进行实际解释。
1. Dify API 整体结构
1.1 API分类
Dify 的 API 大致分为两层。
| 层 | 目的 | 认证密钥 | 目标受众 |
|---|---|---|---|
| 应用程序API | 执行已发布的应用程序(聊天、完成、工作流程) | 应用程序 API 密钥 | 应用程序用户/前端 |
| 管理API | 应用程序/数据集管理操作 | 数据集 API 密钥 | 管理员/后台 |
1.2 按应用程序类型划分的端点
| 应用程序类型 | 主要终点 | 方法 |
|---|---|---|
| 聊天应用程序 | /v1/chat-messages | 发布 |
| 完成应用程序 | /v1/completion-messages | 发布 |
| 工作流程应用程序 | /v1/workflows/run | 发布 |
| 知识库 | /v1/datasets | 获取/发布 |
flowchart TD
CLIENT[クライアントアプリ] --> AUTH{認証}
AUTH --> |App API Key| APP[Application API]
AUTH --> |Dataset API Key| DS[Dataset API]
APP --> CHAT[Chat: /v1/chat-messages]
APP --> COMP[Completion: /v1/completion-messages]
APP --> WF[Workflow: /v1/workflows/run]
DS --> DSOP[Dataset CRUD]
DS --> DOC[Document CRUD]
2. 身份验证
2.1 获取API密钥
- 在 Dify 控制台中打开应用程序
- 点击左侧菜单**“API访问”**
- 在**“API Key”**部分生成新密钥
- 复制您的密钥并妥善保管
2.2 认证头格式
Authorization: Bearer {api_key}
请求示例:
curl -X POST "https://your-dify-instance.com/v1/chat-messages" \
-H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx" \
-H "Content-Type: application/json" \
-d '{"inputs": {}, "query": "こんにちは", "user": "user-001"}'
2.3 API 密钥管理最佳实践
| 项目 | 推荐 |
|---|---|
| 储存地点 | 环境变量或秘密管理器 |
| 硬编码为代码 | 禁止 |
| 提交到 Git 存储库 | 禁止(添加到.gitignore) |
| 旋转 | 定期(每 90 天等) |
| 权力分离 | App API Key 和 Dataset API Key 分开管理 |
| 监控 | 监控API调用日志 |
3. 聊天应用 API
3.1 发送消息
向交互式应用程序发送消息并获得 AI 响应。
端点: POST /v1/chat-messages
请求正文:
{
"inputs": {
"company_name": "株式会社Example",
"department": "営業部"
},
"query": "有給休暇の申請方法を教えてください",
"user": "user-001",
"conversation_id": "",
"response_mode": "blocking",
"files": []
}
参数说明:
| 参数 | 类型 | 必填 | 描述 |
|---|---|---|---|
inputs | 对象 | 是的 | 应用程序中定义的输入变量的值 |
query | 字符串 | 是的 | 用户留言 |
user | 字符串 | 是的 | 用户标识符(最终用户 ID) |
conversation_id | 字符串 | 没有 | 对话 ID。使用空字符串开始新对话 |
response_mode | 字符串 | 是的 | blocking(同步)或 streaming(流式) |
files | 数组 | 没有 | 参考上传文件 |
响应(阻塞模式):
{
"event": "message",
"message_id": "msg-xxxxxxxx",
"conversation_id": "conv-xxxxxxxx",
"mode": "chat",
"answer": "有給休暇の申請は、社内ポータルの「勤怠管理」メニューから...",
"metadata": {
"usage": {
"prompt_tokens": 512,
"completion_tokens": 128,
"total_tokens": 640,
"total_price": "0.0064",
"currency": "USD"
},
"retriever_resources": [
{
"dataset_id": "ds-xxxxx",
"dataset_name": "社内規程集",
"document_id": "doc-xxxxx",
"document_name": "勤怠管理規程.pdf",
"segment_id": "seg-xxxxx",
"score": 0.92,
"content": "有給休暇の申請は..."
}
]
},
"created_at": 1712899200
}
3.2 获取通话记录
端点: GET /v1/messages
curl -X GET "https://your-dify-instance.com/v1/messages?conversation_id=conv-xxxxx&user=user-001&limit=20" \
-H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"
3.3 获取对话列表
端点: GET /v1/conversations
curl -X GET "https://your-dify-instance.com/v1/conversations?user=user-001&limit=20" \
-H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"
4. 完成应用程序API
4.1 文本完成请求
用于一次性文本生成(没有对话上下文)。
端点: POST /v1/completion-messages
{
"inputs": {
"document_text": "本契約は、甲が乙に対して...",
"task_type": "要約"
},
"user": "user-001",
"response_mode": "blocking"
}
回复:
{
"event": "message",
"message_id": "msg-xxxxxxxx",
"mode": "completion",
"answer": "本契約は、甲(発注者)と乙(受注者)の間で...",
"metadata": {
"usage": {
"prompt_tokens": 1024,
"completion_tokens": 256,
"total_tokens": 1280
}
},
"created_at": 1712899200
}
5. 工作流程应用程序 API
5.1 执行工作流程
端点: POST /v1/workflows/run
{
"inputs": {
"document_list": ["契約書A.pdf", "契約書B.pdf"],
"analysis_type": "リスク分析",
"output_format": "json"
},
"user": "user-001",
"response_mode": "blocking"
}
回复:
{
"workflow_run_id": "wr-xxxxxxxx",
"task_id": "task-xxxxxxxx",
"data": {
"id": "wr-xxxxxxxx",
"workflow_id": "wf-xxxxxxxx",
"status": "succeeded",
"outputs": {
"final_result": "...",
"metadata": {}
},
"elapsed_time": 12.5,
"total_tokens": 2048,
"total_steps": 5,
"created_at": 1712899200,
"finished_at": 1712899213
}
}
5.2 检查工作流程执行状态
端点: GET /v1/workflows/run/{workflow_run_id}
curl -X GET "https://your-dify-instance.com/v1/workflows/run/wr-xxxxxxxx" \
-H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"
5.3 停止工作流程
端点: POST /v1/workflows/tasks/{task_id}/stop(Service API) or POST /api/workflows/tasks/{task_id}/stop(Web API)
# Dify 1.x 真实端点(与 Chat Message 的停止端点格式一致)
curl -X POST "https://your-dify-instance.com/v1/workflows/tasks/{task_id}/stop" \
-H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx" \
-H "Content-Type: application/json" \
-d '{"user": "user-001"}'
注意:
task_id是从 streaming 响应(SSE)的事件流中获取的,不是 workflow_run_id。Chat Message 也有对应的停止端点:POST /v1/chat-messages/{task_id}/stop。
6. 流式响应
6.1 流式传输的工作原理
如果指定 response_mode: "streaming",响应将以服务器发送事件 (SSE) 格式按顺序返回。
data: {"event": "message", "message_id": "msg-xxx", "answer": "有給", ...}
data: {"event": "message", "message_id": "msg-xxx", "answer": "休暇", ...}
data: {"event": "message", "message_id": "msg-xxx", "answer": "の", ...}
data: {"event": "message_end", "message_id": "msg-xxx", "metadata": {...}}
6.2 SSE 事件类型
| 活动 | 描述 | 时间 |
|---|---|---|
message | 文本块的传递 | 在生成过程中依次 |
message_end | 消息生成完成 | 当生成完成时 |
message_file | 文件输出 | 文件生成 |
tts_message | 音频数据 | 当 TTS 启用时 |
tts_message_end | 音频输出完成 | TTS 完成 |
message_replace | 消息替换(审核期间) | 当内容过滤时 |
error | 错误发生 | 错误 |
6.3 前端实现示例(JavaScript)
async function sendChatMessage(query, conversationId = "") {
const response = await fetch("https://your-dify-instance.com/v1/chat-messages", {
method: "POST",
headers: {
"Authorization": "Bearer app-xxxxxxxxxxxxxxxxxxxx",
"Content-Type": "application/json",
},
body: JSON.stringify({
inputs: {},
query: query,
user: "user-001",
conversation_id: conversationId,
response_mode: "streaming",
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let fullAnswer = "";
let newConversationId = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = JSON.parse(line.slice(6));
switch (data.event) {
case "message":
fullAnswer += data.answer;
// UIにリアルタイム表示
updateUI(fullAnswer);
newConversationId = data.conversation_id;
break;
case "message_end":
console.log("Token usage:", data.metadata?.usage);
break;
case "error":
console.error("Error:", data.message);
break;
}
}
}
}
return { answer: fullAnswer, conversationId: newConversationId };
}
6.4 后端实现示例(Python)
import requests
import json
DIFY_BASE_URL = "https://your-dify-instance.com"
APP_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"
def chat_streaming(query: str, user: str, conversation_id: str = ""):
"""ストリーミングモードでチャットメッセージを送信"""
url = f"{DIFY_BASE_URL}/v1/chat-messages"
headers = {
"Authorization": f"Bearer {APP_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"inputs": {},
"query": query,
"user": user,
"conversation_id": conversation_id,
"response_mode": "streaming",
}
full_answer = ""
with requests.post(url, headers=headers, json=payload, stream=True) as resp:
resp.raise_for_status()
for line in resp.iter_lines():
if line:
decoded = line.decode("utf-8")
if decoded.startswith("data: "):
data = json.loads(decoded[6:])
event = data.get("event")
if event == "message":
chunk = data.get("answer", "")
full_answer += chunk
print(chunk, end="", flush=True)
elif event == "message_end":
print() # 改行
return {
"answer": full_answer,
"conversation_id": data.get("conversation_id"),
"metadata": data.get("metadata"),
}
elif event == "error":
raise Exception(f"API Error: {data.get('message')}")
return {"answer": full_answer}
def chat_blocking(query: str, user: str, conversation_id: str = ""):
"""ブロッキングモードでチャットメッセージを送信"""
url = f"{DIFY_BASE_URL}/v1/chat-messages"
headers = {
"Authorization": f"Bearer {APP_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"inputs": {},
"query": query,
"user": user,
"conversation_id": conversation_id,
"response_mode": "blocking",
}
resp = requests.post(url, headers=headers, json=payload)
resp.raise_for_status()
return resp.json()
7. 阻塞与流式传输选择标准
| 观点 | 封锁 | 流媒体 |
|---|---|---|
| 回复格式 | 完成后一次性全部返回 | 通过上交所顺序交割 |
| 用户体验 | 仅加载显示 | 打字动画风格 |
| 超时风险 | 生成长文本时高 | 低(由于顺序交付) |
| 实施复杂性 | 低 | 中(需要 SSE 解析器) |
| 推荐场景 | 批处理、API协作 | 聊天UI,实时显示 |
| 可被打断 | 不可能(等待完成) | 可以用 POST /stop 中断 |
推荐:始终在面向用户的聊天界面中使用流式传输。如果您需要后端 API 到 API 集成中的完整结果,则 阻塞 适合。
8. 错误处理
8.1 主要错误代码
| HTTP 状态 | 错误代码 | 说明 | 行动 |
|---|---|---|---|
| 400 | invalid_param | 无效参数 | 检查请求正文 |
| 401 | 401 unauthorized | 身份验证错误 | 检查 API 密钥 |
| 403 | 403 forbidden | 无访问权限 | 检查应用程序的公共设置 |
| 404 | 404 not_found | 资源不存在 | 检查端点ID |
| 429 | 429 rate_limit_exceeded | 超出速率限制 | 增加重试间隔 |
| 500 | 500 internal_server_error | 服务器错误 | 检查日志/重试 |
| 400 | 400 provider_not_initialize | LLM 提供商未配置 | 检查模型提供商配置 |
| 400 | 400 model_currently_not_support | 当前模型不支持该操作 | 切换支持的模型 |
| 400 | 400 completion_request_error | 模型调用错误 | 检查日志 / 重试 |
| 400 | 400 provider_quota_exceeded | 提供商配额超限 | 调整配额或切换提供商 |
8.2 重试策略
import time
import requests
from requests.exceptions import RequestException
def call_dify_api_with_retry(url, headers, payload, max_retries=3):
"""指数バックオフ付きリトライ"""
for attempt in range(max_retries):
try:
resp = requests.post(url, headers=headers, json=payload, timeout=60)
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 10))
print(f"Rate limited. Retrying after {retry_after}s...")
time.sleep(retry_after)
continue
if resp.status_code >= 500:
wait = 2 ** attempt # 指数バックオフ: 1, 2, 4秒
print(f"Server error {resp.status_code}. Retrying in {wait}s...")
time.sleep(wait)
continue
resp.raise_for_status()
return resp.json()
except RequestException as e:
if attempt == max_retries - 1:
raise
wait = 2 ** attempt
print(f"Request failed: {e}. Retrying in {wait}s...")
time.sleep(wait)
raise Exception(f"Failed after {max_retries} retries")
9. Webhook 回调
9.1 Workflow的Webhook集成
您可以使用工作流的 HTTP 请求节点将处理结果通知外部系统。
HTTP请求节点配置示例:
method: POST
url: "https://your-system.example.com/webhook/dify-callback"
headers:
Content-Type: "application/json"
X-Webhook-Secret: "{{webhook_secret}}"
body:
type: json
content: |
{
"event": "workflow_completed",
"workflow_id": "{{workflow_id}}",
"result": {{workflow_output}},
"timestamp": "{{current_timestamp}}",
"status": "success"
}
9.2 Webhook 接收器实现示例 (Node.js / Express)
const express = require("express");
const crypto = require("crypto");
const app = express();
app.use(express.json());
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;
function verifySignature(payload, signature) {
const expected = crypto
.createHmac("sha256", WEBHOOK_SECRET)
.update(JSON.stringify(payload))
.digest("hex");
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(expected)
);
}
app.post("/webhook/dify-callback", (req, res) => {
const signature = req.headers["x-webhook-secret"];
if (signature !== WEBHOOK_SECRET) {
return res.status(401).json({ error: "Invalid signature" });
}
const { event, workflow_id, result, status } = req.body;
console.log(`Received webhook: ${event} for workflow ${workflow_id}`);
// ビジネスロジック: 結果をDBに保存、通知送信など
processWorkflowResult(result, status);
res.status(200).json({ received: true });
});
function processWorkflowResult(result, status) {
// 例: Slack通知、DB保存、後続処理のトリガーなど
console.log("Processing result:", result);
}
app.listen(3000, () => console.log("Webhook server running on port 3000"));
10. 实现模式集合
10.1 LINE Bot 合作
# LINE Bot から Dify Chat API を呼び出すパターン
from flask import Flask, request
from linebot import LineBotApi, WebhookHandler
from linebot.models import MessageEvent, TextMessage, TextSendMessage
import requests
app = Flask(__name__)
line_bot_api = LineBotApi("LINE_CHANNEL_ACCESS_TOKEN")
handler = WebhookHandler("LINE_CHANNEL_SECRET")
DIFY_API_URL = "https://your-dify-instance.com/v1/chat-messages"
DIFY_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"
# ユーザーごとの会話IDを管理
conversation_store = {}
@app.route("/callback", methods=["POST"])
def callback():
handler.handle(request.get_data(as_text=True),
request.headers["X-Line-Signature"])
return "OK"
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
user_id = event.source.user_id
conv_id = conversation_store.get(user_id, "")
resp = requests.post(
DIFY_API_URL,
headers={
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json",
},
json={
"inputs": {},
"query": event.message.text,
"user": user_id,
"conversation_id": conv_id,
"response_mode": "blocking",
},
)
data = resp.json()
conversation_store[user_id] = data.get("conversation_id", "")
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=data["answer"])
)
10.2 批处理模式
import csv
import json
import time
import requests
DIFY_API_URL = "https://your-dify-instance.com/v1/workflows/run"
DIFY_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"
def process_batch(input_csv: str, output_csv: str):
"""CSVファイルの各行に対してWorkflowを実行するバッチ処理"""
results = []
with open(input_csv, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
print(f"Processing row {i + 1}: {row.get('title', 'N/A')}")
resp = requests.post(
DIFY_API_URL,
headers={
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json",
},
json={
"inputs": {
"document_text": row["content"],
"analysis_type": row.get("type", "要約"),
},
"user": "batch-processor",
"response_mode": "blocking",
},
)
if resp.status_code == 200:
data = resp.json()
results.append({
"input_title": row.get("title", ""),
"output": data["data"]["outputs"].get("final_result", ""),
"status": "success",
"tokens": data["data"].get("total_tokens", 0),
})
else:
results.append({
"input_title": row.get("title", ""),
"output": "",
"status": f"error: {resp.status_code}",
"tokens": 0,
})
# レート制限対策: リクエスト間に待機
time.sleep(1)
# 結果をCSVに出力
with open(output_csv, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["input_title", "output", "status", "tokens"])
writer.writeheader()
writer.writerows(results)
print(f"Batch complete. Results saved to {output_csv}")
11. 安全和运营
11.1 安全检查表
- API Key 没有硬编码在源代码中
- 使用 HTTPS 通信
- CORS 设置已正确配置
- 已建立 API 密钥轮换政策。
- 已设置速率限制
- 实施输入验证
- 错误响应中不泄露内部信息
11.2 监控项目
| 项目 | 指标 | 大约警报阈值 |
|---|---|---|
| 响应时间 | P95 延迟 | > 30 秒 |
| 错误率 | 5xx 错误百分比 | > 5% |
| 代币使用 | 每日代币消费 | 预算的 80% |
| API 调用次数 | 每小时请求数 | 速率限制的 80% |
| 对话次数 | 活跃对话数量 | 80% 的容量 |
12. 总结
下面总结了将 Dify API 集成到您的业务系统时需要记住的要点。
| 观点 | 要点 |
|---|---|
| 端点选择 | 根据应用程序类型(聊天/完成/工作流程)使用正确的端点 |
| 认证 | API 密钥通过环境变量进行管理并定期轮换 |
| 响应模式 | 用于用户 UI 的流式传输,用于后端协调的阻塞 |
| 错误处理 | 使用指数退避和适当的回退进行重试 |
| 安全 | HTTPS、CORS、输入验证、日志监控 |
API集成的目标不是能够输入curl命令,而是确保Dify应用程序作为实际业务系统的一部分稳定运行。遵循上面列出的实施模式和最佳实践以确保稳健的集成。