What is RAG (Retrieval-Augmented Generation)?
RAG (Retrieval-Augmented Generation) combines a retrieval system with a generative model. It vectorizes a knowledge base, runs a semantic search to find the document content most relevant to the question, and injects that content into the prompt before handing it to the LLM. The result is answers that are more accurate, more current, and traceable, compensating for the stale and incomplete pre-training data of large models.
Current problems with large language models:
- Stale pre-training data
  - Claude: August 2023
  - DeepSeek-R1: December 2023
  - GPT-4o: October 2023
  - Llama 3.3: October 2023
- Incomplete pre-training data
  - Unpublished data
  - Internal enterprise data
  - Real-time, dynamic data
  - Highly specialized in-house knowledge
  - Content restricted for ethical reasons
  - Niche or subcultural information
LLM training pipeline:
flowchart LR
A[Data crawling] --> B[Data cleaning]
B --> C[Data preprocessing]
C --> D[Model training]
D --> E[Model inference]
LLM training is one-directional, time-consuming, compute-hungry, and produces a fixed model in a single pass.
LLMs are therefore not perfect: their pre-training data fixes their baseline knowledge, and they cannot evolve on their own after training.
The RAG Workflow

- Retrieval-Augmented Generation (RAG)
- A technique that couples a retrieval system with generative AI: relevant information is retrieved from a knowledge base to augment the model's answers, making them more accurate, more current, and traceable, while simplifying the construction of complex AI workflows.
- RAG is not a mandatory part of an agent, but it is essential whenever you need to extend an agent's knowledge and capabilities.
RAG Components

- RAG uses an embedding model to vectorize the knowledge base
- The user's input is vectorized as well
- Before the LLM is queried, a vector search finds the document chunks semantically closest to the question
- The relevant chunks are embedded into the prompt
- The prompt is submitted to the LLM, which returns a better-grounded answer (see the sketch below)
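To make these steps concrete, here is a minimal query-time sketch in LangChain. It is illustrative only: the toy documents, the `gpt-4o-mini` model name, and the use of `InMemoryVectorStore` with `OpenAIEmbeddings` are assumptions made for the example, not part of the original notes.

```python
# Minimal query-time RAG sketch (illustrative; model name and data are placeholders).
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 1. Vectorize the knowledge base (two toy documents here).
vector_store = InMemoryVectorStore.from_documents(
    [
        Document(page_content="RAG retrieves relevant documents and injects them into the prompt."),
        Document(page_content="Vector search compares embeddings to find semantically similar text."),
    ],
    embedding=OpenAIEmbeddings(),
)

# 2-3. Vectorize the user's question and run a semantic search over the store.
question = "What does RAG do?"
relevant_docs = vector_store.similarity_search(question, k=2)

# 4. Embed the retrieved chunks into the prompt.
context = "\n\n".join(doc.page_content for doc in relevant_docs)
prompt = f"Answer using only the context below.\n\nContext:\n{context}\n\nQuestion: {question}"

# 5. Submit to the LLM for a grounded answer.
llm = ChatOpenAI(model="gpt-4o-mini")  # assumed model name
print(llm.invoke(prompt).content)
```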
Knowledge (Data) Preprocessing
The data-processing stage of a RAG system covers document loading, chunking, vectorization, and storage in a vector database. LangChain wraps each of these steps, which makes the pipeline convenient to build and compatible with a wide range of data sources.
Document vectorization is a prerequisite for vector search.

graph LR
A[Load documents] --> B(Split documents)
B --> C(Embed documents)
C --> D(Store documents)
Implementation in LangChain

graph LR
A[Various documents] --> B(Various loaders)
B --> C(Text splitting)
C --> D(Vectorization)
D --> E(Vector storage)
E --> F(Vector retrieval)
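The loading and splitting stages of this pipeline are demonstrated in detail below, but the embedding, storage, and retrieval stages are not shown elsewhere in this section, so here is a hedged end-to-end sketch. The `deepseek.pdf` file name follows the later examples; `OpenAIEmbeddings` and the in-memory vector store are assumptions chosen for brevity, and any embedding model or vector database could be substituted.

```python
# End-to-end indexing sketch: load -> split -> embed -> store -> retrieve.
# Assumes langchain-community, pypdf, langchain-text-splitters, and langchain-openai are installed.
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load: one Document per PDF page.
pages = PyPDFLoader("deepseek.pdf").load()

# Split: chunks small enough to embed and to retrieve precisely.
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(pages)

# Embed + store: the vector store calls the embedding model for every chunk.
vector_store = InMemoryVectorStore.from_documents(chunks, embedding=OpenAIEmbeddings())

# Retrieve: wrap the store as a retriever for use in chains and agents.
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
for doc in retriever.invoke("What is covered in this report?"):
    print(doc.metadata.get("page"), doc.page_content[:80])
```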
Common Loaders
LangChain's loaders can ingest many data formats, including PDF, web pages, CSV, and Excel.

Examples of LangChain Loaders
Loading PDFs
Required dependencies:
```bash
pip install langchain_community
pip install pypdf
# Required only for the multimodal example
pip install PyMuPDF
pip install pillow
pip install IPython
```
Example 1: Loading a text-only PDF
```python
from langchain_community.document_loaders import PyPDFLoader

file_path = "deepseek.pdf"

loader = PyPDFLoader(file_path)
pages = []
async for page in loader.alazy_load():
    pages.append(page)

print(f"{pages[0].metadata}\n")
print(pages[0].page_content)
```
Example 2: Loading a PDF with images — parsing with a multimodal model
```python
import base64
import io

import fitz  # PyMuPDF
from PIL import Image

file_path = "z2021.pdf"


def pdf_page_to_base64(pdf_path: str, page_num: int):
    """Render one PDF page to a base64-encoded PNG."""
    doc = fitz.open(pdf_path)
    page = doc.load_page(page_num - 1)
    pix = page.get_pixmap()
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)

    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format="PNG")

    return base64.b64encode(img_byte_arr.getvalue()).decode("utf-8")


from IPython.display import display, Image as IPythonImage

base64_image = pdf_page_to_base64(file_path, 11)
display(IPythonImage(data=base64.b64decode(base64_image)))

import os

from langchain_openai import ChatOpenAI

# A vision-language model served behind an OpenAI-compatible endpoint
llm = ChatOpenAI(
    model="Qwen/Qwen2.5-VL-32B-Instruct",
    base_url="https://xxxxxxxxxxxxx/v1",
    api_key=os.environ.get("API_KEY"),
)

from langchain_core.messages import HumanMessage

query = "一线城市消费占比有多少?"  # "What share of consumption comes from tier-1 cities?"

# Send the question together with the rendered page image
message = HumanMessage(
    content=[
        {"type": "text", "text": query},
        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
    ]
)
response = llm.invoke([message])
print(response.content)
```
Loading Web Pages
Required dependencies:
```bash
pip install -qU beautifulsoup4
pip install -qU unstructured
pip install -qU langchain-unstructured
```
Example 1: Loading a full web page
```python
import bs4
from langchain_community.document_loaders import WebBaseLoader

page_url = "https://python.langchain.com/docs/integrations/chat/"

loader = WebBaseLoader(web_paths=[page_url])
docs = []
async for doc in loader.alazy_load():
    docs.append(doc)

assert len(docs) == 1
doc = docs[0]

print(f"{doc.metadata}\n")
print(doc.page_content[:500].strip())
```
Example 2: Loading only a specified part of a web page
```python
import bs4
from langchain_community.document_loaders import WebBaseLoader

page_url = "https://python.langchain.com/docs/integrations/chat/"

loader = WebBaseLoader(
    web_paths=[page_url],
    bs_kwargs={
        "parse_only": bs4.SoupStrainer(class_="codeBlockLines_e6Vv")
    },
    bs_get_text_kwargs={"separator": "|", "strip": True},
)
docs = []
async for doc in loader.alazy_load():
    docs.append(doc)

assert len(docs) == 1
doc = docs[0]

print(f"{doc.metadata}\n")
print(doc.page_content[:500])
```
Example 3 (advanced): Parsing a web page without knowing its structure
```python
from langchain_unstructured import UnstructuredLoader

page_url = "https://python.langchain.com/docs/integrations/chat/"

loader = UnstructuredLoader(web_url=page_url)
docs = []
async for doc in loader.alazy_load():
    docs.append(doc)

for doc in docs[:5]:
    print(doc.page_content)
```
Loading CSV Files
Example 1: Loading a CSV file
```python
from langchain_community.document_loaders.csv_loader import CSVLoader

file_path = "PMBOK6.csv"

loader = CSVLoader(file_path=file_path)
data = loader.load()

for record in data[:2]:
    print(record)
```
Example 2: Using a specific column to identify the document source
```python
from langchain_community.document_loaders.csv_loader import CSVLoader

file_path = "PMBOK6.csv"

loader = CSVLoader(file_path=file_path, source_column="名称")

data = loader.load()

for record in data[:2]:
    print(record)
```
Parsing Excel Files
Parsing Excel with openpyxl
Required dependencies:
```bash
pip install -qU openpyxl
```
```python
from langchain_community.document_loaders import UnstructuredExcelLoader

file_path = "PMBOK62.xlsx"
loader = UnstructuredExcelLoader(file_path=file_path)
docs = loader.load()

for doc in docs[:2]:
    print(doc.page_content)
```
Custom Document Loaders
You can use the loaders LangChain already provides, but you can also write your own. A custom loader should implement the following methods:
| Method Name | Explanation |
| --- | --- |
| lazy_load | Used to load documents one by one lazily. Use for production code. |
| alazy_load | Async variant of lazy_load. |
| load | Used to load all the documents into memory eagerly. Use for prototyping or interactive work. |
| aload | Used to load all the documents into memory eagerly. Use for prototyping or interactive work. Added in 2024-04 to LangChain. |
Required dependencies:
```bash
pip install -qU aiofiles
```
```python
from typing import AsyncIterator, Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class CustomDocumentLoader(BaseLoader):
    """An example document loader that reads a file line by line."""

    def __init__(self, file_path: str) -> None:
        """Initialize the loader with a file path.

        Args:
            file_path: Path of the file to load.
        """
        self.file_path = file_path

    def lazy_load(self) -> Iterator[Document]:
        """A lazy loader that reads the file line by line.

        When implementing a lazy load method, use a generator and
        yield documents one at a time.
        """
        with open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1

    async def alazy_load(
        self,
    ) -> AsyncIterator[Document]:
        """An async lazy loader that reads the file line by line."""
        import aiofiles

        async with aiofiles.open(self.file_path, encoding="utf-8") as f:
            line_number = 0
            async for line in f:
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )
                line_number += 1


with open("meow.txt", "w", encoding="utf-8") as f:
    quality_content = "meow meow🐱 \n meow meow🐱 \n meow😻😻"
    f.write(quality_content)

loader = CustomDocumentLoader("meow.txt")

for doc in loader.lazy_load():
    print()
    print(type(doc))
    print(doc)
```
Document Splitting
Documents must be split to respect the model's context-window limits and to keep retrieval efficient. The main strategies are splitting by length, by text structure (paragraphs, sentences), by document format, or semantically, so that each chunk is a self-contained semantic unit and downstream retrieval quality improves.
Splitting by Document Length
Required dependencies:
```bash
pip install -qU langchain-text-splitters
```
Example code:
```python
from langchain_community.document_loaders import PyPDFLoader

file_path = "deepseek.pdf"

loader = PyPDFLoader(file_path)
pages = []
async for page in loader.alazy_load():
    pages.append(page)

from langchain_text_splitters import CharacterTextSplitter

text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="cl100k_base",
    chunk_size=50,
    chunk_overlap=10,
)
text_1 = text_splitter.split_text(pages[1].page_content)
print(text_1)
text_2 = text_splitter.split_text(pages[2].page_content)
print(text_2)

docs = text_splitter.create_documents([pages[2].page_content, pages[3].page_content])
print(docs)
```
Splitting by Text Structure
Example 1: Plain text
```python
from langchain_community.document_loaders import PyPDFLoader

file_path = "deepseek.pdf"

loader = PyPDFLoader(file_path)
pages = []
async for page in loader.alazy_load():
    pages.append(page)

from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=50, chunk_overlap=0)
texts = text_splitter.split_text(pages[1].page_content)
print(texts)
```
Example 2: Markdown, split by headers (e.g. #, ##, ###)
```python
from langchain_text_splitters import MarkdownHeaderTextSplitter

markdown_document = "# Foo\n\n ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly"

headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
md_header_splits = markdown_splitter.split_text(markdown_document)
print(md_header_splits)
```
Example 3: JSON, split by object or array element
```python
import requests

json_data = requests.get("https://api.smith.langchain.com/openapi.json").json()

from langchain_text_splitters import RecursiveJsonSplitter

splitter = RecursiveJsonSplitter(max_chunk_size=300)
json_chunks = splitter.split_json(json_data=json_data)

for chunk in json_chunks[:3]:
    print(chunk)

docs = splitter.create_documents(texts=[json_data])

for doc in docs[:3]:
    print(doc)
```
Semantic Splitting
Required dependencies:
```bash
pip install -q langchain_experimental
```
Example code:
```python
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings

text_splitter = SemanticChunker(OpenAIEmbeddings())

with open("meow.txt") as f:
    meow = f.read()

docs = text_splitter.create_documents([meow])
print(docs[0].page_content)
```