向量化存储
分片完成后,进行向量化处理并存储到向量数据库。
向量化流程
详细步骤
- 读取分片 — 从数据库读取状态为
PENDING的分片 - 批量处理 — 按
batch-size分批处理,避免单次请求过大 - 构建元数据 — 为每个分片构建丰富的元数据信息
- 生成向量 — 调用 Embedding API 将文本转换为向量
- 存储向量 — 存入 PgVector 向量数据库
- 更新状态 — 更新分片状态为
VECTORIZED,记录向量 ID - 同步索引 — 可选:同步到 Elasticsearch 关键词索引
元数据构建
为每个分片构建丰富的元数据,支持检索时的过滤:
java
// 伪代码 - 元数据结构
{
"library_id": "1", // 知识库 ID (过滤必需)
"document_id": "10", // 文档 ID (过滤必需)
"chunk_index": 0, // 分片位置序号
"author": "张三", // 作者 (过滤)
"category": "技术文档", // 分类 (过滤)
"tags": "分布式,锁", // 标签 (过滤,IN 查询)
"section_path": "第一章 > 1.1 背景", // 章节路径
"charStartIndex": 0, // 原文起始位置
"charEndIndex": 500 // 原文结束位置
}元数据用途
| 字段 | 用途 |
|---|---|
library_id | 按知识库过滤检索范围 |
document_id | 按文档过滤,支持单文档检索 |
author | 按作者过滤 |
category | 按分类过滤 |
tags | 按标签过滤 (支持多标签 IN 查询) |
批量向量化
为提高效率,采用批量向量化:
yaml
molandev:
rag:
embedding:
# 批处理大小
batch-size: 10
# 批次间延迟 (毫秒), 避免 API 限流
batch-delay-ms: 100处理流程
java
// 伪代码 - 批量向量化
public int[] embedAndUpdateStatus(List<KlDocumentChunkEntity> chunks) {
int vectorCount = 0;
// 分批处理
for (List<KlDocumentChunkEntity> batch : partition(chunks, batchSize)) {
// 1. 构建 SearchDocument 列表
List<SearchDocument> documents = buildSearchDocuments(batch);
// 2. 批量添加到向量库 (内部调用 Embedding API)
vectorSearchService.add(documents);
// 3. 更新分片状态为 VECTORIZED
for (KlDocumentChunkEntity chunk : batch) {
chunk.setStatus("VECTORIZED");
chunk.setVectorId(...); // 记录向量 ID
}
documentChunkService.saveBatch(batch);
vectorCount += batch.size();
// 4. 批次间延迟
Thread.sleep(batchDelayMs);
}
return new int[]{chunks.size(), vectorCount};
}为什么需要批量处理?
- 减少 API 调用次数 — 一次请求多个文本,降低网络开销
- 避免 API 限流 — 批次间延迟,控制请求频率
- 提升吞吐量 — 并行处理向量生成
PgVector 存储
技术选型
Knowledge 使用 Spring AI 的 VectorStore 抽象,通过 PgVectorStore 实现 PostgreSQL 向量存储。
向量表结构
PgVector 会自动创建以下结构:
sql
-- Spring AI 自动创建的表
CREATE TABLE vector_store (
id UUID PRIMARY KEY, -- 向量 ID
content TEXT, -- 文本内容
metadata JSONB, -- 元数据 (JSON 格式)
embedding VECTOR(1536) -- 向量 (维度数可配置)
);
-- HNSW 索引 (加速相似度搜索)
CREATE INDEX ON vector_store
USING hnsw (embedding vector_cosine_ops);向量维度
向量维度需与 Embedding 模型匹配:
| Embedding 模型 | 维度 | 配置值 |
|---|---|---|
| OpenAI text-embedding-3-small | 1536 | dimensions: 1536 |
| 通义千问 text-embedding-v3 | 1024/1536 | 按需选择 |
| Ollama nomic-embed-text | 768 | dimensions: 768 |
yaml
spring:
ai:
vectorstore:
pgvector:
dimensions: 1536 # 需与模型匹配同步到关键词索引 (可选)
如果启用了 Elasticsearch 或 Lucene,向量化完成后会同步到关键词索引:
java
// 伪代码 - 同步到 ES
if (elasticsearchEnabled) {
chunkEsService.syncChunks(savedChunks);
}
if (luceneEnabled) {
chunkLuceneService.syncChunks(savedChunks);
}同步内容
json
{
"chunk_id": "uuid-001",
"library_id": "1",
"document_id": "10",
"content": "分布式锁是分布式系统中用于协调多个节点访问共享资源的机制...",
"section_title": "什么是分布式锁",
"author": "张三",
"category": "技术文档",
"tags": "分布式,锁"
}用于 混合检索 中的关键词检索部分。
向量化完成
向量化完成后,文档状态更新:
json
{
"id": 1,
"title": "分布式锁指南",
"status": "SUCCESS",
"stage": "VECTORIZE",
"chunkCount": 15,
"vectorCount": 15, // 全部向量化完成
"mdFilePath": "/data/knowledge/1/distributed-lock.md"
}此时文档已可用于检索和问答。
向量管理
删除文档向量
删除文档时,同步删除向量:
java
vectorSearchService.deleteByDocumentId(documentId);删除知识库向量
删除知识库时,批量删除所有向量:
java
vectorSearchService.deleteByLibraryId(libraryId);性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
batch-size | 10-20 | 根据 API 限制调整 |
batch-delay-ms | 100-500 | 避免 API 限流 |
max-concurrent-tasks | 3-5 | 控制并发向量化任务 |
| HNSW 索引 | 必建 | 大幅提升检索速度 |