Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain RAG之向量检索器

检索器概念

LangChain中的检索器是RAG系统关键的读取环节,输入为用户问题字符串,输出为最相关的文档集合,主要功能是将用户输入与知识库关联,通过召回率衡量效果,并支持向量数据库封装成检索器进行高效搜索。

image-20250912102356793

  • 检索器系统是输入是一个字符串
  • 它的输出是一组文档(标准的Langchain Document对象)

说明:

  • 检索器系统是Langchain的概念封装
  • 包含了初存储外的所有RAG系统
  • 它将用户的输入与知识库碎片关联
  • 召回率是衡量检索器的重要指标
  • 它是一个标准的Runnable组件
  • 所有的向量存储都可以转为检索器
  • 最简单的使用as_retriver

检索器在LangChain中的实践

LangChain中通过多种检索器实现文档和信息的高效查询,包括基于数据库的retriver、文档检索、外部搜索(如Tavily搜索引擎)、词法搜索(如BM25算法)及向量数据库检索,支持本地部署与云端调用,部分检索器可混合使用以提升搜索效果。

检索器官方网址:https://python.langchain.com/docs/integrations/retrievers/

LangChain官方介绍:

  • 最基本

    • as_retriver
  • 文件检索

    image-20250912105309570

  • 外部检索

    image-20250912105439083

更多的类型:

  • 基于搜索引擎构建
  • 基于关系型数据库构建
  • 基于词法搜索构建
  • 基于向量数据库构建
  • 混合构建

检索器基本使用

示例一:基本检索器设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from langchain_openai import OpenAIEmbeddings

embeddings_model = OpenAIEmbeddings(
model="BAAI/bge-m3",
api_key=os.environ.get("API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import CharacterTextSplitter

loader = TextLoader("test.txt")

documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
vectorstore = FAISS.from_documents(texts, embeddings_model)

# 实例化检索器,使用最简单的as_retriever()方法
retriever = vectorstore.as_retriever()

# 检索器进行检索
docs = retriever.invoke("deepseek是什么?")

print(docs)

示例二:词法搜索检索器

BM25也称为Okapi BM25,是信息检索系统中用来估计文档与给定搜索查询的相关性的排名函数。

安装依赖

1
pip install -qU  rank_bm25

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 给BM25Retriever添加一组文档
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document

vretriever = BM25Retriever.from_texts(["foo", "bar", "world", "hello", "foo bar"])

retriever = BM25Retriever.from_documents(
[
Document(page_content="foo"),
Document(page_content="bar"),
Document(page_content="world"),
Document(page_content="hello"),
Document(page_content="foo bar"),
]
)

result = retriever.invoke("foo")

# 返回相关性排行,相关性逐渐降低
print(result)

使用预处理器进行增强,在词语级别检索效果比较显著,例如,使用punkt,是一个句子分割器模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import nltk

nltk.download("punkt_tab") # 下载分词器

from nltk.tokenize import word_tokenize

retriever = BM25Retriever.from_documents(
[
Document(page_content="foo"),
Document(page_content="bar"),
Document(page_content="world"),
Document(page_content="hello"),
Document(page_content="foo bar"),
],
k=2,
preprocess_func=word_tokenize, # 加载分词器
)

result = retriever.invoke("bar")
print(result)

查询重写:如何处理非结构化数据?

检索器通过查询重写提升检索效果,将用户原始问题改写为多个更精准的子问题,分别检索后合并结果,提高回答质量与相关性。

查询重写:通过这种检索策略,将用户的问题改为效率更高的问题或问题组

image-20250912110806892

例如:

  • 用户输入(用户提问):
    • baidu是一个什么网站
  • 重写(将用户的提问进行改写):
    • baidu是什么
    • baidu是谁开发的
    • baidu可以做什么

代码示例

使用chroma向量数据库

安装依赖

1
pip install langchain_chroma

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 模型设置
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# llm模型
llm_model = ChatOpenAI(
model="deepseek-ai/DeepSeek-V3",
api_key=os.environ.get("API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

# embedding模型
embeddings_model = OpenAIEmbeddings(
model="BAAI/bge-m3",
api_key=os.environ.get("API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

# 加载文档到向量库中
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load blog post
loader = WebBaseLoader("https://python.langchain.com/docs/how_to/MultiQueryRetriever/")
data = loader.load()

# Split
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
splits = text_splitter.split_documents(data)

# VectorDB
vectordb = Chroma.from_documents(documents=splits, embedding=embeddings_model)

from langchain.retrievers.multi_query import MultiQueryRetriever

# 问题改写,将基础问题改写成多个子问题
question = "如何让用户查询更准确?"
retriever_from_llm = MultiQueryRetriever.from_llm(
retriever=vectordb.as_retriever(), llm=llm_model # 传入检索器
)

# 设置log,追踪输出
import logging

logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

unique_docs = retriever_from_llm.invoke(question)
print(unique_docs)
len(unique_docs)

从输出内容来看,将问题:如何让用户查询更准确?,改写为三个问题

langchain.retrievers.multi_query:Generated queries: [‘1. 提升用户查询准确性的有效方法有哪些? ‘, ‘2. 优化搜索技术以提高查询精度的策略是什么? ‘, ‘3. 如何通过改进查询方式让搜索结果更匹配用户需求?’]

查询重构:如何处理结构化数据?

查询重写和重构是检索优化中的重要技术,前者用于将用户低质量的自然语言问题转化为高质量的问题,尤其在多轮对话中结合上下文改写问题以提高检索效果;后者则是将自然语言输入转换为高效的结构化查询语句(如SQL),通过大模型生成精确的查询语句提升检索效率,并可与数据库直接交互实现自然语言输入输出。

查询重构:这种检索策略聚焦在如何提高查询效率上,比如如何优化SQL语句

自然查询:

  • 适用于半结构化数据,一般用在向量数据库的元数据查询上
  • 使用SelfQueryRetriever的检索器
  • 核心是从元数据中查找信息

文本到SQL:

  • 将自然输入重建为相应的SQL查询
  • 使用自然语言来跑SQL语句

image-20250912140720592

代码示例:自然语言查询SQL

  • 步骤1: 将问题转换为 SQL 查询,模型将用户输入转换为 SQL 查询。
  • 步骤2: 执行 SQL 查询,执行查询。
  • 步骤3: 回答问题,模型使用查询结果响应用户输入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# 模型设置
import os
from langchain_openai import ChatOpenAI

# llm模型
llm_model = ChatOpenAI(
model="deepseek-ai/DeepSeek-V3",
api_key=os.environ.get("API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

# 查看数据库
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///Chinook.db")
print(db.dialect)
print(db.get_usable_table_names())
db.run("SELECT * FROM Artist LIMIT 10;")

# 使用Hub上预制提示词(安装依赖:pip install langchainhub)
from langchain import hub

query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")

# assert len(query_prompt_template.messages) == 1
query_prompt_template.messages[0].pretty_print()

# 使用LCEL创建一个最简单的SQL查询
from typing_extensions import Annotated
from typing_extensions import TypedDict

# Define the state type
class State(TypedDict):
question: str
query: str
result: str
answer: str

# Define the output type
class QueryOutput(TypedDict):
"""Generated SQL query."""

query: Annotated[str, ..., "Syntactically valid SQL query."]

# Define the write_query function
def write_query(state: State):
"""Generate SQL query to fetch information."""
prompt = query_prompt_template.invoke(
{
"dialect": db.dialect,
"top_k": 10,
"table_info": db.get_table_info(),
"input": state["question"],
}
)
structured_llm = llm_model.with_structured_output(QueryOutput)
result = structured_llm.invoke(prompt)
return {"query": result["query"]}

sqlMessage = write_query({"question": "一共有多少个员工?"})
print(sqlMessage)

# 设置执行SQL语句方法(注:该操作有风险)
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool

def execute_query(state: State):
"""Execute SQL query."""
execute_query_tool = QuerySQLDatabaseTool(db=db)
return {"result": execute_query_tool.invoke(state["query"])}

# 执行生成的SQL语句
res = execute_query(sqlMessage)
print(res)

from langchain_core.runnables import RunnablePassthrough

# Define the chain to answer questions from SQL query
def answer_question(state: State):
"""Format answer based on the query result."""
prompt = f"""Based on the SQL query:
{state["query"]}

And the query result:
{state["result"]}

Answer the user's question: {state["question"]}
Provide a concise and informative response.
"""
return {"answer": llm_model.invoke(prompt).content}

# Create a full chain from question to answer
sql_chain = (
RunnablePassthrough.assign(query=write_query)
.assign(result=execute_query)
.assign(answer=answer_question)
)

# Example usage
question = "获取销售额最高的5位员工及其销售总额"
response = sql_chain.invoke({"question": question})

print("Question:", question)
print("\nGenerated SQL:")
print(response["query"])
print("\nExecution Result:")
print(response["result"])
print("\nAnswer:")
print(response["answer"])

最后执行结果为:

1
2
3
4
5
6
7
8
9
10
Question: 获取销售额最高的5位员工及其销售总额

Generated SQL:
{'query': 'SELECT e.FirstName, e.LastName, SUM(i.Total) AS TotalSales FROM Employee e JOIN Customer c ON e.EmployeeId = c.SupportRepId JOIN Invoice i ON c.CustomerId = i.CustomerId GROUP BY e.EmployeeId ORDER BY TotalSales DESC LIMIT 5;'}

Execution Result:
{'result': "[('Jane', 'Peacock', 833.04), ('Margaret', 'Park', 775.4), ('Steve', 'Johnson', 720.16)]"}

Answer:
{'answer': '根据查询结果,销售额最高的3位员工及其销售总额如下(仅返回了3位,可能数据中不足5位):\n\n1. Jane Peacock - 833.04\n2. Margaret Park - 775.40 \n3. Steve Johnson - 720.16\n\n数据已按销售总额降序排列。'}

检索器(策略)对比

多种检索优化策略,包括向量检索、文档检索、外部检索、关系数据库与图数据库查询、词法搜索及混合检索等,强调根据具体场景选择合适的检索方式以提升效果。

检索器 适用场景 说明
向量检索器 向量数据库检索查询 大部分向量库都支持
文档检索器 内部知识库/企业内网 可以使用类似ES搭建一个私有化的强大检索器
外部检索器 搜索场景优化 使用外部API进行检索,可不依赖数据库
关系数据库检索器 SQL查询、图数据查询 重点在对查询语言的重建(写更好的SQL)
词法搜索检索器 精准的字面匹配 类似传统搜索引擎,代表为BM25
多重检索 需要更好的召回率 返回与原始问题扩展后最相关的文档块
多重检索之分解 Deep research 返回原始问题扩展成子问题后的答案

检索调优:让RAG系统更快更准确

调优原理:将检索结果通过排序、压缩等算法进行进一步过滤,进一步提高相关性

  • 未做检索调优的结果一般依赖于向量库能力
  • 对于检索结果的调优可以大幅提升RAG准确度
  • 检索调优会产生额外的成本负担
  • 上下文压缩:支持单独或管道串联
  • 排序:长文进行切片后检索后出现的Lost in the middle现象
  • 相似性分数:一种精确定位方式
  • 混合搜索:不同算法互补的到更好结果

在LangChain中如何实现

  1. 上下文压缩
    • LLMChainExtrator
    • DocumentCompressonPipeline
  2. 排序
    • LongContextReorder
  3. 相似性分数
    • similarity_search_with_score
  4. 混合搜索
    • 词法搜索(BM25Retriever)
    • 向量相似搜索(FAISS)

调优示例

示例一:上下文压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
from langchain_openai import ChatOpenAI

# llm模型
llm_model = ChatOpenAI(
model="deepseek-ai/DeepSeek-V3",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

# 定义文档输出格式
def pretty_print_docs(docs):
print(
f"\n{'-' * 100}\n".join(
[f"Document {i+1}:\n\n" + d.page_content for i, d in enumerate(docs)]
)
)

from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter

documents = TextLoader("test.txt").load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
retriever = FAISS.from_documents(texts, OpenAIEmbeddings()).as_retriever()

docs = retriever.invoke("如何进行模型部署")
pretty_print_docs(docs)
  • 使用LLMChainExtractor:基础检索器ContextualCompressionRetriever以及LLMChainExtractor,它将迭代最初返回的文档,并从每个文档中仅提取与查询相关的内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain.retrievers.document_compressors import LLMChainExtractor

    compressor = LLMChainExtractor.from_llm(llm_model)
    compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
    )

    compressed_docs = compression_retriever.invoke(
    "如何进行部署"
    )
    pretty_print_docs(compressed_docs)
  • LLMChainFilter:使用 LLM 链来决定过滤掉哪些最初检索到的文档以及返回哪些文档,而无需操作文档内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    from langchain.retrievers.document_compressors import LLMChainFilter

    _filter = LLMChainFilter.from_llm(llm_model)
    compression_retriever = ContextualCompressionRetriever(
    base_compressor=_filter, base_retriever=retriever
    )

    compressed_docs = compression_retriever.invoke(
    "如何进行部署"
    )
    pretty_print_docs(compressed_docs)
  • 多个压缩器组合管道

    • 使用DocumentCompressorPipeline轻松地按顺序组合多个压缩器
    • 将文档拆解成更小的碎片
    • 删除冗余文档
    • 串联多个压缩器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    from langchain.retrievers.document_compressors import DocumentCompressorPipeline
    from langchain_community.document_transformers import EmbeddingsRedundantFilter
    from langchain_text_splitters import CharacterTextSplitter
    from langchain.retrievers.document_compressors import EmbeddingsFilter

    # 设置多个压缩器
    splitter = CharacterTextSplitter(chunk_size=400, chunk_overlap=0, separator=". ")
    redundant_filter = EmbeddingsRedundantFilter(embeddings=OpenAIEmbeddings())
    relevant_filter = EmbeddingsFilter(embeddings=OpenAIEmbeddings(), similarity_threshold=0.76)
    # 管道进行组合
    pipeline_compressor = DocumentCompressorPipeline(
    transformers=[splitter, redundant_filter, relevant_filter]
    )

    compression_retriever = ContextualCompressionRetriever(
    base_compressor=pipeline_compressor, base_retriever=retriever
    )

    compressed_docs = compression_retriever.invoke(
    "如何进行部署"
    )
    pretty_print_docs(compressed_docs)

示例二:排序

什么是lost in the middle:

  • 出现的原因:对于文章长度比较长的情况下,横轴的文章的长度,纵轴是问题回答的正确率,可以从图中的趋势看到,答案在文章的中间部分会容易被忽略,而且两头的位置则可以很高几率高的找到答案

    image-20250915085918128

  • 对于提高找出准确答案的相关性,则需要进行重新排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
from langchain_openai import ChatOpenAI

# llm模型
llm_model = ChatOpenAI(
model="deepseek-ai/DeepSeek-V3",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

from langchain_openai import OpenAIEmbeddings

# embedding模型
embeddings_model = OpenAIEmbeddings(
model="BAAI/bge-m3",
api_key=os.environ.get("API_KEY"),
base_url="https://api.siliconflow.cn/v1",
)

from langchain_core.vectorstores import InMemoryVectorStore

texts = [
"西湖是杭州著名的旅游景点。",
"我最喜欢的歌曲是《月亮代表我的心》。",
"故宫是北京最著名的古迹之一。",
"这是一篇关于北京故宫历史的文档。",
"我非常喜欢去电影院看电影。",
"北京故宫的藏品数量超过一百万件。",
"这只是一段随机文本。",
"《三国演义》是中国四大名著之一。",
"紫禁城是故宫的别称,位于北京。",
"故宫博物院每年接待游客数百万人次。",
]

# 创建检索器
retriever = InMemoryVectorStore.from_texts(texts, embedding=embeddings_model).as_retriever(
search_kwargs={"k": 10}
)
query = "请告诉我关于故宫的信息?"

# 获取按相关性排序的文档
docs = retriever.invoke(query)
for doc in docs:
print(f"- {doc.page_content}")

返回的文档按与查询的相关性降序排列。LongContextReorder文档转换器将实现上述重新排序:

1
2
3
4
5
6
7
8
9
10
11
from langchain_community.document_transformers import LongContextReorder

# 重新排序文档:
# 相关性较低的文档将位于列表中间
# 相关性较高的文档将位于开头和结尾
reordering = LongContextReorder()
reordered_docs = reordering.transform_documents(docs)

# 确认相关性高的文档位于开头和结尾
for doc in reordered_docs:
print(f"- {doc.page_content}")

整合到chain里面去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import PromptTemplate

prompt_template = """
Given these texts:
-----
{context}
-----
Please answer the following question:
{query}
"""

prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "query"],
)

# Create and invoke the chain:
chain = create_stuff_documents_chain(llm_model, prompt)
response = chain.invoke({"context": reordered_docs, "query": query})
print(response)

示例三:相似性分数

与向量数据库的特性有关,因此以向量数据库为例,该例子使用轻量级的chroma向量数据库,需要安装依赖:pip install -qU "langchain-chroma"

初始化数据库和添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from langchain_chroma import Chroma

vector_store = Chroma(
collection_name="example_collection", # 集合名字
embedding_function=embeddings_model,
persist_directory="chroma_langchain_db", # 可选参数,指定本地持久化目录
)

import chromadb

persistent_client = chromadb.PersistentClient()
collection = persistent_client.get_or_create_collection("collection_name")
collection.add(ids=["1", "2", "3"], documents=["a", "b", "c"])

vector_store_from_client = Chroma(
client=persistent_client,
collection_name="collection_name",
embedding_function=embeddings_model,
)

from uuid import uuid4

from langchain_core.documents import Document

document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
id=1,
)

document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
id=2,
)

document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
id=3,
)

document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
id=4,
)

document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
id=5,
)

document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
id=6,
)

document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
id=7,
)

document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
id=8,
)

document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
id=9,
)

document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
id=10,
)

documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]

vector_store.add_documents(documents=documents, ids=uuids)
  • 相似性分数搜索

    1
    2
    3
    4
    5
    results = vector_store.similarity_search_with_score(
    "Will it be hot tomorrow?", k=1, filter={"source": "news"}
    )
    for res, score in results:
    print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")
  • 为文档添加分数:通过一个自定义链,可以为原始文档增加相关性评分

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from typing import List

    from langchain_core.documents import Document
    from langchain_core.runnables import chain


    @chain
    def retriever(query: str) -> List[Document]:
    docs, scores = zip(*vector_store.similarity_search_with_score(query))
    for doc, score in zip(docs, scores):
    doc.metadata["score"] = score

    return docs

    result = retriever.invoke("Robbers")
    print(result)