RAGFlow Embedding 实现机制 @ 胡巴 | 星期五,八月 15 日,2025 年 | 6 分钟阅读 | 更新于 星期五,八月 15 日,2025 年

RAGFlow Embedding 实现机制

概述

RAGFlow 的 embedding 系统采用了多提供商统一接口的设计模式,支持 20+ 种不同的 embedding 提供商和模型,包括本地模型和云端 API 服务。该系统为 RAG(检索增强生成)提供了高质量的文本向量化能力。

系统架构

核心组件

主要文件位置

  • rag/llm/embedding_model.py - 核心实现文件,包含所有 embedding 模型类
  • rag/llm/__init__.py - 模型注册和工厂类定义
  • rag/svr/task_executor.py - embedding 处理逻辑和任务执行
  • api/db/services/llm_service.py - 模型服务和配置管理

设计模式

统一接口层 (Base ABC)
    ↓
多提供商实现层 (OpenAIEmbed, DefaultEmbedding, etc.)
    ↓
工厂注册层 (EmbeddingModel Dictionary)
    ↓
服务管理层 (LLMBundle, TenantLLMService)
    ↓
任务执行层 (TaskExecutor)

支持的 Embedding 提供商

RAGFlow 支持以下 embedding 提供商:

EmbeddingModel = {
    "BAAI": DefaultEmbedding,          # 默认本地模型 (bge-large-zh-v1.5)
    "OpenAI": OpenAIEmbed,             # OpenAI text-embedding
    "Azure-OpenAI": AzureEmbed,        # Azure OpenAI
    "Tongyi-Qianwen": QWenEmbed,       # 阿里通义千问
    "ZHIPU-AI": ZhipuEmbed,           # 智谱 AI
    "Ollama": OllamaEmbed,            # Ollama 本地服务
    "LocalAI": LocalAIEmbed,          # LocalAI
    "Xinference": XinferenceEmbed,    # Xinference
    "FastEmbed": FastEmbed,           # FastEmbed 优化版本
    "Youdao": YoudaoEmbed,           # 有道 BCE embedding
    "BaiChuan": BaiChuanEmbed,       # 百川智能
    "Jina": JinaEmbed,               # Jina AI
    "Mistral": MistralEmbed,         # Mistral
    "Bedrock": BedrockEmbed,         # AWS Bedrock
    "Gemini": GeminiEmbed,           # Google Gemini
    "NVIDIA": NvidiaEmbed,           # NVIDIA
    "LM-Studio": LmStudioEmbed,      # LM Studio
    "OpenAI-API-Compatible": OpenAI_APIEmbed,  # 兼容 OpenAI API
    "Cohere": CoHereEmbed,           # Cohere
    "TogetherAI": TogetherAIEmbed,   # Together AI
    "PerfXCloud": PerfXCloudEmbed,   # PerfXCloud
    "Upstage": UpstageEmbed,         # Upstage
    "SILICONFLOW": SILICONFLOWEmbed, # 硅基流动
    "Replicate": ReplicateEmbed,     # Replicate
    "BaiduYiyan": BaiduYiyanEmbed,   # 百度文心一言
    "Voyage AI": VoyageEmbed,        # Voyage AI
    "HuggingFace": HuggingFaceEmbed, # HuggingFace
    "VolcEngine": VolcEngineEmbed,   # 火山引擎
}

核心实现

1. 统一接口设计

所有 embedding 模型都继承自 Base 抽象基类:

class Base(ABC):
    def __init__(self, key, model_name):
        pass
    
    def encode(self, texts: list):
        """批量文本向量化
        Args:
            texts: 文本列表
        Returns:
            tuple: (向量数组, token数量)
        """
        raise NotImplementedError("Please implement encode method!")
    
    def encode_queries(self, text: str):
        """单个查询文本向量化
        Args:
            text: 查询文本
        Returns:
            tuple: (向量数组, token数量)
        """
        raise NotImplementedError("Please implement encode method!")

2. 默认 Embedding 模型实现

DefaultEmbedding 类是 RAGFlow 的默认本地 embedding 实现:

class DefaultEmbedding(Base):
    _model = None
    _model_name = ""
    _model_lock = threading.Lock()
    
    def __init__(self, key, model_name, **kwargs):
        if not settings.LIGHTEN:
            with DefaultEmbedding._model_lock:
                from FlagEmbedding import FlagModel
                import torch
                if not DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
                    try:
                        # 使用本地模型
                        DefaultEmbedding._model = FlagModel(
                            model_path,
                            query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                            use_fp16=torch.cuda.is_available()
                        )
                        DefaultEmbedding._model_name = model_name
                    except Exception:
                        # 自动从 HuggingFace 下载 BAAI/bge-large-zh-v1.5
                        model_dir = snapshot_download(
                            repo_id="BAAI/bge-large-zh-v1.5",
                            local_dir=local_path,
                            local_dir_use_symlinks=False
                        )
                        DefaultEmbedding._model = FlagModel(model_dir, ...)
    
    def encode(self, texts: list):
        batch_size = 16
        texts = [truncate(t, 2048) for t in texts]  # 文本截断
        token_count = sum(num_tokens_from_string(t) for t in texts)
        
        ress = []
        for i in range(0, len(texts), batch_size):
            ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
        return np.array(ress), token_count
    
    def encode_queries(self, text: str):
        token_count = num_tokens_from_string(text)
        return self._model.encode_queries([text]).tolist()[0], token_count

3. OpenAI Embedding 实现示例

class OpenAIEmbed(Base):
    def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
        if not base_url:
            base_url = "https://api.openai.com/v1"
        self.client = OpenAI(api_key=key, base_url=base_url)
        self.model_name = model_name
    
    def encode(self, texts: list):
        batch_size = 16  # OpenAI 要求批量大小 ≤16
        texts = [truncate(t, 8191) for t in texts]  # 文本长度限制
        ress = []
        total_tokens = 0
        
        for i in range(0, len(texts), batch_size):
            res = self.client.embeddings.create(
                input=texts[i:i + batch_size],
                model=self.model_name
            )
            ress.extend([d.embedding for d in res.data])
            total_tokens += res.usage.total_tokens
        
        return np.array(ress), total_tokens
    
    def encode_queries(self, text):
        res = self.client.embeddings.create(
            input=[truncate(text, 8191)],
            model=self.model_name
        )
        return np.array(res.data[0].embedding), res.usage.total_tokens

Embedding 处理流程

文档向量化流程

在文档解析和切块后,系统会对文档进行向量化处理:

def embedding(docs, mdl, parser_config=None, callback=None):
    """文档 embedding 处理函数
    Args:
        docs: 文档chunks列表
        mdl: embedding模型实例  
        parser_config: 解析配置
        callback: 回调函数
    """
    if parser_config is None:
        parser_config = {}
    
    # 1. 提取标题和内容
    tts = [d["title_tks"] for d in docs if d.get("title_tks")]
    cnts = [d["content_with_weight"] for d in docs]
    
    # 2. 配置参数
    batch_size = parser_config.get("embedding_batch_size", 32)
    vctr_nm = f"q_{vector_size}_vec"
    title_vctr_nm = f"q_{vector_size}_title_vec"
    
    # 3. 批量处理标题向量化
    tts_ = np.array([])
    if tts:
        for i in range(0, len(tts), batch_size):
            vts, c = mdl.encode(tts[i: i + batch_size])  # 核心调用
            if len(tts_) == 0:
                tts_ = vts
            else:
                tts_ = np.concatenate((tts_, vts), axis=0)
            if callback:
                callback(msg="Embedding titles...")
    
    # 4. 批量处理内容向量化
    cnts_ = np.array([])
    for i in range(0, len(cnts), batch_size):
        vts, c = mdl.encode(cnts[i: i + batch_size])  # 核心调用
        if len(cnts_) == 0:
            cnts_ = vts
        else:
            cnts_ = np.concatenate((cnts_, vts), axis=0)
        if callback:
            callback(msg=f"Embedding contents({i + batch_size}/{len(cnts)})...")
    
    # 5. 向量存储到文档
    for i, d in enumerate(docs):
        if i < len(cnts_):
            d[vctr_nm] = cnts_[i].tolist()
        if i < len(tts_):
            d[title_vctr_nm] = tts_[i].tolist()
    
    return docs

模型实例化机制

LLMBundle 获取模型实例

@classmethod
def model_instance(cls, tenant_id, llm_type, llm_name=None, lang="Chinese"):
    """获取模型实例
    Args:
        tenant_id: 租户ID
        llm_type: 模型类型 (EMBEDDING, CHAT, etc.)
        llm_name: 模型名称
        lang: 语言设置
    Returns:
        模型实例
    """
    # 1. 获取租户配置
    e, tenant = TenantService.get_by_id(tenant_id)
    if not e:
        raise LookupError("Tenant not found")
    
    # 2. 确定模型名称
    if llm_type == LLMType.EMBEDDING.value:
        mdlnm = tenant.embd_id if not llm_name else llm_name
    elif llm_type == LLMType.SPEECH2TEXT.value:
        mdlnm = tenant.asr_id
    elif llm_type == LLMType.IMAGE2TEXT.value:
        mdlnm = tenant.img2txt_id if not llm_name else llm_name
    else:
        mdlnm = tenant.llm_id if not llm_name else llm_name
    
    # 3. 获取 API 配置
    llm = TenantLLMService.get_api_key(tenant_id, mdlnm)
    if not llm:
        raise LookupError("LLM not found")
    
    # 4. 解析模型名称和工厂
    mdl_nm, fctry = TenantLLMService.split_model_name_and_factory(mdlnm)
    if fctry:
        mdl_nm = mdlnm.replace(f"@{fctry}", "")
    else:
        fctry = llm.llm_factory
    
    # 5. 实例化具体的 embedding 模型
    if llm_type == LLMType.EMBEDDING.value:
        mdl_class = EmbeddingModel.get(fctry)
        if not mdl_class:
            raise LookupError(f"Embedding model factory {fctry} not found")
        
        return mdl_class(
            key=llm.api_key,
            model_name=mdl_nm,
            base_url=llm.api_base
        )

高级特性

1. 性能优化

批量处理

# 支持批量向量化,提升处理效率
batch_size = parser_config.get("embedding_batch_size", 32)
for i in range(0, len(texts), batch_size):
    vectors, tokens = model.encode(texts[i:i + batch_size])

线程安全

# 使用锁机制确保模型加载的线程安全
_model_lock = threading.Lock()
with DefaultEmbedding._model_lock:
    # 模型初始化代码

单例模式

# 避免重复加载模型,节省内存
class DefaultEmbedding(Base):
    _model = None  # 类级别共享模型实例
    _model_name = ""

2. 自动化特性

模型自动下载

try:
    # 尝试使用本地模型
    model = FlagModel(local_path)
except Exception:
    # 自动从 HuggingFace 下载
    model_dir = snapshot_download(
        repo_id="BAAI/bge-large-zh-v1.5",
        local_dir=local_path,
        local_dir_use_symlinks=False
    )
    model = FlagModel(model_dir)

错误重试机制

for i in range(100000):  # 重试机制
    try:
        outputs = self.predictor.run(None, input_dict)
        break
    except Exception as e:
        if i >= 3:
            raise e
        time.sleep(5)

3. 文本处理优化

智能截断

# 根据不同模型的限制进行文本截断
texts = [truncate(t, 2048) for t in texts]  # BAAI 模型
texts = [truncate(t, 8191) for t in texts]  # OpenAI 模型

Token 计数

# 精确的 token 使用量统计
token_count = sum(num_tokens_from_string(t) for t in texts)
return vectors, token_count

向量存储集成

Elasticsearch 集成

RAGFlow 默认使用 Elasticsearch 作为向量存储引擎:

# 向量字段映射配置
"q_768_vec": {
    "type": "dense_vector",
    "dims": 768,
    "index": true,
    "similarity": "cosine"
}

Infinity 集成

可选的高性能向量数据库:

# 切换到 Infinity
DOC_ENGINE=infinity  # 在 docker/.env 中配置

混合检索支持

支持文本检索和向量检索的融合:

class FusionExpr:
    """融合检索表达式"""
    def __init__(self, text_expr, dense_expr, fusion_type="rrf"):
        self.text_expr = text_expr      # 文本检索
        self.dense_expr = dense_expr    # 向量检索  
        self.fusion_type = fusion_type  # 融合方式

使用示例

基本用法

from rag.llm import EmbeddingModel
from api.db.services.llm_service import LLMBundle
from api.db import LLMType

# 1. 获取 embedding 模型实例
embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING)

# 2. 批量向量化文本
texts = ["这是一段测试文本", "另一段文本", "第三段文本"]
vectors, token_count = embd_mdl.encode(texts)

print(f"向量维度: {vectors.shape}")  # (3, 1024)
print(f"使用Token: {token_count}")

# 3. 查询向量化
query = "查询文本"
query_vector, tokens = embd_mdl.encode_queries(query)
print(f"查询向量: {query_vector.shape}")  # (1024,)

自定义配置

# 使用特定的 embedding 提供商
from rag.llm.embedding_model import OpenAIEmbed

# 实例化 OpenAI embedding 模型
openai_embed = OpenAIEmbed(
    key="your-api-key",
    model_name="text-embedding-3-large",
    base_url="https://api.openai.com/v1"
)

# 向量化处理
vectors, tokens = openai_embed.encode(["test text"])

在文档处理中的应用

# 在文档解析流程中使用
def process_documents(docs, tenant_id):
    # 1. 获取 embedding 模型
    embd_mdl = LLMBundle(tenant_id, LLMType.EMBEDDING)
    
    # 2. 配置参数
    parser_config = {
        "embedding_batch_size": 32,
        "chunk_token_count": 128
    }
    
    # 3. 执行 embedding
    embedded_docs = embedding(docs, embd_mdl, parser_config)
    
    # 4. 存储到向量数据库
    for doc in embedded_docs:
        # doc 包含向量字段: q_1024_vec, q_1024_title_vec
        store_to_database(doc)

配置管理

租户级配置

每个租户可以独立配置 embedding 模型:

# 租户 embedding 配置
tenant.embd_id = "text-embedding-3-large@OpenAI"
tenant.save()

API 密钥管理

安全的 API 密钥存储和管理:

# 配置 API 密钥
TenantLLMService.save(
    tenant_id=tenant_id,
    llm_factory="OpenAI",
    llm_name="text-embedding-3-large",
    api_key="your-api-key",
    api_base="https://api.openai.com/v1"
)

环境变量配置

支持通过环境变量配置:

# HuggingFace 镜像站点
export HF_ENDPOINT=https://hf-mirror.com

# 轻量化模式(不加载本地模型)
export LIGHTEN=1

监控和调试

性能监控

# Token 使用统计
logger.info(f"Embedding tokens used: {token_count}")

# 处理时间监控
start_time = timer()
vectors, tokens = model.encode(texts)
elapsed = timer() - start_time
logger.info(f"Embedding time: {elapsed:.2f}s")

错误处理

try:
    vectors, tokens = model.encode(texts)
except Exception as e:
    logger.error(f"Embedding failed: {str(e)}")
    # 降级处理或重试逻辑

扩展指南

添加新的 Embedding 提供商

  1. 创建模型类
class NewProviderEmbed(Base):
    def __init__(self, key, model_name, **kwargs):
        # 初始化逻辑
        pass
    
    def encode(self, texts: list):
        # 实现批量向量化
        return vectors, token_count
    
    def encode_queries(self, text: str):
        # 实现查询向量化
        return vector, token_count
  1. 注册到工厂
# 在 rag/llm/__init__.py 中添加
EmbeddingModel = {
    # ... 现有模型
    "NewProvider": NewProviderEmbed,
}
  1. 更新配置
# 在 conf/llm_factories.json 中添加提供商信息

自定义向量化逻辑

def custom_embedding_processor(docs, model, config):
    """自定义 embedding 处理逻辑"""
    # 预处理
    processed_texts = preprocess_texts([d["content"] for d in docs])
    
    # 向量化
    vectors, tokens = model.encode(processed_texts)
    
    # 后处理
    vectors = postprocess_vectors(vectors)
    
    # 存储
    for i, doc in enumerate(docs):
        doc["custom_vector"] = vectors[i].tolist()
    
    return docs

总结

RAGFlow 的 embedding 实现机制具有以下核心优势:

  1. 多提供商支持:统一接口支持 20+ 种 embedding 服务,满足不同场景需求
  2. 灵活配置:租户级别的模型配置和 API 密钥管理
  3. 性能优化:批量处理、缓存机制、线程安全,确保高效运行
  4. 易于扩展:标准化接口设计,便于添加新的 embedding 提供商
  5. 完整集成:与文档处理、向量存储、检索等模块深度集成
  6. 自动化特性:模型自动下载、错误重试、智能降级
  7. 监控支持:完整的日志记录、性能监控、错误处理机制

这种设计使得 RAGFlow 能够灵活适应不同的部署环境和业务需求,既支持本地部署的开源模型,也支持各种云端 API 服务,为构建高质量的 RAG 系统提供了坚实的基础。

alt 搜索公众号:无限递归

Copyright © 2017 - 2025 boboidea.com All Rights Reserved 波波创意软件工作室 版权所有 【转载请注明出处】

avatar

BoBo`s Blog每天进步一点点,能多一点是一点

appdata apt-get bloomfilter channel Chatbot ChatGPT Chrome chsh ClickHouse Context css csv CUDA Cursor DaDa英语 Deepseek defer df docker elasticsearch embedding error ffmpeg fix-missing form gif git GitLab globalproject golang hosts HTTP HTTPS iconv IDE Interface iota Kafka LangChain libssl LLM ln mac mac系统更新 Map MCP MetaMCP mkdir MSYS2 mysql n8n nginx OCR oh-my-zsh Ollama openconnect openssl PAM permission php pip PowerShell puppeteer python rabbitmq RAGFlow redis reflect rsync SD sed shell Slice snowflake space SQL SSH struct syntax_err tensorflow ubuntu ue4 unauthorized unreal4 UV vim virtualbox vpn VSCode Windows x86_64 xcode-select YCM zookeeper zsh 上海积分 主从复制 事务 二进制安全 交叉熵 人力资源 代码工具 代码编辑助手 代码评审 以太坊 信息论 全民哀悼 内存管理 内容创作 分屏 分支删除 加密货币 区块链 匿名函数 协作系统 协议设计 启动盘 品种 图片转视频 均线 夏天 夏季 实用技巧 密码修改 工作流 工作流触发 工具 工具管理 布局识别 开发工具 开发环境 循环 微信公众号 批处理 批量处理 批量替换 批量重命名 挖矿 接收器 效率工具 教程 数据分析 数据合并 数据处理 数据查询 数据类型 数据结构 数据聚合 数据转换 文件同步 文件管理 文本向量化 文本向量检索 文本识别 日志切分 智能体 智能合约 替代方案 本地部署 概率论 比特币 水果 治疗 流量分析 浏览器调试 消息队列 游侠源码网 版本控制 狗狗币 生活 用户行为 电视 症状 磁盘清理 笔记本技巧 系统配置 编程语言 编译PHP 编辑器 网站统计 网络 自动化 自动化工作流 自动化工具 自动提交 自动签到 节点 获取方式 虚拟机 西瓜 记忆 购买指南 跨平台 软连接 运维技巧 闭包 集成 雪花算法 零代码 面瘫 鞋子 项目无法编译
基本信息
  • 姓名:bobo
  • 花名:胡巴
  • 性别:男
  • 血型:O型
  • 星座:白羊座

联系方式

  • 所在地:上海
  • QQ:279250819
  • 微信号:wanghuiwoshinideyou
  • 电子邮件:279250819@qq.com

博客地址

公众号

alt 无限递归

工作经历
  • 2022.5 - 2025-08-08

    • 公司:乐府互娱
    • 职位:高级平台服务器开发工程师
    • 荣誉:
      • 得到公司Leader的高度认可
    • 所作所为:
      • 优化SDK接入流程,提升接入效率:通过重构SDK后端接入代码,提炼接入模板,显著提升接入时间,由7天缩短到4天
      • 参与公司两款游戏大推,保障SDK服务稳定:通过大推前代码持续review,大推前压测,发现问题并及时修复,制定告警机制,包括飞书及时告警及grafana监控告警及时发现服务问题并修复上线,保障大推期间服务稳定。同时能够和游戏研发、游戏运营、游戏运维团队保持紧密配合,保障大推期间服务稳定
      • 推动cursor在项目组内的应用,提升开发效率:通过AI工具分享,实战演示等,提升项目组内AI工具使用率,进而提升整个项目组开发效率,使IOS开发再也不惧怕Unity开发
      • 利用n8n搭建数据查询助手,提升财务工作效率:利用n8n+AI搭建了对账查询工作流,免去了财务与技术的沟通成本,至少使双方每月沟通时间成本减少2小时
      • 参与公司千目广告系统的开发及维护,提升广告系统稳定性,为公司发行买量业务保驾护航
  • 2019.6 - 2022.4

    • 公司:萌推(上海突进网络科技有限公司)
    • 职位:中级PHP工程师 & 初级golang工程师
    • 荣誉:
      • 绩效A连续得主
      • 月度之星
      • 优秀个人奖
    • 所作所为:
      • 利用ES优化OMS、MMS管理系统商品列表查询
      • 利用消息队列、Redis、乐观锁优化商品审核流程
      • 利用Redis对商家端接口进行有效限流
      • 优化商品相关表索引,提升SQL查询速度
      • 商品中台构建,统一商品相关操作
      • 大表优化(数据分离、分表、大字段拆分)
      • 掌握所有商品核心流程
  • 2018.5 - 2019.5

    • 公司:DaDa英语(上海卓赞教育信息科技有限公司)
    • 职位:中级PHP开发工程师
    • 荣誉:无
    • 所作所为:
      • 利用ES优化教师CMS系统统计数据接口至500ms内
      • 工单系统开发及持续优化
      • 教师CMS系统的功能开发及持续优化
  • 2018.3 - 2018.5

    • 公司:波奇(上海)信息科技有限公司
    • 职位:初级PHP开发工程师
    • 荣誉:同下
    • 所做作为:如下
  • 2016.7 - 2018.3

    • 公司:光橙(上海)信息科技有限公司
    • 职位:初级PHP开发工程师
    • 荣誉:
      • 年度最佳进步奖
    • 所作所为:
      • 利用Redis提升商详接口最佳响应速度至50ms内
      • 利用Redis提升双11活动页可承受QPS至500以上
      • 利用Redis对接口进行简单限流
      • 与小伙伴合作提升搜索质量(ES初识)
      • 其他C端接口的开发及优化
      • B端商城老页面的维护及优化
SKILLS

编程语言

  • PHP
  • Golang
  • Shell
  • JAVA
  • JS
  • HTML\CSS

数据库

  • MySQL
  • Redis
  • Clickhouse

消息中间件

  • RabbitMq
  • Kafka

文档撰写

  • Swagger
  • Markdown

技术框架

  • Laravel
  • gin

搜索引擎

  • ElasticSearch

抓包工具

  • Charles