
RAGFlow Embedding 实现机制
概述
RAGFlow 的 embedding 系统采用了多提供商统一接口的设计模式,支持 20+ 种不同的 embedding 提供商和模型,包括本地模型和云端 API 服务。该系统为 RAG(检索增强生成)提供了高质量的文本向量化能力。
系统架构
核心组件
主要文件位置:
rag/llm/embedding_model.py
- 核心实现文件,包含所有 embedding 模型类rag/llm/__init__.py
- 模型注册和工厂类定义rag/svr/task_executor.py
- embedding 处理逻辑和任务执行api/db/services/llm_service.py
- 模型服务和配置管理
设计模式
统一接口层 (Base ABC)
↓
多提供商实现层 (OpenAIEmbed, DefaultEmbedding, etc.)
↓
工厂注册层 (EmbeddingModel Dictionary)
↓
服务管理层 (LLMBundle, TenantLLMService)
↓
任务执行层 (TaskExecutor)
支持的 Embedding 提供商
RAGFlow 支持以下 embedding 提供商:
EmbeddingModel = {
"BAAI": DefaultEmbedding, # 默认本地模型 (bge-large-zh-v1.5)
"OpenAI": OpenAIEmbed, # OpenAI text-embedding
"Azure-OpenAI": AzureEmbed, # Azure OpenAI
"Tongyi-Qianwen": QWenEmbed, # 阿里通义千问
"ZHIPU-AI": ZhipuEmbed, # 智谱 AI
"Ollama": OllamaEmbed, # Ollama 本地服务
"LocalAI": LocalAIEmbed, # LocalAI
"Xinference": XinferenceEmbed, # Xinference
"FastEmbed": FastEmbed, # FastEmbed 优化版本
"Youdao": YoudaoEmbed, # 有道 BCE embedding
"BaiChuan": BaiChuanEmbed, # 百川智能
"Jina": JinaEmbed, # Jina AI
"Mistral": MistralEmbed, # Mistral
"Bedrock": BedrockEmbed, # AWS Bedrock
"Gemini": GeminiEmbed, # Google Gemini
"NVIDIA": NvidiaEmbed, # NVIDIA
"LM-Studio": LmStudioEmbed, # LM Studio
"OpenAI-API-Compatible": OpenAI_APIEmbed, # 兼容 OpenAI API
"Cohere": CoHereEmbed, # Cohere
"TogetherAI": TogetherAIEmbed, # Together AI
"PerfXCloud": PerfXCloudEmbed, # PerfXCloud
"Upstage": UpstageEmbed, # Upstage
"SILICONFLOW": SILICONFLOWEmbed, # 硅基流动
"Replicate": ReplicateEmbed, # Replicate
"BaiduYiyan": BaiduYiyanEmbed, # 百度文心一言
"Voyage AI": VoyageEmbed, # Voyage AI
"HuggingFace": HuggingFaceEmbed, # HuggingFace
"VolcEngine": VolcEngineEmbed, # 火山引擎
}
核心实现
1. 统一接口设计
所有 embedding 模型都继承自 Base
抽象基类:
class Base(ABC):
def __init__(self, key, model_name):
pass
def encode(self, texts: list):
"""批量文本向量化
Args:
texts: 文本列表
Returns:
tuple: (向量数组, token数量)
"""
raise NotImplementedError("Please implement encode method!")
def encode_queries(self, text: str):
"""单个查询文本向量化
Args:
text: 查询文本
Returns:
tuple: (向量数组, token数量)
"""
raise NotImplementedError("Please implement encode method!")
2. 默认 Embedding 模型实现
DefaultEmbedding 类是 RAGFlow 的默认本地 embedding 实现:
class DefaultEmbedding(Base):
_model = None
_model_name = ""
_model_lock = threading.Lock()
def __init__(self, key, model_name, **kwargs):
if not settings.LIGHTEN:
with DefaultEmbedding._model_lock:
from FlagEmbedding import FlagModel
import torch
if not DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
try:
# 使用本地模型
DefaultEmbedding._model = FlagModel(
model_path,
query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
use_fp16=torch.cuda.is_available()
)
DefaultEmbedding._model_name = model_name
except Exception:
# 自动从 HuggingFace 下载 BAAI/bge-large-zh-v1.5
model_dir = snapshot_download(
repo_id="BAAI/bge-large-zh-v1.5",
local_dir=local_path,
local_dir_use_symlinks=False
)
DefaultEmbedding._model = FlagModel(model_dir, ...)
def encode(self, texts: list):
batch_size = 16
texts = [truncate(t, 2048) for t in texts] # 文本截断
token_count = sum(num_tokens_from_string(t) for t in texts)
ress = []
for i in range(0, len(texts), batch_size):
ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
return np.array(ress), token_count
def encode_queries(self, text: str):
token_count = num_tokens_from_string(text)
return self._model.encode_queries([text]).tolist()[0], token_count
3. OpenAI Embedding 实现示例
class OpenAIEmbed(Base):
def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
if not base_url:
base_url = "https://api.openai.com/v1"
self.client = OpenAI(api_key=key, base_url=base_url)
self.model_name = model_name
def encode(self, texts: list):
batch_size = 16 # OpenAI 要求批量大小 ≤16
texts = [truncate(t, 8191) for t in texts] # 文本长度限制
ress = []
total_tokens = 0
for i in range(0, len(texts), batch_size):
res = self.client.embeddings.create(
input=texts[i:i + batch_size],
model=self.model_name
)
ress.extend([d.embedding for d in res.data])
total_tokens += res.usage.total_tokens
return np.array(ress), total_tokens
def encode_queries(self, text):
res = self.client.embeddings.create(
input=[truncate(text, 8191)],
model=self.model_name
)
return np.array(res.data[0].embedding), res.usage.total_tokens
Embedding 处理流程
文档向量化流程
在文档解析和切块后,系统会对文档进行向量化处理:
def embedding(docs, mdl, parser_config=None, callback=None):
"""文档 embedding 处理函数
Args:
docs: 文档chunks列表
mdl: embedding模型实例
parser_config: 解析配置
callback: 回调函数
"""
if parser_config is None:
parser_config = {}
# 1. 提取标题和内容
tts = [d["title_tks"] for d in docs if d.get("title_tks")]
cnts = [d["content_with_weight"] for d in docs]
# 2. 配置参数
batch_size = parser_config.get("embedding_batch_size", 32)
vctr_nm = f"q_{vector_size}_vec"
title_vctr_nm = f"q_{vector_size}_title_vec"
# 3. 批量处理标题向量化
tts_ = np.array([])
if tts:
for i in range(0, len(tts), batch_size):
vts, c = mdl.encode(tts[i: i + batch_size]) # 核心调用
if len(tts_) == 0:
tts_ = vts
else:
tts_ = np.concatenate((tts_, vts), axis=0)
if callback:
callback(msg="Embedding titles...")
# 4. 批量处理内容向量化
cnts_ = np.array([])
for i in range(0, len(cnts), batch_size):
vts, c = mdl.encode(cnts[i: i + batch_size]) # 核心调用
if len(cnts_) == 0:
cnts_ = vts
else:
cnts_ = np.concatenate((cnts_, vts), axis=0)
if callback:
callback(msg=f"Embedding contents({i + batch_size}/{len(cnts)})...")
# 5. 向量存储到文档
for i, d in enumerate(docs):
if i < len(cnts_):
d[vctr_nm] = cnts_[i].tolist()
if i < len(tts_):
d[title_vctr_nm] = tts_[i].tolist()
return docs
模型实例化机制
LLMBundle 获取模型实例:
@classmethod
def model_instance(cls, tenant_id, llm_type, llm_name=None, lang="Chinese"):
"""获取模型实例
Args:
tenant_id: 租户ID
llm_type: 模型类型 (EMBEDDING, CHAT, etc.)
llm_name: 模型名称
lang: 语言设置
Returns:
模型实例
"""
# 1. 获取租户配置
e, tenant = TenantService.get_by_id(tenant_id)
if not e:
raise LookupError("Tenant not found")
# 2. 确定模型名称
if llm_type == LLMType.EMBEDDING.value:
mdlnm = tenant.embd_id if not llm_name else llm_name
elif llm_type == LLMType.SPEECH2TEXT.value:
mdlnm = tenant.asr_id
elif llm_type == LLMType.IMAGE2TEXT.value:
mdlnm = tenant.img2txt_id if not llm_name else llm_name
else:
mdlnm = tenant.llm_id if not llm_name else llm_name
# 3. 获取 API 配置
llm = TenantLLMService.get_api_key(tenant_id, mdlnm)
if not llm:
raise LookupError("LLM not found")
# 4. 解析模型名称和工厂
mdl_nm, fctry = TenantLLMService.split_model_name_and_factory(mdlnm)
if fctry:
mdl_nm = mdlnm.replace(f"@{fctry}", "")
else:
fctry = llm.llm_factory
# 5. 实例化具体的 embedding 模型
if llm_type == LLMType.EMBEDDING.value:
mdl_class = EmbeddingModel.get(fctry)
if not mdl_class:
raise LookupError(f"Embedding model factory {fctry} not found")
return mdl_class(
key=llm.api_key,
model_name=mdl_nm,
base_url=llm.api_base
)
高级特性
1. 性能优化
批量处理:
# 支持批量向量化,提升处理效率
batch_size = parser_config.get("embedding_batch_size", 32)
for i in range(0, len(texts), batch_size):
vectors, tokens = model.encode(texts[i:i + batch_size])
线程安全:
# 使用锁机制确保模型加载的线程安全
_model_lock = threading.Lock()
with DefaultEmbedding._model_lock:
# 模型初始化代码
单例模式:
# 避免重复加载模型,节省内存
class DefaultEmbedding(Base):
_model = None # 类级别共享模型实例
_model_name = ""
2. 自动化特性
模型自动下载:
try:
# 尝试使用本地模型
model = FlagModel(local_path)
except Exception:
# 自动从 HuggingFace 下载
model_dir = snapshot_download(
repo_id="BAAI/bge-large-zh-v1.5",
local_dir=local_path,
local_dir_use_symlinks=False
)
model = FlagModel(model_dir)
错误重试机制:
for i in range(100000): # 重试机制
try:
outputs = self.predictor.run(None, input_dict)
break
except Exception as e:
if i >= 3:
raise e
time.sleep(5)
3. 文本处理优化
智能截断:
# 根据不同模型的限制进行文本截断
texts = [truncate(t, 2048) for t in texts] # BAAI 模型
texts = [truncate(t, 8191) for t in texts] # OpenAI 模型
Token 计数:
# 精确的 token 使用量统计
token_count = sum(num_tokens_from_string(t) for t in texts)
return vectors, token_count
向量存储集成
Elasticsearch 集成
RAGFlow 默认使用 Elasticsearch 作为向量存储引擎:
# 向量字段映射配置
"q_768_vec": {
"type": "dense_vector",
"dims": 768,
"index": true,
"similarity": "cosine"
}
Infinity 集成
可选的高性能向量数据库:
# 切换到 Infinity
DOC_ENGINE=infinity # 在 docker/.env 中配置
混合检索支持
支持文本检索和向量检索的融合:
class FusionExpr:
"""融合检索表达式"""
def __init__(self, text_expr, dense_expr, fusion_type="rrf"):
self.text_expr = text_expr # 文本检索
self.dense_expr = dense_expr # 向量检索
self.fusion_type = fusion_type # 融合方式
使用示例
基本用法
from rag.llm import EmbeddingModel
from api.db.services.llm_service import LLMBundle
from api.db import LLMType
# 1. 获取 embedding 模型实例
embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING)
# 2. 批量向量化文本
texts = ["这是一段测试文本", "另一段文本", "第三段文本"]
vectors, token_count = embd_mdl.encode(texts)
print(f"向量维度: {vectors.shape}") # (3, 1024)
print(f"使用Token: {token_count}")
# 3. 查询向量化
query = "查询文本"
query_vector, tokens = embd_mdl.encode_queries(query)
print(f"查询向量: {query_vector.shape}") # (1024,)
自定义配置
# 使用特定的 embedding 提供商
from rag.llm.embedding_model import OpenAIEmbed
# 实例化 OpenAI embedding 模型
openai_embed = OpenAIEmbed(
key="your-api-key",
model_name="text-embedding-3-large",
base_url="https://api.openai.com/v1"
)
# 向量化处理
vectors, tokens = openai_embed.encode(["test text"])
在文档处理中的应用
# 在文档解析流程中使用
def process_documents(docs, tenant_id):
# 1. 获取 embedding 模型
embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING)
# 2. 配置参数
parser_config = {
"embedding_batch_size": 32,
"chunk_token_count": 128
}
# 3. 执行 embedding
embedded_docs = embedding(docs, embd_mdl, parser_config)
# 4. 存储到向量数据库
for doc in embedded_docs:
# doc 包含向量字段: q_1024_vec, q_1024_title_vec
store_to_database(doc)
配置管理
租户级配置
每个租户可以独立配置 embedding 模型:
# 租户 embedding 配置
tenant.embd_id = "text-embedding-3-large@OpenAI"
tenant.save()
API 密钥管理
安全的 API 密钥存储和管理:
# 配置 API 密钥
TenantLLMService.save(
tenant_id=tenant_id,
llm_factory="OpenAI",
llm_name="text-embedding-3-large",
api_key="your-api-key",
api_base="https://api.openai.com/v1"
)
环境变量配置
支持通过环境变量配置:
# HuggingFace 镜像站点
export HF_ENDPOINT=https://hf-mirror.com
# 轻量化模式(不加载本地模型)
export LIGHTEN=1
监控和调试
性能监控
# Token 使用统计
logger.info(f"Embedding tokens used: {token_count}")
# 处理时间监控
start_time = timer()
vectors, tokens = model.encode(texts)
elapsed = timer() - start_time
logger.info(f"Embedding time: {elapsed:.2f}s")
错误处理
try:
vectors, tokens = model.encode(texts)
except Exception as e:
logger.error(f"Embedding failed: {str(e)}")
# 降级处理或重试逻辑
扩展指南
添加新的 Embedding 提供商
- 创建模型类:
class NewProviderEmbed(Base):
def __init__(self, key, model_name, **kwargs):
# 初始化逻辑
pass
def encode(self, texts: list):
# 实现批量向量化
return vectors, token_count
def encode_queries(self, text: str):
# 实现查询向量化
return vector, token_count
- 注册到工厂:
# 在 rag/llm/__init__.py 中添加
EmbeddingModel = {
# ... 现有模型
"NewProvider": NewProviderEmbed,
}
- 更新配置:
# 在 conf/llm_factories.json 中添加提供商信息
自定义向量化逻辑
def custom_embedding_processor(docs, model, config):
"""自定义 embedding 处理逻辑"""
# 预处理
processed_texts = preprocess_texts([d["content"] for d in docs])
# 向量化
vectors, tokens = model.encode(processed_texts)
# 后处理
vectors = postprocess_vectors(vectors)
# 存储
for i, doc in enumerate(docs):
doc["custom_vector"] = vectors[i].tolist()
return docs
总结
RAGFlow 的 embedding 实现机制具有以下核心优势:
- 多提供商支持:统一接口支持 20+ 种 embedding 服务,满足不同场景需求
- 灵活配置:租户级别的模型配置和 API 密钥管理
- 性能优化:批量处理、缓存机制、线程安全,确保高效运行
- 易于扩展:标准化接口设计,便于添加新的 embedding 提供商
- 完整集成:与文档处理、向量存储、检索等模块深度集成
- 自动化特性:模型自动下载、错误重试、智能降级
- 监控支持:完整的日志记录、性能监控、错误处理机制
这种设计使得 RAGFlow 能够灵活适应不同的部署环境和业务需求,既支持本地部署的开源模型,也支持各种云端 API 服务,为构建高质量的 RAG 系统提供了坚实的基础。
Copyright © 2017 - 2025 boboidea.com All Rights Reserved 波波创意软件工作室 版权所有 【转载请注明出处】