正文
一、项目背景与需求
先说说这个项目怎么来的。
我们公司是做SaaS产品的,客服团队每天要处理大量重复问题,什么”怎么重置密码””退款政策是什么””如何升级套餐”之类的,占了工单总量的60%以上。客服同事累得要命,用户等待时间也长。
老板提了个需求:能不能做个智能问答机器人,能自动回答这些常见问题?
技术选型的时候,考虑到几点:
- 准确率要够高:答错了比不答更糟糕,必须有据可查
- 能对接私有知识库:产品文档、FAQ都是内部资料,通用大模型不知道
- 响应要快:用户等太久体验差
- 后期好维护:知识库会更新,不能每次都改代码
综合考虑,选了RAG架构。

二、技术方案设计
2.1 整体架构
plaintext
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 用户问题 │────▶│ API服务 │────▶│ 大模型 │
└─────────────┘ └──────┬──────┘ └─────────────┘
│
┌─────▼─────┐ ┌─────────────┐
│ 检索模块 │◀───▶│ 向量数据库 │
└───────────┘ └─────────────┘
│
┌─────▼─────┐
│ 文档处理 │◀───▶│ 文件存储 │
└───────────┘
核心流程:
- 用户提问 → 转成向量
- 向量检索 → 找到最相关的知识块
- 拼成prompt → 调用大模型
- 返回答案 → 附带参考来源
2.2 技术栈选择
| 组件 | 技术选型 | 选型理由 |
|---|---|---|
| 后端框架 | FastAPI | 高性能、支持异步、文档自动生成 |
| 向量数据库 | Milvus | 支持分布式、亿级向量毫秒级检索 |
| Embedding | text2vec-base-chinese | 中文效果好,开源免费 |
| 大模型 | 通义千问 | 中文能力强,国内合规 |
| 文档处理 | LangChain | 生态完善,社区活跃 |
| 前端 | Vue3 + Element Plus | 内部使用,简单够用 |
2.3 性能指标目标
- 问答响应时间:P95 < 3秒
- 检索准确率:> 85%(人工评测)
- 支持并发:100 QPS
- 知识库规模:10万条文档
三、环境准备与项目结构
3.1 目录结构
bash
rag-knowledge-base/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI入口
│ ├── api/
│ │ ├── __init__.py
│ │ ├── chat.py # 问答接口
│ │ └── knowledge.py # 知识库管理接口
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ └── security.py # 安全相关
│ ├── services/
│ │ ├── __init__.py
│ │ ├── embedding.py # Embedding服务
│ │ ├── retrieval.py # 检索服务
│ │ └── llm.py # 大模型服务
│ └── models/
│ ├── __init__.py
│ └── schemas.py # Pydantic模型
├── scripts/
│ ├── ingest.py # 文档入库脚本
│ └── test_query.py # 测试脚本
├── knowledge_base/ # 知识库源文件
│ ├── faq/
│ ├── product_docs/
│ └── policy/
├── vector_db/ # 向量数据库数据
├── Dockerfile
├── docker-compose.yml
└── requirements.txt
3.2 依赖安装
bash
# requirements.txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.3
langchain==0.1.4
langchain-community==0.0.16
pymilvus==2.3.4
sentence-transformers==2.3.1
tiktoken==0.5.2
python-multipart==0.0.6
python-dotenv==1.0.0
bash
pip install -r requirements.txt
四、核心代码实现
4.1 配置管理
python
# app/core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""应用配置"""
# 服务配置
APP_NAME: str = "RAG知识库问答系统"
DEBUG: bool = False
# Milvus配置
MILVUS_HOST: str = "localhost"
MILVUS_PORT: int = 19530
COLLECTION_NAME: str = "knowledge_base"
VECTOR_DIM: int = 768 # text2vec-base-chinese输出维度
# Embedding配置
EMBEDDING_MODEL: str = "shibing624/text2vec-base-chinese"
BATCH_SIZE: int = 32
# LLM配置
LLM_API_KEY: str = ""
LLM_BASE_URL: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
LLM_MODEL: str = "qwen-turbo"
LLM_TEMPERATURE: float = 0.3
MAX_TOKENS: int = 1000
# 检索配置
TOP_K: int = 3
SCORE_THRESHOLD: float = 0.5
class Config:
env_file = ".env"
case_sensitive = True
@lru_cache()
def get_settings():
return Settings()
4.2 Embedding服务
python
# app/services/embedding.py
from sentence_transformers import SentenceTransformer
from app.core.config import get_settings
from typing import List
import numpy as np
class EmbeddingService:
def __init__(self):
settings = get_settings()
self.model = SentenceTransformer(settings.EMBEDDING_MODEL)
self.batch_size = settings.BATCH_SIZE
def encode(self, texts: List[str]) -> List[List[float]]:
"""批量获取文本向量"""
embeddings = self.model.encode(
texts,
batch_size=self.batch_size,
show_progress_bar=False
)
return embeddings.tolist()
def encode_single(self, text: str) -> List[float]:
"""获取单条文本向量"""
embedding = self.model.encode([text])[0]
return embedding.tolist()
# 全局单例
embedding_service = EmbeddingService()
4.3 向量检索服务
python
# app/services/retrieval.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from app.core.config import get_settings
from app.services.embedding import embedding_service
from typing import List, Dict
class RetrievalService:
def __init__(self):
self.settings = get_settings()
self.collection = None
self._connect()
def _connect(self):
"""连接Milvus"""
connections.connect(
host=self.settings.MILVUS_HOST,
port=self.settings.MILVUS_PORT,
alias="default"
)
self._init_collection()
def _init_collection(self):
"""初始化集合"""
collection_name = self.settings.COLLECTION_NAME
if utility.has_collection(collection_name):
self.collection = Collection(collection_name)
self.collection.load()
else:
# 创建新集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=self.settings.VECTOR_DIM)
]
schema = CollectionSchema(fields=fields, description="知识库向量集合")
self.collection = Collection(name=collection_name, schema=schema)
# 创建索引
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128}
}
self.collection.create_index(field_name="vector", index_params=index_params)
self.collection.load()
def search(self, query: str, top_k: int = None) -> List[Dict]:
"""检索最相关的文档"""
if top_k is None:
top_k = self.settings.TOP_K
# 1. 将问题向量化
query_vector = embedding_service.encode_single(query)
# 2. 执行搜索
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = self.collection.search(
data=[query_vector],
anns_field="vector",
param=search_params,
limit=top_k,
output_fields=["text", "source"]
)
# 3. 整理结果
retrieved_docs = []
for hits in results:
for hit in hits:
# 过滤低分结果
if hit.distance < self.settings.SCORE_THRESHOLD:
continue
retrieved_docs.append({
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
"score": float(hit.distance)
})
return retrieved_docs
# 全局单例
retrieval_service = RetrievalService()
4.4 大模型服务
python
# app/services/llm.py
import openai
from app.core.config import get_settings
from typing import List, Dict
class LLMService:
def __init__(self):
settings = get_settings()
self.client = openai.OpenAI(
api_key=settings.LLM_API_KEY,
base_url=settings.LLM_BASE_URL
)
self.model = settings.LLM_MODEL
self.temperature = settings.LLM_TEMPERATURE
self.max_tokens = settings.MAX_TOKENS
def generate(self, query: str, context: List[Dict]) -> Dict:
"""生成回答"""
# 构建prompt
system_prompt = """你是一个智能客服助手,擅长回答用户问题。
请基于以下参考信息回答用户问题。
要求:
1. 只根据参考信息回答,不要编造
2. 如果参考信息不足以回答,明确告知用户
3. 回答要简洁清晰
4. 如果涉及政策说明,要标注信息来源"""
# 组装参考信息
context_text = "\n\n".join([
f"[来源{i+1}] {doc['text']}\n(文件:{doc['source']})"
for i, doc in enumerate(context)
])
user_prompt = f"""## 参考信息
{context_text}
## 用户问题
{query}
"""
# 调用API
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=self.temperature,
max_tokens=self.max_tokens
)
answer = response.choices[0].message.content
return {
"answer": answer,
"sources": [doc['source'] for doc in context]
}
# 全局单例
llm_service = LLMService()
4.5 API接口
python
# app/api/chat.py
from fastapi import APIRouter, HTTPException
from app.models.schemas import ChatRequest, ChatResponse
from app.services.retrieval import retrieval_service
from app.services.llm import llm_service
router = APIRouter(prefix="/api/v1", tags=["问答"])
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""问答接口"""
try:
# 1. 检索相关文档
retrieved_docs = retrieval_service.search(
query=request.question,
top_k=request.top_k
)
if not retrieved_docs:
return ChatResponse(
answer="抱歉,暂时没有找到相关信息,建议您联系人工客服获取帮助。",
sources=[]
)
# 2. 生成回答
result = llm_service.generate(
query=request.question,
context=retrieved_docs
)
return ChatResponse(
answer=result["answer"],
sources=result["sources"]
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务异常:{str(e)}")
python
# app/models/schemas.py
from pydantic import BaseModel, Field
from typing import List, Optional
class ChatRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=500, description="用户问题")
top_k: Optional[int] = Field(3, ge=1, le=10, description="返回结果数量")
class ChatResponse(BaseModel):
answer: str = Field(..., description="回答内容")
sources: List[str] = Field(default_factory=list, description="参考来源")
4.6 主入口
python
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api import chat, knowledge
from app.core.config import get_settings
settings = get_settings()
app = FastAPI(
title=settings.APP_NAME,
version="1.0.0",
description="基于RAG的企业知识库问答系统"
)
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(chat.router)
app.include_router(knowledge.router)
@app.get("/health")
async def health_check():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
五、文档入库脚本
python
# scripts/ingest.py
import os
import json
from pathlib import Path
from langchain_community.document_loaders import (
TextLoader,
UnstructuredMarkdownLoader,
PyPDFLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from app.services.embedding import embedding_service
from app.services.retrieval import retrieval_service
from app.core.config import get_settings
def load_documents(directory: str):
"""加载目录下的所有文档"""
docs = []
path = Path(directory)
for file_path in path.rglob("*"):
if file_path.is_file() and file_path.suffix in ['.txt', '.md', '.pdf']:
try:
if file_path.suffix == '.pdf':
loader = PyPDFLoader(str(file_path))
elif file_path.suffix == '.md':
loader = UnstructuredMarkdownLoader(str(file_path))
else:
loader = TextLoader(str(file_path), encoding='utf-8')
docs.extend(loader.load())
print(f"加载成功: {file_path.name}")
except Exception as e:
print(f"加载失败: {file_path.name}, 错误: {e}")
return docs
def split_documents(docs):
"""切分文档"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=300,
chunk_overlap=50,
length_function=len
)
return text_splitter.split_documents(docs)
def ingest_documents():
"""文档入库"""
print("开始文档入库...")
# 1. 加载文档
print("\n步骤1: 加载文档")
docs = load_documents("knowledge_base")
print(f"共加载 {len(docs)} 个文档")
# 2. 切分文档
print("\n步骤2: 切分文档")
chunks = split_documents(docs)
print(f"切分后共 {len(chunks)} 个文本块")
# 3. 向量化并入库
print("\n步骤3: 向量化并入库")
for i, chunk in enumerate(chunks):
if i % 100 == 0:
print(f"处理进度: {i}/{len(chunks)}")
# 获取向量
vector = embedding_service.encode_single(chunk.page_content)
# 存入Milvus(此处省略具体代码,假设retrieval_service有add方法)
retrieval_service.add(
text=chunk.page_content,
source=chunk.metadata.get('source', 'unknown'),
vector=vector
)
print("\n文档入库完成!")
if __name__ == "__main__":
ingest_documents()
六、Docker部署
6.1 docker-compose.yml
yaml
version: '3.8'
services:
# FastAPI服务
api:
build: .
ports:
- "8000:8000"
environment:
- MILVUS_HOST=milvus
- MILVUS_PORT=19530
- LLM_API_KEY=${LLM_API_KEY}
depends_on:
- milvus
restart: unless-stopped
# Milvus向量数据库
milvus:
image: milvusdb/milvus:v2.3.3
ports:
- "19530:19530"
- "9091:9091"
volumes:
- ./volumes/milvus:/var/lib/milvus
environment:
- ETCD_ENDPOINTS=etcd:2379
- MINIO_ADDRESS=minio:9000
depends_on:
- etcd
- minio
restart: unless-stopped
etcd:
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
volumes:
- ./volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
volumes:
- ./volumes/minio:/minio_data
command: minio server /minio_data
restart: unless-stopped
6.2 Dockerfile
dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 下载Embedding模型
RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('shibing624/text2vec-base-chinese')"
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
6.3 启动服务
bash
# 复制环境变量模板
cp .env.example .env
# 编辑.env,填入API Key
# 启动所有服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f api
服务启动后,访问 http://localhost:8000/docs 查看API文档。
七、上线后的调优
系统上线后,监控发现几个问题,做了相应调优:
7.1 响应时间优化
问题:P95响应时间超过5秒,不满足要求。
原因分析:
- Milvus检索不稳定,部分查询耗时>2秒
- Embedding模型加载慢,每次请求都重新加载
- 网络IO阻塞
解决方案:
python
# 1. 模型预加载
embedding_service = EmbeddingService() # 服务启动时加载
# 2. Milvus连接池
connections.connect(host="milvus", port="19530", pool_size=10)
# 3. 异步处理
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
# 并行执行检索和Embedding
retrieved_docs = await asyncio.to_thread(
retrieval_service.search,
query=request.question
)
...
7.2 准确率提升
问题:部分问题检索不到相关内容。
解决方案:
- 增加Query改写层
- 混合检索(向量+关键词)
- 调整chunk_size,找到最优值(我们测试后确定为300)
八、总结与经验
这个项目做完,有几点体会:
- RAG不是万能的:它解决的是”知识库+大模型”的问题,如果知识库本身质量差,RAG效果也好不了
- 数据准备比技术实现更重要:我们花在清洗文档、规范FAQ上的时间,占了整个项目的40%
- 监控很重要:上线前一定要埋点,记录检索结果、大模型输出、用户反馈,才能持续优化
- 渐进式迭代:第一版不用追求完美,先跑通流程,再根据实际反馈优化
以上就是完整的RAG项目实战经验,希望对你有帮助。
相关推荐:

发表回复