流式响应协议
Knowledge 服务采用 SSE (Server-Sent Events) 实现流式响应,实时输出生成内容。
响应流结构
每个 SSE 事件包含 type 和 content 两个字段:
json
{
"type": "content",
"content": "分布式锁是"
}响应类型枚举
完整的流式响应按以下顺序输出:
响应类型说明
| 类型 | 说明 | 出现次数 | content 格式 |
|---|---|---|---|
conversationId | 会话 ID | 1 | 字符串 |
userMessageId | 用户消息 ID | 1 | 字符串 |
assistantMessageId | 助手消息 ID | 1 | 字符串 |
hybridSearchWarning | 混合检索警告 | 0-1 | 警告信息 |
referencedDocs | 引用文档列表 | 1 | JSON 数组 |
thinking | 思考过程 (深度思考) | 0-N | 字符串 |
content | 回答内容 | 0-N | 字符串 |
tokenUsage | Token 使用量 | 1 | JSON 对象 |
done | 结束标记 | 1 | 空字符串 |
error | 错误信息 | 0-1 | 错误描述 |
empty | 空响应 (异常场景) | 0-1 | - |
notLogin | 未登录错误 | 0-1 | - |
title | AI 生成的会话标题 | 0-1 | 字符串 |
响应流示例
标准问答
data: {"type":"conversationId","content":"1"}
data: {"type":"userMessageId","content":"100"}
data: {"type":"assistantMessageId","content":"101"}
data: {"type":"referencedDocs","content":"[{\"documentId\":1,\"title\":\"分布式锁指南\",\"score\":0.85}]"}
data: {"type":"content","content":"分布式锁是"}
data: {"type":"content","content":"分布式系统中用于"}
data: {"type":"content","content":"协调多个节点访问共享资源的机制。"}
data: {"type":"tokenUsage","content":"{\"promptTokens\":150,\"completionTokens\":80}"}
data: {"type":"done","content":""}启用深度思考
data: {"type":"conversationId","content":"1"}
data: {"type":"userMessageId","content":"100"}
data: {"type":"assistantMessageId","content":"101"}
data: {"type":"referencedDocs","content":"[...]"}
data: {"type":"thinking","content":"用户问的是分布式锁的高可用..."}
data: {"type":"thinking","content":"我需要考虑以下几个方面..."}
data: {"type":"content","content":"分布式锁保证高可用需要..."}
data: {"type":"content","content":"..."}
data: {"type":"tokenUsage","content":"{\"promptTokens\":200,\"completionTokens\":120}"}
data: {"type":"done","content":""}混合检索警告
data: {"type":"conversationId","content":"1"}
data: {"type":"userMessageId","content":"100"}
data: {"type":"assistantMessageId","content":"101"}
data: {"type":"hybridSearchWarning","content":"关键词检索不可用,仅使用向量检索"}
data: {"type":"referencedDocs","content":"[...]"}
data: {"type":"content","content":"..."}
data: {"type":"done","content":""}错误响应
data: {"type":"error","content":"检索失败: Embedding API 不可用"}前端处理示例
基础处理
javascript
const eventSource = new EventSource('/kl/rag/chat', {
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
message: '什么是分布式锁?',
libraryIds: [1]
})
});
eventSource.onmessage = (event) => {
const response = JSON.parse(event.data);
switch (response.type) {
case 'conversationId':
console.log('会话 ID:', response.content);
break;
case 'referenced_docs':
const docs = JSON.parse(response.content);
renderReferencedDocs(docs);
break;
case 'thinking':
appendThinking(response.content);
break;
case 'content':
appendContent(response.content);
break;
case 'token_usage':
const usage = JSON.parse(response.content);
console.log('Token 使用:', usage);
break;
case 'done':
console.log('回答完成');
eventSource.close();
break;
case 'error':
console.error('错误:', response.content);
eventSource.close();
break;
}
};完整实现 (Vue 示例)
vue
<script setup>
import { ref } from 'vue';
const content = ref('');
const thinking = ref('');
const referencedDocs = ref([]);
const tokenUsage = ref(null);
async function chat(message) {
const response = await fetch('/kl/rag/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, libraryIds: [1] })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
switch (data.type) {
case 'content':
content.value += data.content;
break;
case 'thinking':
thinking.value += data.content;
break;
case 'referenced_docs':
referencedDocs.value = JSON.parse(data.content);
break;
case 'token_usage':
tokenUsage.value = JSON.parse(data.content);
break;
}
}
}
}
}
</script>引用文档格式
referencedDocs 的 content 为 JSON 数组:
json
[
{
"documentId": 1,
"libraryId": 1,
"title": "分布式锁指南",
"source": "https://example.com/doc/1",
"score": 0.85,
"fileType": "md",
"sectionTitle": "什么是分布式锁",
"highlightRanges": [
{ "start": 0, "end": 85 }
]
}
]字段说明
| 字段 | 说明 |
|---|---|
documentId | 文档 ID |
libraryId | 知识库 ID |
title | 文档标题 |
source | 来源链接 |
score | 相似度分数 |
fileType | 文件类型 |
sectionTitle | 章节标题 |
highlightRanges | 高亮区间 (用于原文定位) |
Token 使用量
tokenUsage 的 content 为 JSON 对象:
json
{
"promptTokens": 150,
"completionTokens": 80,
"totalTokens": 230
}字段说明
| 字段 | 说明 |
|---|---|
promptTokens | 输入 Token 数 (Prompt) |
completionTokens | 输出 Token 数 (回答) |
totalTokens | 总 Token 数 |
错误处理
常见错误
| 错误类型 | 说明 |
|---|---|
error | 检索或生成失败 |
notLogin | 用户未登录 |
empty | 空响应 (异常场景) |
错误处理示例
javascript
eventSource.onmessage = (event) => {
const response = JSON.parse(event.data);
if (response.type === 'error') {
showError(response.content);
eventSource.close();
return;
}
if (response.type === 'notLogin') {
redirectToLogin();
eventSource.close();
return;
}
// ... 正常处理
};注意事项
1. SSE 连接关闭
- 收到
done类型后主动关闭连接 - 收到
error类型后关闭连接 - 超时自动关闭
2. 内容拼接
content 类型可能多次输出,需要拼接:
javascript
let fullContent = '';
if (response.type === 'content') {
fullContent += response.content; // 拼接
}3. 思考内容展示
深度思考模式下,thinking 和 content 交替输出:
javascript
let isThinking = false;
if (response.type === 'thinking') {
isThinking = true;
thinkingContent += response.content;
} else if (response.type === 'content') {
isThinking = false;
content += response.content;
}下一步
- 了解 Standard RAG 的完整流程
- 返回 RAG 问答概览