Streaming Response Protocol

The Knowledge service uses SSE (Server-Sent Events) to stream responses, delivering generated content to the client in real time.

Response Stream Structure

Each SSE event carries two fields, type and content:

json
{
  "type": "content",
  "content": "A distributed lock is"
}
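
As a minimal sketch of how a client might decode one such event, the helper below turns a raw `data:` line into a `{ type, content }` object. The name `parseSseLine` is hypothetical, and defaulting a missing `content` to an empty string is an assumption rather than documented behavior.

```javascript
// Hypothetical helper: decode one raw SSE line into { type, content }.
// Returns null for lines that carry no data (blank lines, comments, other fields).
function parseSseLine(line) {
  if (!line.startsWith('data:')) return null;

  // Strip the field name; surrounding whitespace is not significant for JSON
  const payload = line.slice(5).trim();
  if (payload === '') return null;

  const parsed = JSON.parse(payload);
  if (typeof parsed.type !== 'string') {
    throw new Error('Event has no type field: ' + payload);
  }
  // Assumption: a missing content field is treated as an empty string
  return { type: parsed.type, content: parsed.content ?? '' };
}

const sample = parseSseLine('data: {"type":"content","content":"A distributed lock is"}');
console.log(sample.type, '|', sample.content);
```

Returning null for non-data lines lets the caller skip the blank separator lines between events without special-casing them.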

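Response Type Enumeration

A complete streaming response emits its events in a fixed order. In the examples on this page the sequence is: conversationId, userMessageId, assistantMessageId, hybridSearchWarning (only when a warning applies), referencedDocs, thinking (only with deep thinking enabled), content, tokenUsage, done.

Response Type Reference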

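| Type | Description | Occurrences | content format |
| --- | --- | --- | --- |
| conversationId | Conversation ID | 1 | String |
| userMessageId | User message ID | 1 | String |
| assistantMessageId | Assistant message ID | 1 | String |
| hybridSearchWarning | Hybrid search warning | 0-1 | Warning message |
| referencedDocs | List of referenced documents | 1 | JSON array (serialized as a string) |
| thinking | Reasoning process (deep thinking) | 0-N | String |
| content | Answer content | 0-N | String |
| tokenUsage | Token usage | 1 | JSON object (serialized as a string) |
| done | End marker | 1 | Empty string |
| error | Error message | 0-1 | Error description |
| empty | Empty response (abnormal case) | 0-1 | - |
| notLogin | Not-logged-in error | 0-1 | - |
| title | AI-generated conversation title | 0-1 | String |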
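
Because two of these types carry JSON inside the content string, a client usually needs one place that decides whether to parse it. The following is a rough sketch of that decision; the names `JSON_CONTENT_TYPES` and `decodeContent` are hypothetical.

```javascript
// Types whose content is a JSON document serialized as a string (per the table above)
const JSON_CONTENT_TYPES = new Set(['referencedDocs', 'tokenUsage']);

// Hypothetical helper: return the event's content in its natural JS form
function decodeContent(event) {
  if (JSON_CONTENT_TYPES.has(event.type)) {
    return JSON.parse(event.content);
  }
  // All other types carry plain text, or nothing at all (empty, notLogin)
  return event.content ?? '';
}

const usage = decodeContent({
  type: 'tokenUsage',
  content: '{"promptTokens":150,"completionTokens":80}'
});
console.log(usage.promptTokens);
```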

Response Stream Examples

Standard Q&A

data: {"type":"conversationId","content":"1"}

data: {"type":"userMessageId","content":"100"}

data: {"type":"assistantMessageId","content":"101"}

data: {"type":"referencedDocs","content":"[{\"documentId\":1,\"title\":\"Distributed Lock Guide\",\"score\":0.85}]"}

data: {"type":"content","content":"A distributed lock is"}

data: {"type":"content","content":" a mechanism used in distributed systems to"}

data: {"type":"content","content":" coordinate access to shared resources across multiple nodes."}

data: {"type":"tokenUsage","content":"{\"promptTokens\":150,\"completionTokens\":80}"}

data: {"type":"done","content":""}
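
To make the stream above concrete, here is a small sketch that folds an already-parsed sequence of events into a single result object. `collectResponse` and the shape of its return value are hypothetical, chosen only for illustration.

```javascript
// Hypothetical reducer: fold protocol events into one result object
function collectResponse(events) {
  const result = {
    conversationId: null,
    userMessageId: null,
    assistantMessageId: null,
    referencedDocs: [],
    answer: '',
    tokenUsage: null,
    finished: false
  };

  for (const { type, content } of events) {
    switch (type) {
      case 'conversationId':
      case 'userMessageId':
      case 'assistantMessageId':
        result[type] = content; // IDs arrive as strings
        break;
      case 'referencedDocs':
        result.referencedDocs = JSON.parse(content);
        break;
      case 'content':
        result.answer += content; // fragments are appended in arrival order
        break;
      case 'tokenUsage':
        result.tokenUsage = JSON.parse(content);
        break;
      case 'done':
        result.finished = true;
        break;
    }
  }
  return result;
}

const summary = collectResponse([
  { type: 'conversationId', content: '1' },
  { type: 'content', content: 'A distributed lock is' },
  { type: 'content', content: ' a coordination mechanism.' },
  { type: 'done', content: '' }
]);
console.log(summary.answer); // A distributed lock is a coordination mechanism.
```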

With Deep Thinking Enabled

data: {"type":"conversationId","content":"1"}

data: {"type":"userMessageId","content":"100"}

data: {"type":"assistantMessageId","content":"101"}

data: {"type":"referencedDocs","content":"[...]"}

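data: {"type":"thinking","content":"The user is asking about high availability for distributed locks..."}

data: {"type":"thinking","content":"I need to consider the following aspects..."}

data: {"type":"content","content":"Keeping a distributed lock highly available requires..."}

data: {"type":"content","content":"..."}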

data: {"type":"tokenUsage","content":"{\"promptTokens\":200,\"completionTokens\":120}"}

data: {"type":"done","content":""}

Hybrid Search Warning

data: {"type":"conversationId","content":"1"}

data: {"type":"userMessageId","content":"100"}

data: {"type":"assistantMessageId","content":"101"}

data: {"type":"hybridSearchWarning","content":"Keyword search is unavailable; using vector search only"}

data: {"type":"referencedDocs","content":"[...]"}

data: {"type":"content","content":"..."}

data: {"type":"done","content":""}

Error Response

data: {"type":"error","content":"Retrieval failed: Embedding API unavailable"}

Frontend Handling Examples

Basic Handling

javascript
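// The browser's native EventSource only issues GET requests and accepts no
// headers or body, so this POST endpoint is read with fetch() instead.
async function chat(message) {
  const res = await fetch('/kl/rag/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, libraryIds: [1] })
  });

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A chunk may end mid-line, so keep the unfinished tail for the next read
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop();

    for (const line of lines) {
      if (!line.startsWith('data:')) continue;
      const finished = handleEvent(JSON.parse(line.slice(5)));
      if (finished) {
        await reader.cancel(); // close the connection
        return;
      }
    }
  }
}

// Handles one event; returns true when the stream should be closed
function handleEvent(response) {
  switch (response.type) {
    case 'conversationId':
      console.log('Conversation ID:', response.content);
      return false;

    case 'referencedDocs':
      renderReferencedDocs(JSON.parse(response.content));
      return false;

    case 'thinking':
      appendThinking(response.content);
      return false;

    case 'content':
      appendContent(response.content);
      return false;

    case 'tokenUsage':
      console.log('Token usage:', JSON.parse(response.content));
      return false;

    case 'done':
      console.log('Answer complete');
      return true;

    case 'error':
      console.error('Error:', response.content);
      return true;

    default:
      return false;
  }
}

chat('What is a distributed lock?');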

Full Implementation (Vue Example)

vue
<script setup>
import { ref } from 'vue';

const content = ref('');
const thinking = ref('');
const referencedDocs = ref([]);
const tokenUsage = ref(null);

async function chat(message) {
  const response = await fetch('/kl/rag/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, libraryIds: [1] })
  });
  
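  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ''; // partial line carried over between chunks
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    // A chunk can end mid-line or mid-character, so decode in streaming
    // mode and parse only complete lines
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop();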
    
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        
        switch (data.type) {
          case 'content':
            content.value += data.content;
            break;
          case 'thinking':
            thinking.value += data.content;
            break;
          case 'referencedDocs':
            referencedDocs.value = JSON.parse(data.content);
            break;
          case 'tokenUsage':
            tokenUsage.value = JSON.parse(data.content);
            break;
        }
      }
    }
  }
}
</script>
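
Chunks returned by `reader.read()` do not have to line up with SSE lines, so an event can arrive split across two reads. The sketch below isolates the line buffering into a small class that can be exercised without a network connection; `SseLineBuffer` is a hypothetical name, and the class assumes each event fits on a single `data:` line, as in the examples on this page.

```javascript
// Hypothetical helper: accumulates decoded text and returns complete events
class SseLineBuffer {
  constructor() {
    this.pending = ''; // text received after the last newline
  }

  // Feed one decoded chunk; returns the events completed by it
  push(chunkText) {
    this.pending += chunkText;
    const lines = this.pending.split('\n');
    this.pending = lines.pop(); // last piece is incomplete (or empty)

    const events = [];
    for (const line of lines) {
      if (line.startsWith('data:')) {
        events.push(JSON.parse(line.slice(5)));
      }
    }
    return events;
  }
}

const lineBuffer = new SseLineBuffer();
console.log(lineBuffer.push('data: {"type":"content","con').length); // 0
console.log(lineBuffer.push('tent":"lock"}\n\n')[0].content);        // lock
```

Pair it with `decoder.decode(value, { stream: true })` so that multi-byte characters split across chunks are also reassembled before they reach the buffer.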

Referenced Document Format

For referencedDocs events, content is a JSON array (serialized as a string):

json
[
  {
    "documentId": 1,
    "libraryId": 1,
    "title": "Distributed Lock Guide",
    "source": "https://example.com/doc/1",
    "score": 0.85,
    "fileType": "md",
    "sectionTitle": "What Is a Distributed Lock",
    "highlightRanges": [
      { "start": 0, "end": 85 }
    ]
  }
]

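Field Reference

| Field | Description |
| --- | --- |
| documentId | Document ID |
| libraryId | Knowledge library ID |
| title | Document title |
| source | Source link |
| score | Similarity score |
| fileType | File type |
| sectionTitle | Section title |
| highlightRanges | Highlight ranges (used to locate the passage in the original text) |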
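
One way a client might use highlightRanges is to cut the highlighted passages out of the document text. The sketch below assumes that start and end are character offsets into the original text and that end is exclusive; the field descriptions above do not state this, so treat it as a guess to verify against the service. `extractHighlights` is a hypothetical name.

```javascript
// Assumption: start/end are character offsets into the original document
// text, with `end` exclusive. The protocol description does not specify this.
function extractHighlights(text, highlightRanges) {
  return highlightRanges
    .filter(({ start, end }) => start >= 0 && end > start && start < text.length)
    .map(({ start, end }) => text.slice(start, Math.min(end, text.length)));
}

const originalText = 'A distributed lock coordinates access to shared resources.';
console.log(extractHighlights(originalText, [{ start: 2, end: 18 }])); // [ 'distributed lock' ]
```

Ranges that fall outside the text are dropped rather than raising, so a stale range does not break rendering.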

Token Usage

For tokenUsage events, content is a JSON object (serialized as a string):

json
{
  "promptTokens": 150,
  "completionTokens": 80,
  "totalTokens": 230
}

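Field Reference

| Field | Description |
| --- | --- |
| promptTokens | Number of input tokens (prompt) |
| completionTokens | Number of output tokens (answer) |
| totalTokens | Total number of tokens |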
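
The stream examples earlier on this page show tokenUsage payloads without totalTokens, so a client may want to tolerate its absence. The sketch below fills it in as the sum of the other two fields, which is an assumption rather than documented behavior; `normalizeTokenUsage` is a hypothetical name.

```javascript
// Hypothetical helper: parse a tokenUsage content string into a complete object
function normalizeTokenUsage(content) {
  const usage = JSON.parse(content);
  const promptTokens = usage.promptTokens ?? 0;
  const completionTokens = usage.completionTokens ?? 0;
  return {
    promptTokens,
    completionTokens,
    // Assumption: when totalTokens is absent it equals the sum of the two counts
    totalTokens: usage.totalTokens ?? (promptTokens + completionTokens)
  };
}

console.log(normalizeTokenUsage('{"promptTokens":150,"completionTokens":80}').totalTokens); // 230
```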

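Error Handling

Common Errors

| Error type | Description |
| --- | --- |
| error | Retrieval or generation failed |
| notLogin | The user is not logged in |
| empty | Empty response (abnormal case) |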

Error Handling Example

javascript
// `close` is whatever shuts down the stream in your client,
// for example () => reader.cancel()
function handleEvent(response, close) {
  if (response.type === 'error') {
    showError(response.content);
    close();
    return;
  }

  if (response.type === 'notLogin') {
    redirectToLogin();
    close();
    return;
  }

  // ... normal handling
}

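Notes

1. Closing the SSE connection

  • Close the connection proactively after receiving a done event
  • Close the connection after receiving an error event
  • The connection closes automatically on timeout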
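
The closing rules above can be sketched as two small helpers. The list does not say whether the timeout is enforced by the server or by the client, so the idle guard below is a client-side assumption, as is treating notLogin and empty as terminal. `TERMINAL_TYPES`, `isTerminal`, and `createIdleGuard` are hypothetical names.

```javascript
// Event types after which the client should close the stream.
// 'done' and 'error' come from the list above; 'notLogin' and 'empty'
// are included as an assumption.
const TERMINAL_TYPES = new Set(['done', 'error', 'notLogin', 'empty']);

function isTerminal(event) {
  return TERMINAL_TYPES.has(event.type);
}

// Hypothetical client-side idle timeout: aborts the request when no event
// has arrived for `idleMs` milliseconds.
function createIdleGuard(controller, idleMs) {
  let timer = null;
  return {
    // Call on every received event to restart the countdown
    touch() {
      clearTimeout(timer);
      timer = setTimeout(() => controller.abort(), idleMs);
    },
    // Call when the stream ends normally
    stop() {
      clearTimeout(timer);
    }
  };
}

// Usage: pass controller.signal to fetch(), call guard.touch() per event,
// and call guard.stop() once isTerminal(event) returns true.
const controller = new AbortController();
const guard = createIdleGuard(controller, 30000);
guard.touch();
guard.stop();
console.log(isTerminal({ type: 'done' }), controller.signal.aborted); // true false
```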

2. Concatenating content

Events of type content may be emitted many times, so the fragments must be concatenated:

javascript
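let fullContent = ''; // declared once, outside the event handler

function onEvent(response) {
  if (response.type === 'content') {
    fullContent += response.content; // append each fragment in arrival order
  }
}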

3. Displaying thinking content

In deep-thinking mode, thinking and content events may alternate:

javascript
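let isThinking = false;
let thinkingContent = '';
let content = '';

function onEvent(response) {
  if (response.type === 'thinking') {
    isThinking = true; // e.g. show the thinking panel
    thinkingContent += response.content;
  } else if (response.type === 'content') {
    isThinking = false; // the answer has started
    content += response.content;
  }
}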
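
If the UI needs to react when the thinking phase starts or ends (for example to expand or collapse a panel), the same state can be wrapped so that each event reports the transition it caused. This is only a sketch; `createThinkingTracker` and the 'started' / 'ended' labels are hypothetical.

```javascript
// Hypothetical view-state tracker for deep-thinking mode
function createThinkingTracker() {
  const state = { isThinking: false, thinking: '', content: '' };
  return {
    state,
    // Returns 'started' or 'ended' when the thinking phase changes, else null
    handle(event) {
      if (event.type === 'thinking') {
        const transition = state.isThinking ? null : 'started';
        state.isThinking = true;
        state.thinking += event.content;
        return transition;
      }
      if (event.type === 'content') {
        const transition = state.isThinking ? 'ended' : null;
        state.isThinking = false;
        state.content += event.content;
        return transition;
      }
      return null; // other event types do not affect the thinking state
    }
  };
}

const tracker = createThinkingTracker();
console.log(tracker.handle({ type: 'thinking', content: 'Considering...' })); // started
console.log(tracker.handle({ type: 'content', content: 'The answer is' }));   // ended
```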

Next Steps