Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain LCEL表达式语言

Runnable介绍

什么是Runnable

Runnable 是 LangChain 0.3 中的核心接口,代表一个可执行的组件。它可以被调用(同步或异步),也可以与其他 Runnable 对象组合构建更复杂的链。

Runnable接口是Langchain的核心组件通信标准,通过统一输入输出格式及调用方式,降低组件间耦合度,支持同步异步操作,简化工作流构建,使不同组件能无缝连接,并提供运行时配置功能以增强灵活性和可扩展性。

image-20250904091914451

  • Runnable是LangChain的核心接口,提供统一的调用方式,使不同组件可以无缝连接
  • 简化了复杂AI工作流的构建过程
  • 支持同步和异步操作
  • 可通过管道操作符 | 连接多个组件

注意:并不是所有的组件都支持所有的统一的调用方法,尤其是异步事件

Runnable对组件的输入输出要求:

Component Input Type Output Type
Prompt dictionary PromptValue
ChatModel a string, list of chat messages or PromptValue ChatMessage
LLM a string, list of chat messages or PromptValue String
OutputParser the output of an LLM or ChatModel Depends on the parser
Retriever a string List of Documents
Tool a string or dictionary, depending on the tool Depends on the tool

Runable对组件的输入进行了强制统一(注意:若执行有报错,很大的原因则源头报错)

第二个参数:RunableConfig是在运行时态的一种定义,比如回调等

Attribute Description
run_name Name used for the given Runnable(not iherited)
run_id Unique identifier for this call. sub-calls will get their own unique run ids
tags Tags for this call and sub-calls
metadata Metadata for this call and any sub-calls
callbacks Callbacks for this class and any sub-calls
max_concurrency Maximum number of parallel calls to make(e.g. used by batch)
recursion_limit Maximum number of times a call can recurse(e.g. used by Runnables that return Runnables)
configurable Runtime values for configurable attributes of the Runnable

为什么使用 Runnable

特性 说明
标准化接口 所有组件都可以 .invoke()/ .stream()/ .batch()
可组合性强 可通过 \ 进行链式调用
可观察性强 所有执行都可以加 callback,用于调试、追踪、评估
支持异步 支持同步与异步执行 .ainvoke().astream()

Runnable 的主要方法

方法名 类型 说明
.invoke(input) 同步执行 接收一个输入,返回一个输出
.ainvoke(input) 异步执行 异步方式执行
.stream(input) 流式响应 用于返回响应片段(如聊天消息 chunk)
.batch(inputs) 批处理 接收多个输入列表,返回多个输出
.bind(config) 配置绑定 绑定静态配置,如 stop, temperature

哪些组件是 Runnable

组件类型 实现 Runnable 接口?
ChatOpenAI/ ChatAnthropic
PromptTemplate / ChatPromptTemplate
StrOutputParser / JsonOutputParser
RunnableLambda/ RunnableMap
自定义函数包装器 RunnableLambda(fn)

LCEL简介

LangChain LCEL的全称为LangChain Expression Language即可直译为LangChain表达式。

为了构造更复杂的LLM应用并且更为简便快捷的构造LLM应用,LangChain提供了类似”管道“的形式去声明提示词模版(prompt),即用”|“来连接各个组件之间的操作。也就是LCEL允许开发者将不同的模块进行简单的形式视线串联。语法如下所示:

chain = 提示词模板 | 大模型调用 | 输出解析器

image-20250904094128108

LCEL的好处

  • 优化的并行执行
  • 稳定的异步支持
  • 简化流式传输
  • 无缝植入LangSmith追踪
  • 标准API
  • 可使用LangServe部署(FastAPI + LCEL)

LCEL封装核心内容

Runnables的API方法:https://python.langchain.com/api_reference/core/runnables.html

image-20250904094645701

在使用LCEL表达式时,需要先了解其中包含的元素:

  • |:连接符
  • Runnable对象:可执行操作

LCEL使用场景

使用场景

  • 单个大模型调用:无需LCEL,直接调用模型即可(相当于简单使用deepseek)
  • 简单链(llm + 提示词 + 解析器),使用LCEL最佳场景
  • 构建复杂链(分支、循环、多个智能体等),使用LangGraph
  • LCEL可以嵌入在绝大部分LangChain生态应用里

旧版本中的预制链

  • V0.0版本中存在大量的预制链(LLMChain、CoversationChain…)
  • 在0.3版本中依然支持但建议弃用(提示词不可见无法自定义等)
  • 建议使用LCEL或LangGraph方式进行自定义重构
  • 迁移示例:LLMChain StuffDocumentsChain

链的基本使用

  • 使用管道操作符快速生成一条链
  • 链的流式调用
  • 并行运行多条链
  • 链的调试

使用管道操作符快速生成一条链

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

# 使用管道操作符来实现一条链
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")

chain = prompt | llm | StrOutputParser()
res=chain.invoke({"topic": "鲜花"})
print(res)

链的pipe方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")

chain = (prompt.pipe(llm).pipe(StrOutputParser()))
res=chain.invoke({"topic": "狗"})
print(res)

链的流式调用

  • 异步调用astream
  • JSON流输出
  • 使用流事件

注意:不是所有的组件都支持流式输出,当不支持的组件呗封装在chain中后,最后的结果依旧可以使用流输出

异步调用astream

  • stream: 同步调用
  • astream: 异步调用

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")

chain = prompt | llm | StrOutputParser()

# for chunk in chain.stream("小狗"):
# print(chunk, end="|")

# 异步调用
async for chunk in chain.astream("小狗"):
print(chunk, end="|", flush=True)

JSON流输出

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import JsonOutputParser

# 创建一个链,将LLM的输出通过JsonOutputParser进行解析
# 注意:由于旧版本Langchain的一个bug,JsonOutputParser可能无法正确流式处理某些模型的结果
chain = ( llm | JsonOutputParser())

# 使用astream方法进行异步流式处理
# 发送提示要求模型以JSON格式输出法国、西班牙和日本的国家及其人口信息
async for text in chain.astream(
"以JSON格式输出法国、西班牙和日本的国家及其人口信息"
"使用一个字典,包含外层键名为'countries'的国家列表"
"每个国家应该有键名为'name'和'population'"
):
# 打印每个流式返回的文本片段,并立即刷新输出缓冲区
print(text, flush=True)

事件流

  • 这是一个测试事件

  • 可以将流的过程进行分解,从而事件更细颗粒度的控制

  • langchain-core >= 0.2

  • 事件流的颗粒度:

    | event | name | chunk | input | output |
    | —————————— | ———————— | ———————————————- | —————————————————————— | ——————————————————————— |
    | on_chat_model_start | [model name] | | {“messages”:[[SystemMessage, HumanMessage]]} | |
    | on_chat_model_stream | [model name] | AIMessageChunk(content=”hello”) | | |
    | on_chat_model_end1 | [model name] | | {“messages”:[[SystemMessage, HumanMessage]]} | AIMessageChunk(content=”hello world”) |
    | on_llm_start | [model name] | | {“input”: “hello”} | |
    | on_llm_stream | [model name] | ‘Hello’ | | |
    | on_llm_end | [model name] | | ‘Hello human!’ | |
    | on_chain_start | format_docs | | | |
    | on_chain_stream | format_docs | “hello world!, goodbye world!” | | |
    | on_chain_end | format_docs | | [Document(…)] | “hello world!, goodbye” |
    | on_tool_start15 | some_tool16 | | {“x”: 1, “y”: “2”} | |
    | on_tool_end18 | some_tool19 | | | {“x”: 1, “y”: “2”} |
    | on_retriever_start | [retriever name] | | {“query”: “hello”} | |
    | on_retriever_end | [retriever name] | | {“query”: “hello”} | [Document(…), …] |
    | on_prompt_start | [template_name] | | {“question”: “hello”} | |
    | on_prompt_end | [template_name] | | {“question”: “hello”} | ChatPromptValue(messages=[SystemMessage, …]) |

    注意对于版本langchain-core<0.3.37,需要显式地指定事件流版本

1
2
3
events = []
async for event in llm.astream_events("hello",version="v2"):
events.append(event)

事件过滤-按name

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import JsonOutputParser

chain = llm.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
include_names=["my_parser"],version="v2"
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break

事件过滤 - 按tag

按照tag来过滤事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import JsonOutputParser

chain = (llm | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
include_tags=["my_chain"],version="v2"
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break

事件阶段过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import JsonOutputParser

chain = (llm | JsonOutputParser()).with_config({"tags": ["my_chain"]})

num_events = 0

async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break

并行运行多条链

并行运行链会构造:

1
2
3
4
5
6
7
    Input
/ \
/ \
Branch1 Branch2
\ /
\ /
Combine

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel

joke_chain = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话") | llm
poem_chain = (
ChatPromptTemplate.from_template("给我写一首关于{topic}的绝句") | llm
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "程序员"})

查看图,需要安装依赖pip install grandalf

1
map_chain.get_graph().print_ascii()

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
               +--------------------------+                
| Parallel<joke,poem>Input |
+--------------------------+
*** ***
*** ***
** **
+--------------------+ +--------------------+
| ChatPromptTemplate | | ChatPromptTemplate |
+--------------------+ +--------------------+
* *
* *
* *
+------------+ +------------+
| ChatOpenAI | | ChatOpenAI |
+------------+* +------------+
*** ***
*** ***
** **
+---------------------------+
| Parallel<joke,poem>Output |
+---------------------------+

查看提示词

1
map_chain.get_prompts()

执行结果

1
2
[ChatPromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, template='给我讲一个关于{topic}的笑话'), additional_kwargs={})]),
ChatPromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, template='给我写一首关于{topic}的绝句'), additional_kwargs={})])]

老版本的chain迁移到LCEL

示例1:从LLMChain迁移

废弃的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain.chains import LLMChain
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
[
("user", "Tell me a {adjective} joke")
]
)
legacy_chain = LLMChain(llm=llm, prompt=prompt)
legacy_chain.run({"adjective": "funny"})
legacy_chain

LCEL的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
[
("user", "Tell me a {adjective} joke")
]
)
chain = prompt | llm | StrOutputParser()
chain.invoke({"adjective": "funny"})

示例2: 从StuffDocumentsChain迁移

是一个用于文档内容总结的预制链

废弃的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.documents import Document

documents = [
Document(page_content="Apples are red", metadata={"title": "apple_book"}),
Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
Document(page_content="Bananas are yelow", metadata={"title": "banana_book"}),
]

from langchain.chains import LLMChain, StuffDocumentsChain
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate

# This controls how each document will be formatted. Specifically,
# it will be passed to `format_document` - see that function for more
# details.
document_prompt = PromptTemplate(
input_variables=["page_content"], template="{page_content}"
)
document_variable_name = "context"
# The prompt here should take as an input variable the
# `document_variable_name`
prompt = ChatPromptTemplate.from_template("Summarize this content: {context}")
llm_chain = LLMChain(llm=llm, prompt=prompt)
chain = StuffDocumentsChain(
llm_chain=llm_chain,
document_prompt=document_prompt,
document_variable_name=document_variable_name,
)

result = chain.invoke(documents)
result["output_text"]

LCEL用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.documents import Document
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

documents = [
Document(page_content="Apples are red", metadata={"title": "apple_book"}),
Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
Document(page_content="Bananas are yelow", metadata={"title": "banana_book"}),
]

prompt = ChatPromptTemplate.from_template("Summarize this content: {context}")
chain = create_stuff_documents_chain(llm, prompt)

result = chain.invoke({"context": documents})
result

链的高级应用

链(Chain)的高级使用技巧,包括如何将函数通过修饰符快速转换为链、在链中使用Lambda函数、自定义支持流式输出的函数、链中的传值方式(如config和记忆功能)、以及创建路由链和回退机制等。重点介绍了函数在链中的灵活应用,使开发者可以更方便地构建复杂的应用逻辑。

在链中使用函数

使用@chain快速将函数转为链

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

# 导入字符串输出解析器
from langchain_core.output_parsers import StrOutputParser
# 导入chain装饰器,用于创建自定义链
from langchain_core.runnables import chain
from langchain_core.prompts import ChatPromptTemplate


# 创建第一个提示模板:请求关于特定主题的笑话
prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
# 创建第二个提示模板:询问笑话的主题是什么
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


# 使用@chain装饰器定义一个自定义链
@chain
def custom_chain(text):
# 步骤1: 将输入文本填充到第一个提示模板中
prompt_val1 = prompt1.invoke({"topic": text})
# 步骤2: 使用大模型生成关于指定主题的笑话
output1 = llm.invoke(prompt_val1)
# 步骤3: 将模型输出解析为字符串
parsed_output1 = StrOutputParser().invoke(output1)

# 步骤4: 创建第二个处理链,用于分析笑话主题
# 这个链将提示模板、DS模型和字符串解析器串联起来
chain2 = prompt2 | llm | StrOutputParser()

# 步骤5: 将第一步生成的笑话作为输入,让第二个链分析其主题
return chain2.invoke({"joke": parsed_output1})


# 调用自定义链,输入主题"bears"(熊)
# 整个过程:
# 1. 先生成一个关于熊的笑话
# 2. 然后分析这个笑话的主题是什么
# 3. 返回分析结果
custom_chain.invoke("bears")

链中使用Lambda

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from operator import itemgetter # 导入itemgetter函数,用于从字典中提取值
from langchain_core.prompts import ChatPromptTemplate # 导入聊天提示模板
from langchain_core.runnables import RunnableLambda # 导入可运行的Lambda函数包装器

def length_function(text):
return len(text)

def _multiple_length_function(text1, text2):
return len(text1) + len(text2)

def multiple_length_function(_dict):
return _multiple_length_function(_dict["text1"], _dict["text2"])

# 创建一个简单的聊天提示模板,询问a和b的和
prompt = ChatPromptTemplate.from_template("what is {a} + {b}")


# 构建一个复杂的处理链
chain = (
{
# 处理"a"参数:
# 1. 从输入字典中提取"foo"键的值
# 2. 将提取的值传递给length_function函数(假设这个函数计算字符串长度)
# RunnableLambda 运行的是Lambda函数
"a": itemgetter("foo") | RunnableLambda(length_function),

# 处理"b"参数:
# 1. 创建一个包含两个键值对的字典:
# - "text1": 从输入字典中提取"foo"键的值
# - "text2": 从输入字典中提取"bar"键的值
# 2. 将这个字典传递给multiple_length_function函数
# (假设这个函数计算两个文本的总长度)
"b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
| RunnableLambda(multiple_length_function),
}
| prompt # 将处理后的"a"和"b"值填入提示模板
| llm # 将填充后的提示发送给大模型生成回答
)


# 调用链处理流程,输入一个包含"foo"和"bar"键的字典
# 整个过程:
# 1. 计算"bar"字符串的长度作为a的值
# 2. 计算"bar"和"gah"字符串的总长度作为b的值
# 3. 将这些值填入提示"what is {a} + {b}"
# 4. 让DeepSeek模型回答这个问题
chain.invoke({"foo": "bar", "bar": "gah"})

在链中自定义支持流输出的函数

  • 当链被使用stream或astream调用的时候
  • 如何在链中增加自定义函数

一个简单的链支持流调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import StrOutputParser

# 创建一个聊天提示模板,要求生成5个与给定动物相似的动物名称,以都好分隔
prompt = ChatPromptTemplate.from_template("请列出5个与以下动物相似的动物名称,用逗号分隔: {animal}, 不要包含数字。")

# 创建一个处理链:提示词模板 -》 模型 -》 字符串输出解析器
str_chain = prompt | llm | StrOutputParser()

# 流式输出,输入为“熊”
for chunk in str_chain.stream(prompt.invoke({"animal": "熊"})):
print(chunk, end="", flush=True)

增加自定义函数

  • 聚合当前流的输出
  • 在生成下一个逗号的时候组合
  • 注意:使用了yield

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.output_parsers import StrOutputParser

# 创建一个聊天提示模板,要求生成5个与给定动物相似的动物名称,以都好分隔
prompt = ChatPromptTemplate.from_template("请列出5个与以下动物相似的动物名称,用逗号分隔: {animal}, 不要包含数字。")

# 创建一个处理链:提示词模板 -》 模型 -》 字符串输出解析器
str_chain = prompt | llm | StrOutputParser()

from typing import Iterator, List

# 这是一个自定义解析器,将LLM输出的标记迭代器
# 按逗号分隔转换为字符串列表
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
# 保存部分输入直到遇到逗号
buffer = ""
for chunk in input:
# 将当前块添加到缓冲区
buffer += chunk
# 当缓冲区中有逗号时
while "," in buffer:
# 在逗号处分割缓冲区
comma_index = buffer.index(",")
# 输出逗号之前的所有内容
yield [buffer[:comma_index].strip()]
# 保存剩余部分用于下一次迭代
buffer = buffer[comma_index + 1 :]
# 输出最后一块
yield [buffer.strip()]


list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "熊"}):
print(chunk, flush=True)

扩展:yeild与return区别

  • return 函数立即计算并返回所有结果,而yield函数按需计算结果
  • return 函数返回一个数据结构(如列表),yield函数返回一个生成器对象
  • yield函数可以处理潜在的无限序列,而return函数必须在有限时间内完成
  • 生成器对象是一次性的,遍历完后就被消耗完毕,而return返回的数据结构可以重复使用
  • yield特别适合处理大数据集或流式数据,因为它不需要一次性将所有数据加载到内存中

return使用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 使用return
def get_squares_return(n):
"""
返回包含0到n-1的平方数的列表
"""
result = []
for i in range(n):
result.append(i * i)
return result

# 使用return函数
squares = get_squares_return(5)
# 一次性获取所有结果
print("使用return的结果:", squares)
# 返回类型是列表
print("类型:", type(squares))

# 遍历结果
for square in squares:
print(square)

yeild使用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 使用yield
def get_squares_yield(n):
"""
返回包含0到n-1的平方数的列表
"""
for i in range(n):
yield i * i # 每次生成一个结果并暂停

# 使用yield函数
squares = get_squares_yield(5)
# 返回一个生成器对象
print("使用yield的结果:", squares)
# 返回类型是生成器
print("类型:", type(squares))

# 遍历生成器
for square in squares:
print(square) # 每次迭代时才计算下一个值

# 再次遍历生成器
print("再次遍历:")
for square in squares:
print(square) # 不会输出任何内容,因为生成器已经耗尽完毕

使用RunnablePassthrough来传递值

示例代码

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 创建一个可并行运行的处理流程
runnable = RunnableParallel(
passed=RunnablePassthrough(), # 第一个处理器:直接传递输入,不做修改
modified=lambda x: x["num"] + 1, # 第二个处理器:取出输入中的"num"值并加1
)

# 执行这个处理流程,输入是一个包含"num"字段的字典
runnable.invoke({"num": 1})
# 运行结果:{'passed': {'num': 1}, 'modified': 2}

如何在运行时动态添加链的配置

动态改写模型温度

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from langchain.prompts import PromptTemplate
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
import os

llm = ChatOpenAI(
temperature=0.0,
base_url=os.getenv("OPENAI_API_BASE"),
api_key= os.getenv("OPENAI_API_KEY")
).configurable_fields(
temperature=ConfigurableField(
id="llm_temperature",
name="LLM Temperature",
description="Temperature for the model"
)
)

llm.invoke("随意挑选一个随机数,输出为一个整数")

# 在运行时改写温度
llm.with_config(configurable={"llm_temperature": 0.8}).invoke("随意挑选一个随机数,输出为一个整数")

动态切换提示词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langchain.runnables.hub import HubRunnable

prompt = HubRunnable("rlm/rag-prompt").configurable_fields(
owner_repo_commit=ConfigurableField(
id="hub_commit",
name="Hub Commit",
description="The Hub commit to pull from",
)
)

prompt.invoke({"question": "foo", "context": "bar"})

# 在运行时切换提示词
prompt.with_config(configurable={"hub_commit": "rlm/rag-prompt-llama"}).invoke({"question": "foo", "context": "bar"})

为链增加记忆能力(短时记忆InMemoryHistory)

  • 注意:简单的链的记忆天际可以使用v0.2的方式,复杂的官方推荐使用LangGraph
  • 短时记忆:InMemoryHistory
  • 长时记忆:RunnableWithMessageHistory

InMemoryHistory

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from typing import List
from pydantic import BaseModel, Field
from langchain_core.chat_history import BaseChatMessageHistory # 导入聊天历史记录基类
from langchain_core.messages import BaseMessage, AIMessage # 导入消息基类和具体消息类型

class InMemoryHistory(BaseChatMessageHistory, BaseModel):
"""
内存中实现聊天消息历史记录
"""
messages: List[BaseMessage] = Field(default_factory=list) # 使用空列表作为默认值存储消息

def add_messages(self, messages: List[BaseMessage]) -> None:
"""
添加一组消息到存储中
"""
self.messages.extend(messages)

def clear(self) -> None:
"""
清空存储中的所有消息
"""
self.messages = []

def get_messages(self) -> List[BaseMessage]:
"""
获取存储中的所有消息
"""
return self.messages

# 这里我使用全局变量来存储聊天消息历史
# 这样可以更容易地检查它来查看底层结果
store = {} # 创建空字典用于存储不同会话的历史记录

def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
"""
根据会话ID获取聊天历史记录,如果不存在则创建新的
"""
if session_id not in store:
store[session_id] = InMemoryHistory() # 如果会话ID不存在,则创建一个新的历史记录
return store[session_id] # 返回对应的聊天历史记录

# 获取会话id为“1”的历史记录
history = get_by_session_id("1")
# 添加一条AI消息到历史记录中
history.add_messages([AIMessage(content="你好")])
# 打印存储的所有历史记录
print(store) # 将输出包含会话“1”的历史记录,其中有一条”你好“的AI消息

在链中增加短暂记忆

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder # 倒入聊天提示模板和消息占位符
from langchain_core.runnables.history import RunnableWithMessageHistory # 导入带历史记录的可运行组件

# 创建聊天提示模板,包含系统提示、历史记录和用户问题
prompt = ChatPromptTemplate.from_messages(
[
("system", "你是一个擅长{ability}的助手"), # 系统提示,使用ability变量定义助手专长
MessagesPlaceholder(variable_name="history"), # 放置历史消息的占位符
("human", "{question}"), # 用户问题的占位符
]
)

# 将提示模板与大模型连接成一个链
chain = prompt | llm

# 创建带有消息历史功能的可运行链
chain_with_history = RunnableWithMessageHistory(
chain, # 基础链
get_by_session_id, # 使用上一个示例中的get_by_session_id函数获取历史记录
input_messages_key="question", # 输入中代表用户问题的键
history_messages_key="history", # 历史记录在输入中的键
)

# 首次调用链,询问余弦的含义
res = chain_with_history.invoke(
{"question": "什么是余弦?", "ability": "解释科学概念"},
config={"configurable": {"session_id": "foo"}} # 配置会话ID为"foo"
)
print(res) # 打印助手的回答

print("---------------")
# 打印存储中的历史记录
# 此时应包含第一次对话的问题和回答
print(store)

# 第二次调用,询问余弦的反函数
# 由于使用相同的会话ID,模型可以参考前一次对话的上下文
res = chain_with_history.invoke(
{"question": "余弦的反函数是什么?", "ability": "解释科学概念"},
config={"configurable": {"session_id": "foo"}} # 使用相同的会话ID
)
print(res) # 打印助手的回答

print("---------------")
# 打印存储中的历史记录
# 此时应包含两次对话的问题和回答
print(store)

增加用户与对话ID,精准控制记忆

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

from langchain_core.runnables import ( ConfigurableFieldSpec )

# 创建空字典用于存储不同用户和对话的历史记录
store = {}

def get_session_history(user_id: str, session_id: str) -> BaseChatMessageHistory:
"""
根据用户ID和会话ID获取聊天历史记录
如果不存在则创建新的历史记录对象
参数:
user_id: 用户的唯一标识符
session_id: 对话的唯一标识符
返回:
对应的聊天历史记录对象
"""
if(user_id, session_id) not in store:
store[(user_id, session_id)] = InMemoryHistory() # 如果历史记录不存在,则创建一个新的
return store[(user_id, session_id)] # 返回对应的聊天历史记录

# 创建聊天提示模板,包含系统提示、历史记录和用户问题
prompt = ChatPromptTemplate.from_messages(
[
("system", "你是一个擅长{ability}的助手"), # 系统提示,使用ability变量定义助手专长
MessagesPlaceholder(variable_name="history"), # 放置历史消息的占位符
("human", "{question}"), # 用户问题的占位符
]
)

# 将提示模板与大模型连接成一个链
chain = prompt | llm

# 创建带有消息历史功能的可运行链,支持用户ID和对话ID配置
_with_message_history = RunnableWithMessageHistory(
chain, # 基础链
get_session_history=get_session_history, # 获取历史记录的函数
input_messages_key="question", # 输入中代表用户问题的键
history_messages_key="history", # 历史记录在输入中的键
history_factory_config=[
ConfigurableFieldSpec(
id="user_id", # 配置字段ID
annotation=str, # 字段类型注解
name="用户ID", # 字段名称
description="用户的唯一标识符", # 字段描述
default="", # 默认值
is_shared=True, # 是否在多个调用间共享
),
ConfigurableFieldSpec(
id="session_id",
annotation=str,
name="会话ID",
description="对话的唯一标识符",
default="",
is_shared=True,
),
],
)

# 调用链,询问余弦的含义
# 指定用户ID为“123”,对话ID为“1”
res = _with_message_history.invoke(
{"question": "什么是余弦?", "ability": "解释科学概念"},
config={"configurable": {"user_id": "123", "session_id": "1"}} # 配置用户ID和会话ID
)
print(res) # 打印助手的回答

print("---------------")
# 打印存储中的历史记录
print(store)

使用Redis构建长期记忆

  • 安装redis
  • 运行redis服务
  • 配置长期记忆

推荐安装redis-stack

安装依赖:pip install langchain-redis redis

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langchain_redis import RedisChatMessageHistory

REDIS_URL = "redis://localhost:6379"
print(f"Connecting to Redis at: {REDIS_URL}")

# 初始化 Redis 聊天消息历史记录
# 使用 Redis 存储聊天历史,需要提供会话 ID 和 Redis 连接 URL
history = RedisChatMessageHistory(session_id="user_123", redis_url=REDIS_URL)
history.clear() # 首先清空历史记录
# 向历史记录中添加消息
history.add_user_message("你好,AI助手!2222") # 添加用户消息
history.add_ai_message("你好!我今天能为你提供什么帮助?222") # 添加AI回复消息

# 检索并显示历史消息
print("聊天历史:")
for message in history.messages:
# 打印每条消息的类型和内容
print(f"{type(message).__name__}: {message.content}")

使用LCEL来自定义路由链

通过自定义路由链和回退机制实现智能请求分类与模型调用容错,提升应用稳定性与灵活性。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
temperature=0.0,
base_url=api_base,
api_key=api_key
)

# 导入必要的库
from langchain_core.output_parsers import StrOutputParser # 导入字符串输出解析器
from langchain_core.prompts import PromptTemplate # 导入提示模板
# 导入RunnableLambda用于创建可运行的函数链
from langchain_core.runnables import RunnableLambda

# 创建分类链 - 用于确定问题类型
chain = (
# 创建提示模板,要求模型将问题分类为LangChain、Anthropic或Other
PromptTemplate.from_template(
"""根据下面的用户问题,将其分类为 `LangChain`、`Anthropic` 或 `Other`。

请只回复一个词作为答案。

<question>
{question}
</question>

分类结果:"""
)
| llm # 将提示发送给大模型
| StrOutputParser() # 解析模型的输出为纯文本
)

# 创建LangChain专家链 - 模拟Harrison Chase(LangChain创始人)的回答风格
langchain_chain = PromptTemplate.from_template(
"""你将扮演一位LangChain专家。请以他的视角回答问题。 \
你的回答必须以"正如Harrison Chase告诉我的"开头,否则你会受到惩罚。 \
请回答以下问题:

问题: {question}
回答:"""
) | llm # 将提示发送给



# 创建Anthropic专家链 - 模拟Dario Amodei(Anthropic创始人)的回答风格
anthropic_chain = PromptTemplate.from_template(
"""你将扮演一位一位Anthropic专家。请以他的视角回答问题。 \
你的回答必须以"正如Dario Amodei告诉我的"开头,否则你会受到惩罚。 \
请回答以下问题:

问题: {question}
回答:"""
) | llm

# 创建通用回答链 - 用于处理其他类型的问题
general_chain = PromptTemplate.from_template(
"""请回答以下问题:

问题: {question}
回答:"""
) | llm

# 自定义路由函数 - 根据问题分类结果选择合适的回答链
def route(info):
print(info) # 打印分类结果
# 根据分类结果选择相应的专家链
if "anthropic" in info["topic"].lower(): # 如果问题与Anthropic相关
print("claude")
return anthropic_chain # 使用Anthropic专家链
elif "langchain" in info["topic"].lower(): # 如果问题与LangChain相关
print("langchain")
return langchain_chain # 使用LangChain专家链
else: # 其他类型的问题
print("general")
return general_chain # 使用通用回答链



# 创建完整的处理链
# 1. 首先将问题分类并保留原始问题
# 2. 然后根据分类结果路由到相应的专家链处理
full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(route)

# 调用完整链处理用户问题
# 这个问题会被分类为Anthropic相关,然后由anthropic_chain处理
full_chain.invoke({"question": "我该如何使用langchain?"})

链的回退机制

调用大模型超过速率限制如何处理

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 导入必要的库
from unittest.mock import patch # 导入mock库,用于模拟函数行为
from langchain_anthropic import ChatAnthropic # 导入Anthropic的语言模型接口
from langchain_openai import ChatOpenAI # 导入OpenAI的语言模型接口
import httpx # HTTP客户端库
from openai import RateLimitError # OpenAI的速率限制错误类

# 创建模拟HTTP请求和响应对象,用于构造模拟的API错误
request = httpx.Request("GET", "/") # 创建一个GET请求
response = httpx.Response(200, request=request) # 创建一个状态码为200的响应
# 创建一个OpenAI速率限制错误对象,用于模拟API调用超出速率限制的情况
error = RateLimitError("rate limit", response=response, body="")

# 初始化OpenAI模型
# 注意:设置max_retries = 0是为了避免在遇到速率限制等错误时自动重试
openai_llm = ChatOpenAI(
model="gpt-4", # 使用GPT-4模型
temperature=0, # 设置温度为0,使输出更确定性
api_key=os.environ.get("OPENAI_API_KEY"), # 从环境变量获取API密钥
base_url=os.environ.get("OPENAI_API_BASE"), # 从环境变量获取基础URL
)

# 初始化Anthropic模型作为备用选项
anthropic_llm = ChatAnthropic(
model='claude-3-5-sonnet-latest', # 使用Claude 3.5 Sonnet模型
api_key=os.environ.get("ANTHROPIC_API_KEY"), # 从环境变量获取API密钥
base_url=os.environ.get("ANTHROPIC_BASE_URL"), # 从环境变量获取基础URL
)

# 创建带有备用选项的语言模型
# 如果主模型(OpenAI)失败,将自动尝试使用备用模型(Anthropic)
llm = openai_llm.with_fallbacks([anthropic_llm]) # 可以设置多个模型

# 如果不设置回退机制,当API调用超量就会直接报错
# with patch("openai.resources.chat.completions.Completions.create", side_effect=error):
# try:
# print(llm.invoke("Why did the chicken cross the road?"))
# except RateLimitError:
# print("Hit error")

# 测试备用机制 - 使用中文问题
# 使用patch模拟OpenAI API调用失败(抛出速率限制错误)
with patch("openai.resources.chat.completions.Completions.create", side_effect=error):
try:
# 尝试调用语言模型回答中文问题
# 由于OpenAI被模拟为失败,应该自动切换到Anthropic模型
print(llm.invoke("为什么程序员需要学会python?"))
except RateLimitError:
# 如果仍然遇到错误(备用机制失败),则打印错误信息
print("Hit error")