RAG知识库问答系统实战案例:从需求分析到上线部署全流程

RAG知识库问答系统实战案例,从需求分析到上线部署全流程

正文

一、项目背景与需求

先说说这个项目怎么来的。

我们公司是做SaaS产品的,客服团队每天要处理大量重复问题,什么”怎么重置密码””退款政策是什么””如何升级套餐”之类的,占了工单总量的60%以上。客服同事累得要命,用户等待时间也长。

老板提了个需求:能不能做个智能问答机器人,能自动回答这些常见问题?

技术选型的时候,考虑到几点:

  1. 准确率要够高:答错了比不答更糟糕,必须有据可查
  2. 能对接私有知识库:产品文档、FAQ都是内部资料,通用大模型不知道
  3. 响应要快:用户等太久体验差
  4. 后期好维护:知识库会更新,不能每次都改代码

综合考虑,选了RAG架构。

RAG项目技术架构图,Milvus向量库Docker部署方案示意

二、技术方案设计

2.1 整体架构

plaintext

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   用户问题   │────▶│   API服务    │────▶│   大模型    │
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │
                     ┌─────▼─────┐     ┌─────────────┐
                     │  检索模块  │◀───▶│  向量数据库  │
                     └───────────┘     └─────────────┘
                           │
                     ┌─────▼─────┐
                     │  文档处理  │◀───▶│  文件存储   │
                     └───────────┘

核心流程:

  1. 用户提问 → 转成向量
  2. 向量检索 → 找到最相关的知识块
  3. 拼成prompt → 调用大模型
  4. 返回答案 → 附带参考来源

2.2 技术栈选择

组件技术选型选型理由
后端框架FastAPI高性能、支持异步、文档自动生成
向量数据库Milvus支持分布式、亿级向量毫秒级检索
Embeddingtext2vec-base-chinese中文效果好,开源免费
大模型通义千问中文能力强,国内合规
文档处理LangChain生态完善,社区活跃
前端Vue3 + Element Plus内部使用,简单够用

2.3 性能指标目标

  • 问答响应时间:P95 < 3秒
  • 检索准确率:> 85%(人工评测)
  • 支持并发:100 QPS
  • 知识库规模:10万条文档

三、环境准备与项目结构

3.1 目录结构

bash

rag-knowledge-base/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI入口
│   ├── api/
│   │   ├── __init__.py
│   │   ├── chat.py          # 问答接口
│   │   └── knowledge.py     # 知识库管理接口
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py        # 配置管理
│   │   └── security.py      # 安全相关
│   ├── services/
│   │   ├── __init__.py
│   │   ├── embedding.py     # Embedding服务
│   │   ├── retrieval.py     # 检索服务
│   │   └── llm.py            # 大模型服务
│   └── models/
│       ├── __init__.py
│       └── schemas.py       # Pydantic模型
├── scripts/
│   ├── ingest.py            # 文档入库脚本
│   └── test_query.py        # 测试脚本
├── knowledge_base/          # 知识库源文件
│   ├── faq/
│   ├── product_docs/
│   └── policy/
├── vector_db/               # 向量数据库数据
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

3.2 依赖安装

bash

# requirements.txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.3
langchain==0.1.4
langchain-community==0.0.16
pymilvus==2.3.4
sentence-transformers==2.3.1
tiktoken==0.5.2
python-multipart==0.0.6
python-dotenv==1.0.0

bash

pip install -r requirements.txt

四、核心代码实现

4.1 配置管理

python

# app/core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    """应用配置"""
    
    # 服务配置
    APP_NAME: str = "RAG知识库问答系统"
    DEBUG: bool = False
    
    # Milvus配置
    MILVUS_HOST: str = "localhost"
    MILVUS_PORT: int = 19530
    COLLECTION_NAME: str = "knowledge_base"
    VECTOR_DIM: int = 768  # text2vec-base-chinese输出维度
    
    # Embedding配置
    EMBEDDING_MODEL: str = "shibing624/text2vec-base-chinese"
    BATCH_SIZE: int = 32
    
    # LLM配置
    LLM_API_KEY: str = ""
    LLM_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    LLM_MODEL: str = "qwen-turbo"
    LLM_TEMPERATURE: float = 0.3
    MAX_TOKENS: int = 1000
    
    # 检索配置
    TOP_K: int = 3
    SCORE_THRESHOLD: float = 0.5
    
    class Config:
        env_file = ".env"
        case_sensitive = True

@lru_cache()
def get_settings():
    return Settings()

4.2 Embedding服务

python

# app/services/embedding.py
from sentence_transformers import SentenceTransformer
from app.core.config import get_settings
from typing import List
import numpy as np

class EmbeddingService:
    def __init__(self):
        settings = get_settings()
        self.model = SentenceTransformer(settings.EMBEDDING_MODEL)
        self.batch_size = settings.BATCH_SIZE
    
    def encode(self, texts: List[str]) -> List[List[float]]:
        """批量获取文本向量"""
        embeddings = self.model.encode(
            texts,
            batch_size=self.batch_size,
            show_progress_bar=False
        )
        return embeddings.tolist()
    
    def encode_single(self, text: str) -> List[float]:
        """获取单条文本向量"""
        embedding = self.model.encode([text])[0]
        return embedding.tolist()

# 全局单例
embedding_service = EmbeddingService()

4.3 向量检索服务

python

# app/services/retrieval.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from app.core.config import get_settings
from app.services.embedding import embedding_service
from typing import List, Dict

class RetrievalService:
    def __init__(self):
        self.settings = get_settings()
        self.collection = None
        self._connect()
    
    def _connect(self):
        """连接Milvus"""
        connections.connect(
            host=self.settings.MILVUS_HOST,
            port=self.settings.MILVUS_PORT,
            alias="default"
        )
        self._init_collection()
    
    def _init_collection(self):
        """初始化集合"""
        collection_name = self.settings.COLLECTION_NAME
        
        if utility.has_collection(collection_name):
            self.collection = Collection(collection_name)
            self.collection.load()
        else:
            # 创建新集合
            fields = [
                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
                FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
                FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.settings.VECTOR_DIM)
            ]
            schema = CollectionSchema(fields=fields, description="知识库向量集合")
            self.collection = Collection(name=collection_name, schema=schema)
            
            # 创建索引
            index_params = {
                "index_type": "IVF_FLAT",
                "metric_type": "L2",
                "params": {"nlist": 128}
            }
            self.collection.create_index(field_name="vector", index_params=index_params)
            self.collection.load()
    
    def search(self, query: str, top_k: int = None) -> List[Dict]:
        """检索最相关的文档"""
        if top_k is None:
            top_k = self.settings.TOP_K
        
        # 1. 将问题向量化
        query_vector = embedding_service.encode_single(query)
        
        # 2. 执行搜索
        search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
        
        results = self.collection.search(
            data=[query_vector],
            anns_field="vector",
            param=search_params,
            limit=top_k,
            output_fields=["text", "source"]
        )
        
        # 3. 整理结果
        retrieved_docs = []
        for hits in results:
            for hit in hits:
                # 过滤低分结果
                if hit.distance < self.settings.SCORE_THRESHOLD:
                    continue
                retrieved_docs.append({
                    "text": hit.entity.get("text"),
                    "source": hit.entity.get("source"),
                    "score": float(hit.distance)
                })
        
        return retrieved_docs

# 全局单例
retrieval_service = RetrievalService()

4.4 大模型服务

python

# app/services/llm.py
import openai
from app.core.config import get_settings
from typing import List, Dict

class LLMService:
    def __init__(self):
        settings = get_settings()
        self.client = openai.OpenAI(
            api_key=settings.LLM_API_KEY,
            base_url=settings.LLM_BASE_URL
        )
        self.model = settings.LLM_MODEL
        self.temperature = settings.LLM_TEMPERATURE
        self.max_tokens = settings.MAX_TOKENS
    
    def generate(self, query: str, context: List[Dict]) -> Dict:
        """生成回答"""
        # 构建prompt
        system_prompt = """你是一个智能客服助手,擅长回答用户问题。
请基于以下参考信息回答用户问题。
要求:
1. 只根据参考信息回答,不要编造
2. 如果参考信息不足以回答,明确告知用户
3. 回答要简洁清晰
4. 如果涉及政策说明,要标注信息来源"""

        # 组装参考信息
        context_text = "\n\n".join([
            f"[来源{i+1}] {doc['text']}\n(文件:{doc['source']})"
            for i, doc in enumerate(context)
        ])
        
        user_prompt = f"""## 参考信息
{context_text}

## 用户问题
{query}
"""
        
        # 调用API
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
        
        answer = response.choices[0].message.content
        
        return {
            "answer": answer,
            "sources": [doc['source'] for doc in context]
        }

# 全局单例
llm_service = LLMService()

4.5 API接口

python

# app/api/chat.py
from fastapi import APIRouter, HTTPException
from app.models.schemas import ChatRequest, ChatResponse
from app.services.retrieval import retrieval_service
from app.services.llm import llm_service

router = APIRouter(prefix="/api/v1", tags=["问答"])

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """问答接口"""
    try:
        # 1. 检索相关文档
        retrieved_docs = retrieval_service.search(
            query=request.question,
            top_k=request.top_k
        )
        
        if not retrieved_docs:
            return ChatResponse(
                answer="抱歉,暂时没有找到相关信息,建议您联系人工客服获取帮助。",
                sources=[]
            )
        
        # 2. 生成回答
        result = llm_service.generate(
            query=request.question,
            context=retrieved_docs
        )
        
        return ChatResponse(
            answer=result["answer"],
            sources=result["sources"]
        )
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务异常:{str(e)}")

python

# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional

class ChatRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500, description="用户问题")
    top_k: Optional[int] = Field(3, ge=1, le=10, description="返回结果数量")

class ChatResponse(BaseModel):
    answer: str = Field(..., description="回答内容")
    sources: List[str] = Field(default_factory=list, description="参考来源")

4.6 主入口

python

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api import chat, knowledge
from app.core.config import get_settings

settings = get_settings()

app = FastAPI(
    title=settings.APP_NAME,
    version="1.0.0",
    description="基于RAG的企业知识库问答系统"
)

# CORS配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(chat.router)
app.include_router(knowledge.router)

@app.get("/health")
async def health_check():
    return {"status": "ok"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)

五、文档入库脚本

python

# scripts/ingest.py
import os
import json
from pathlib import Path
from langchain_community.document_loaders import (
    TextLoader, 
    UnstructuredMarkdownLoader,
    PyPDFLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from app.services.embedding import embedding_service
from app.services.retrieval import retrieval_service
from app.core.config import get_settings

def load_documents(directory: str):
    """加载目录下的所有文档"""
    docs = []
    path = Path(directory)
    
    for file_path in path.rglob("*"):
        if file_path.is_file() and file_path.suffix in ['.txt', '.md', '.pdf']:
            try:
                if file_path.suffix == '.pdf':
                    loader = PyPDFLoader(str(file_path))
                elif file_path.suffix == '.md':
                    loader = UnstructuredMarkdownLoader(str(file_path))
                else:
                    loader = TextLoader(str(file_path), encoding='utf-8')
                
                docs.extend(loader.load())
                print(f"加载成功: {file_path.name}")
            except Exception as e:
                print(f"加载失败: {file_path.name}, 错误: {e}")
    
    return docs

def split_documents(docs):
    """切分文档"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,
        chunk_overlap=50,
        length_function=len
    )
    return text_splitter.split_documents(docs)

def ingest_documents():
    """文档入库"""
    print("开始文档入库...")
    
    # 1. 加载文档
    print("\n步骤1: 加载文档")
    docs = load_documents("knowledge_base")
    print(f"共加载 {len(docs)} 个文档")
    
    # 2. 切分文档
    print("\n步骤2: 切分文档")
    chunks = split_documents(docs)
    print(f"切分后共 {len(chunks)} 个文本块")
    
    # 3. 向量化并入库
    print("\n步骤3: 向量化并入库")
    for i, chunk in enumerate(chunks):
        if i % 100 == 0:
            print(f"处理进度: {i}/{len(chunks)}")
        
        # 获取向量
        vector = embedding_service.encode_single(chunk.page_content)
        
        # 存入Milvus(此处省略具体代码,假设retrieval_service有add方法)
        retrieval_service.add(
            text=chunk.page_content,
            source=chunk.metadata.get('source', 'unknown'),
            vector=vector
        )
    
    print("\n文档入库完成!")

if __name__ == "__main__":
    ingest_documents()

六、Docker部署

6.1 docker-compose.yml

yaml

version: '3.8'

services:
  # FastAPI服务
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MILVUS_HOST=milvus
      - MILVUS_PORT=19530
      - LLM_API_KEY=${LLM_API_KEY}
    depends_on:
      - milvus
    restart: unless-stopped

  # Milvus向量数据库
  milvus:
    image: milvusdb/milvus:v2.3.3
    ports:
      - "19530:19530"
      - "9091:9091"
    volumes:
      - ./volumes/milvus:/var/lib/milvus
    environment:
      - ETCD_ENDPOINTS=etcd:2379
      - MINIO_ADDRESS=minio:9000
    depends_on:
      - etcd
      - minio
    restart: unless-stopped

  etcd:
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
    volumes:
      - ./volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    volumes:
      - ./volumes/minio:/minio_data
    command: minio server /minio_data
    restart: unless-stopped

6.2 Dockerfile

dockerfile

FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 下载Embedding模型
RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('shibing624/text2vec-base-chinese')"

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

6.3 启动服务

bash

# 复制环境变量模板
cp .env.example .env
# 编辑.env,填入API Key

# 启动所有服务
docker-compose up -d

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f api

服务启动后,访问 http://localhost:8000/docs 查看API文档。

七、上线后的调优

系统上线后,监控发现几个问题,做了相应调优:

7.1 响应时间优化

问题:P95响应时间超过5秒,不满足要求。

原因分析

  1. Milvus检索不稳定,部分查询耗时>2秒
  2. Embedding模型加载慢,每次请求都重新加载
  3. 网络IO阻塞

解决方案

python

# 1. 模型预加载
embedding_service = EmbeddingService()  # 服务启动时加载

# 2. Milvus连接池
connections.connect(host="milvus", port="19530", pool_size=10)

# 3. 异步处理
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # 并行执行检索和Embedding
    retrieved_docs = await asyncio.to_thread(
        retrieval_service.search, 
        query=request.question
    )
    ...

7.2 准确率提升

问题:部分问题检索不到相关内容。

解决方案

  1. 增加Query改写层
  2. 混合检索(向量+关键词)
  3. 调整chunk_size,找到最优值(我们测试后确定为300)

八、总结与经验

这个项目做完,有几点体会:

  1. RAG不是万能的:它解决的是”知识库+大模型”的问题,如果知识库本身质量差,RAG效果也好不了
  2. 数据准备比技术实现更重要:我们花在清洗文档、规范FAQ上的时间,占了整个项目的40%
  3. 监控很重要:上线前一定要埋点,记录检索结果、大模型输出、用户反馈,才能持续优化
  4. 渐进式迭代:第一版不用追求完美,先跑通流程,再根据实际反馈优化

以上就是完整的RAG项目实战经验,希望对你有帮助。

相关推荐:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注