Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

API 集成指南 – 聊天/完成/工作流端点、身份验证、流媒体、Webhooks

为了将使用 Dify 构建的应用程序集成到业务系统中,通过 API 进行协作至关重要。在本文中,我们将对 Dify 提供的各种 API 端点的结构、身份验证方法、请求/响应格式、流支持和 webhook 协作进行实际解释。


1. Dify API 整体结构

1.1 API分类

Dify 的 API 大致分为两层。

目的认证密钥目标受众
应用程序API执行已发布的应用程序(聊天、完成、工作流程)应用程序 API 密钥应用程序用户/前端
管理API应用程序/数据集管理操作数据集 API 密钥管理员/后台

1.2 按应用程序类型划分的端点

应用程序类型主要终点方法
聊天应用程序/v1/chat-messages发布
完成应用程序/v1/completion-messages发布
工作流程应用程序/v1/workflows/run发布
知识库/v1/datasets获取/发布
flowchart TD
    CLIENT[クライアントアプリ] --> AUTH{認証}
    AUTH --> |App API Key| APP[Application API]
    AUTH --> |Dataset API Key| DS[Dataset API]
    APP --> CHAT[Chat: /v1/chat-messages]
    APP --> COMP[Completion: /v1/completion-messages]
    APP --> WF[Workflow: /v1/workflows/run]
    DS --> DSOP[Dataset CRUD]
    DS --> DOC[Document CRUD]

2. 身份验证

2.1 获取API密钥

  1. 在 Dify 控制台中打开应用程序
  2. 点击左侧菜单**“API访问”**
  3. 在**“API Key”**部分生成新密钥
  4. 复制您的密钥并妥善保管

2.2 认证头格式

Authorization: Bearer {api_key}

请求示例:

curl -X POST "https://your-dify-instance.com/v1/chat-messages" \
  -H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx" \
  -H "Content-Type: application/json" \
  -d '{"inputs": {}, "query": "こんにちは", "user": "user-001"}'

2.3 API 密钥管理最佳实践

项目推荐
储存地点环境变量或秘密管理器
硬编码为代码禁止
提交到 Git 存储库禁止(添加到.gitignore
旋转定期(每 90 天等)
权力分离App API Key 和 Dataset API Key 分开管理
监控监控API调用日志

3. 聊天应用 API

3.1 发送消息

向交互式应用程序发送消息并获得 AI 响应。

端点: POST /v1/chat-messages

请求正文:

{
  "inputs": {
    "company_name": "株式会社Example",
    "department": "営業部"
  },
  "query": "有給休暇の申請方法を教えてください",
  "user": "user-001",
  "conversation_id": "",
  "response_mode": "blocking",
  "files": []
}

参数说明:

参数类型必填描述
inputs对象是的应用程序中定义的输入变量的值
query字符串是的用户留言
user字符串是的用户标识符(最终用户 ID)
conversation_id字符串没有对话 ID。使用空字符串开始新对话
response_mode字符串是的blocking(同步)或 streaming(流式)
files数组没有参考上传文件

响应(阻塞模式):

{
  "event": "message",
  "message_id": "msg-xxxxxxxx",
  "conversation_id": "conv-xxxxxxxx",
  "mode": "chat",
  "answer": "有給休暇の申請は、社内ポータルの「勤怠管理」メニューから...",
  "metadata": {
    "usage": {
      "prompt_tokens": 512,
      "completion_tokens": 128,
      "total_tokens": 640,
      "total_price": "0.0064",
      "currency": "USD"
    },
    "retriever_resources": [
      {
        "dataset_id": "ds-xxxxx",
        "dataset_name": "社内規程集",
        "document_id": "doc-xxxxx",
        "document_name": "勤怠管理規程.pdf",
        "segment_id": "seg-xxxxx",
        "score": 0.92,
        "content": "有給休暇の申請は..."
      }
    ]
  },
  "created_at": 1712899200
}

3.2 获取通话记录

端点: GET /v1/messages

curl -X GET "https://your-dify-instance.com/v1/messages?conversation_id=conv-xxxxx&user=user-001&limit=20" \
  -H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"

3.3 获取对话列表

端点: GET /v1/conversations

curl -X GET "https://your-dify-instance.com/v1/conversations?user=user-001&limit=20" \
  -H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"

4. 完成应用程序API

4.1 文本完成请求

用于一次性文本生成(没有对话上下文)。

端点: POST /v1/completion-messages

{
  "inputs": {
    "document_text": "本契約は、甲が乙に対して...",
    "task_type": "要約"
  },
  "user": "user-001",
  "response_mode": "blocking"
}

回复:

{
  "event": "message",
  "message_id": "msg-xxxxxxxx",
  "mode": "completion",
  "answer": "本契約は、甲(発注者)と乙(受注者)の間で...",
  "metadata": {
    "usage": {
      "prompt_tokens": 1024,
      "completion_tokens": 256,
      "total_tokens": 1280
    }
  },
  "created_at": 1712899200
}

5. 工作流程应用程序 API

5.1 执行工作流程

端点: POST /v1/workflows/run

{
  "inputs": {
    "document_list": ["契約書A.pdf", "契約書B.pdf"],
    "analysis_type": "リスク分析",
    "output_format": "json"
  },
  "user": "user-001",
  "response_mode": "blocking"
}

回复:

{
  "workflow_run_id": "wr-xxxxxxxx",
  "task_id": "task-xxxxxxxx",
  "data": {
    "id": "wr-xxxxxxxx",
    "workflow_id": "wf-xxxxxxxx",
    "status": "succeeded",
    "outputs": {
      "final_result": "...",
      "metadata": {}
    },
    "elapsed_time": 12.5,
    "total_tokens": 2048,
    "total_steps": 5,
    "created_at": 1712899200,
    "finished_at": 1712899213
  }
}

5.2 检查工作流程执行状态

端点: GET /v1/workflows/run/{workflow_run_id}

curl -X GET "https://your-dify-instance.com/v1/workflows/run/wr-xxxxxxxx" \
  -H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx"

5.3 停止工作流程

端点: POST /v1/workflows/tasks/{task_id}/stop(Service API) or POST /api/workflows/tasks/{task_id}/stop(Web API)

# Dify 1.x 真实端点(与 Chat Message 的停止端点格式一致)
curl -X POST "https://your-dify-instance.com/v1/workflows/tasks/{task_id}/stop" \
  -H "Authorization: Bearer app-xxxxxxxxxxxxxxxxxxxx" \
  -H "Content-Type: application/json" \
  -d '{"user": "user-001"}'

注意task_id 是从 streaming 响应(SSE)的事件流中获取的,不是 workflow_run_id。Chat Message 也有对应的停止端点:POST /v1/chat-messages/{task_id}/stop


6. 流式响应

6.1 流式传输的工作原理

如果指定 response_mode: "streaming",响应将以服务器发送事件 (SSE) 格式按顺序返回。

data: {"event": "message", "message_id": "msg-xxx", "answer": "有給", ...}

data: {"event": "message", "message_id": "msg-xxx", "answer": "休暇", ...}

data: {"event": "message", "message_id": "msg-xxx", "answer": "の", ...}

data: {"event": "message_end", "message_id": "msg-xxx", "metadata": {...}}

6.2 SSE 事件类型

活动描述时间
message文本块的传递在生成过程中依次
message_end消息生成完成当生成完成时
message_file文件输出文件生成
tts_message音频数据当 TTS 启用时
tts_message_end音频输出完成TTS 完成
message_replace消息替换(审核期间)当内容过滤时
error错误发生错误

6.3 前端实现示例(JavaScript)

async function sendChatMessage(query, conversationId = "") {
  const response = await fetch("https://your-dify-instance.com/v1/chat-messages", {
    method: "POST",
    headers: {
      "Authorization": "Bearer app-xxxxxxxxxxxxxxxxxxxx",
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      inputs: {},
      query: query,
      user: "user-001",
      conversation_id: conversationId,
      response_mode: "streaming",
    }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let fullAnswer = "";
  let newConversationId = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));

        switch (data.event) {
          case "message":
            fullAnswer += data.answer;
            // UIにリアルタイム表示
            updateUI(fullAnswer);
            newConversationId = data.conversation_id;
            break;

          case "message_end":
            console.log("Token usage:", data.metadata?.usage);
            break;

          case "error":
            console.error("Error:", data.message);
            break;
        }
      }
    }
  }

  return { answer: fullAnswer, conversationId: newConversationId };
}

6.4 后端实现示例(Python)

import requests
import json

DIFY_BASE_URL = "https://your-dify-instance.com"
APP_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"

def chat_streaming(query: str, user: str, conversation_id: str = ""):
    """ストリーミングモードでチャットメッセージを送信"""
    url = f"{DIFY_BASE_URL}/v1/chat-messages"
    headers = {
        "Authorization": f"Bearer {APP_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {},
        "query": query,
        "user": user,
        "conversation_id": conversation_id,
        "response_mode": "streaming",
    }

    full_answer = ""
    with requests.post(url, headers=headers, json=payload, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line:
                decoded = line.decode("utf-8")
                if decoded.startswith("data: "):
                    data = json.loads(decoded[6:])
                    event = data.get("event")

                    if event == "message":
                        chunk = data.get("answer", "")
                        full_answer += chunk
                        print(chunk, end="", flush=True)

                    elif event == "message_end":
                        print()  # 改行
                        return {
                            "answer": full_answer,
                            "conversation_id": data.get("conversation_id"),
                            "metadata": data.get("metadata"),
                        }

                    elif event == "error":
                        raise Exception(f"API Error: {data.get('message')}")

    return {"answer": full_answer}


def chat_blocking(query: str, user: str, conversation_id: str = ""):
    """ブロッキングモードでチャットメッセージを送信"""
    url = f"{DIFY_BASE_URL}/v1/chat-messages"
    headers = {
        "Authorization": f"Bearer {APP_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {},
        "query": query,
        "user": user,
        "conversation_id": conversation_id,
        "response_mode": "blocking",
    }

    resp = requests.post(url, headers=headers, json=payload)
    resp.raise_for_status()
    return resp.json()

7. 阻塞与流式传输选择标准

观点封锁流媒体
回复格式完成后一次性全部返回通过上交所顺序交割
用户体验仅加载显示打字动画风格
超时风险生成长文本时高低(由于顺序交付)
实施复杂性中(需要 SSE 解析器)
推荐场景批处理、API协作聊天UI,实时显示
可被打断不可能(等待完成)可以用 POST /stop 中断

推荐:始终在面向用户的聊天界面中使用流式传输。如果您需要后端 API 到 API 集成中的完整结果,则 阻塞 适合。


8. 错误处理

8.1 主要错误代码

HTTP 状态错误代码说明行动
400invalid_param无效参数检查请求正文
401401 unauthorized身份验证错误检查 API 密钥
403403 forbidden无访问权限检查应用程序的公共设置
404404 not_found资源不存在检查端点ID
429429 rate_limit_exceeded超出速率限制增加重试间隔
500500 internal_server_error服务器错误检查日志/重试
400400 provider_not_initializeLLM 提供商未配置检查模型提供商配置
400400 model_currently_not_support当前模型不支持该操作切换支持的模型
400400 completion_request_error模型调用错误检查日志 / 重试
400400 provider_quota_exceeded提供商配额超限调整配额或切换提供商

8.2 重试策略

import time
import requests
from requests.exceptions import RequestException

def call_dify_api_with_retry(url, headers, payload, max_retries=3):
    """指数バックオフ付きリトライ"""
    for attempt in range(max_retries):
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=60)

            if resp.status_code == 429:
                retry_after = int(resp.headers.get("Retry-After", 10))
                print(f"Rate limited. Retrying after {retry_after}s...")
                time.sleep(retry_after)
                continue

            if resp.status_code >= 500:
                wait = 2 ** attempt  # 指数バックオフ: 1, 2, 4秒
                print(f"Server error {resp.status_code}. Retrying in {wait}s...")
                time.sleep(wait)
                continue

            resp.raise_for_status()
            return resp.json()

        except RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Request failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)

    raise Exception(f"Failed after {max_retries} retries")

9. Webhook 回调

9.1 Workflow的Webhook集成

您可以使用工作流的 HTTP 请求节点将处理结果通知外部系统。

HTTP请求节点配置示例:

method: POST
url: "https://your-system.example.com/webhook/dify-callback"
headers:
  Content-Type: "application/json"
  X-Webhook-Secret: "{{webhook_secret}}"
body:
  type: json
  content: |
    {
      "event": "workflow_completed",
      "workflow_id": "{{workflow_id}}",
      "result": {{workflow_output}},
      "timestamp": "{{current_timestamp}}",
      "status": "success"
    }

9.2 Webhook 接收器实现示例 (Node.js / Express)

const express = require("express");
const crypto = require("crypto");

const app = express();
app.use(express.json());

const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET;

function verifySignature(payload, signature) {
  const expected = crypto
    .createHmac("sha256", WEBHOOK_SECRET)
    .update(JSON.stringify(payload))
    .digest("hex");
  return crypto.timingSafeEqual(
    Buffer.from(signature),
    Buffer.from(expected)
  );
}

app.post("/webhook/dify-callback", (req, res) => {
  const signature = req.headers["x-webhook-secret"];

  if (signature !== WEBHOOK_SECRET) {
    return res.status(401).json({ error: "Invalid signature" });
  }

  const { event, workflow_id, result, status } = req.body;
  console.log(`Received webhook: ${event} for workflow ${workflow_id}`);

  // ビジネスロジック: 結果をDBに保存、通知送信など
  processWorkflowResult(result, status);

  res.status(200).json({ received: true });
});

function processWorkflowResult(result, status) {
  // 例: Slack通知、DB保存、後続処理のトリガーなど
  console.log("Processing result:", result);
}

app.listen(3000, () => console.log("Webhook server running on port 3000"));

10. 实现模式集合

10.1 LINE Bot 合作

# LINE Bot から Dify Chat API を呼び出すパターン
from flask import Flask, request
from linebot import LineBotApi, WebhookHandler
from linebot.models import MessageEvent, TextMessage, TextSendMessage
import requests

app = Flask(__name__)
line_bot_api = LineBotApi("LINE_CHANNEL_ACCESS_TOKEN")
handler = WebhookHandler("LINE_CHANNEL_SECRET")

DIFY_API_URL = "https://your-dify-instance.com/v1/chat-messages"
DIFY_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"

# ユーザーごとの会話IDを管理
conversation_store = {}

@app.route("/callback", methods=["POST"])
def callback():
    handler.handle(request.get_data(as_text=True),
                   request.headers["X-Line-Signature"])
    return "OK"

@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    user_id = event.source.user_id
    conv_id = conversation_store.get(user_id, "")

    resp = requests.post(
        DIFY_API_URL,
        headers={
            "Authorization": f"Bearer {DIFY_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "inputs": {},
            "query": event.message.text,
            "user": user_id,
            "conversation_id": conv_id,
            "response_mode": "blocking",
        },
    )
    data = resp.json()

    conversation_store[user_id] = data.get("conversation_id", "")

    line_bot_api.reply_message(
        event.reply_token,
        TextSendMessage(text=data["answer"])
    )

10.2 批处理模式

import csv
import json
import time
import requests

DIFY_API_URL = "https://your-dify-instance.com/v1/workflows/run"
DIFY_API_KEY = "app-xxxxxxxxxxxxxxxxxxxx"

def process_batch(input_csv: str, output_csv: str):
    """CSVファイルの各行に対してWorkflowを実行するバッチ処理"""
    results = []

    with open(input_csv, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            print(f"Processing row {i + 1}: {row.get('title', 'N/A')}")

            resp = requests.post(
                DIFY_API_URL,
                headers={
                    "Authorization": f"Bearer {DIFY_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {
                        "document_text": row["content"],
                        "analysis_type": row.get("type", "要約"),
                    },
                    "user": "batch-processor",
                    "response_mode": "blocking",
                },
            )

            if resp.status_code == 200:
                data = resp.json()
                results.append({
                    "input_title": row.get("title", ""),
                    "output": data["data"]["outputs"].get("final_result", ""),
                    "status": "success",
                    "tokens": data["data"].get("total_tokens", 0),
                })
            else:
                results.append({
                    "input_title": row.get("title", ""),
                    "output": "",
                    "status": f"error: {resp.status_code}",
                    "tokens": 0,
                })

            # レート制限対策: リクエスト間に待機
            time.sleep(1)

    # 結果をCSVに出力
    with open(output_csv, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["input_title", "output", "status", "tokens"])
        writer.writeheader()
        writer.writerows(results)

    print(f"Batch complete. Results saved to {output_csv}")

11. 安全和运营

11.1 安全检查表

  • API Key 没有硬编码在源代码中
  • 使用 HTTPS 通信
  • CORS 设置已正确配置
  • 已建立 API 密钥轮换政策。
  • 已设置速率限制
  • 实施输入验证
  • 错误响应中不泄露内部信息

11.2 监控项目

项目指标大约警报阈值
响应时间P95 延迟> 30 秒
错误率5xx 错误百分比> 5%
代币使用每日代币消费预算的 80%
API 调用次数每小时请求数速率限制的 80%
对话次数活跃对话数量80% 的容量

12. 总结

下面总结了将 Dify API 集成到您的业务系统时需要记住的要点。

观点要点
端点选择根据应用程序类型(聊天/完成/工作流程)使用正确的端点
认证API 密钥通过环境变量进行管理并定期轮换
响应模式用于用户 UI 的流式传输,用于后端协调的阻塞
错误处理使用指数退避和适当的回退进行重试
安全HTTPS、CORS、输入验证、日志监控

API集成的目标不是能够输入curl命令,而是确保Dify应用程序作为实际业务系统的一部分稳定运行。遵循上面列出的实施模式和最佳实践以确保稳健的集成。