Introduction to Runnable

What is a Runnable

Runnable is the core interface in LangChain 0.3: it represents an executable component. A Runnable can be invoked (synchronously or asynchronously) and composed with other Runnable objects to build more complex chains.

The Runnable interface is LangChain's standard for communication between components. By unifying input/output formats and the way components are called, it lowers coupling between components, supports both synchronous and asynchronous operation, simplifies workflow construction, lets different components connect seamlessly, and provides runtime configuration for extra flexibility and extensibility.
- Runnable is LangChain's core interface; it provides a unified calling convention so different components can connect seamlessly
- It simplifies building complex AI workflows
- It supports both synchronous and asynchronous operation
- Multiple components can be chained with the pipe operator `|`

Note: not every component supports every one of the unified calling methods, especially the asynchronous event APIs. A minimal sketch of the unified interface follows.
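As an illustration (a sketch added here, not from the original post): wrapping a plain function in `RunnableLambda` gives it the same `invoke` / `batch` / `stream` surface as any other LangChain component.

```python
# A minimal sketch of the unified Runnable interface using RunnableLambda.
# The function and inputs are illustrative only.
from langchain_core.runnables import RunnableLambda

doubler = RunnableLambda(lambda x: x * 2)

print(doubler.invoke(3))         # 6          -- one input, one output
print(doubler.batch([1, 2, 3]))  # [2, 4, 6]  -- a list of inputs
for chunk in doubler.stream(4):  # streams whatever the runnable yields
    print(chunk)                 # 8
```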
Input and output types that Runnable requires for each component:

| Component | Input Type | Output Type |
| --- | --- | --- |
| Prompt | dictionary | PromptValue |
| ChatModel | a string, list of chat messages or PromptValue | ChatMessage |
| LLM | a string, list of chat messages or PromptValue | String |
| OutputParser | the output of an LLM or ChatModel | Depends on the parser |
| Retriever | a string | List of Documents |
| Tool | a string or dictionary, depending on the tool | Depends on the tool |
Runnable enforces these unified input types (note: when execution fails, the root cause is very often an input that doesn't match the expected type). A small sketch follows.
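To make the table concrete, here is a short sketch (added for illustration, not from the original post) checking a prompt's and a parser's input/output types:

```python
# A sketch verifying the table above: dict -> PromptValue for prompts,
# and parser output depending on the parser.
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Tell me about {topic}")

prompt_value = prompt.invoke({"topic": "bears"})  # dictionary in -> PromptValue out
print(type(prompt_value))                         # ChatPromptValue

parser = StrOutputParser()
print(parser.invoke("already a string"))          # StrOutputParser returns a plain str
```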
The second parameter, RunnableConfig, defines runtime behavior such as callbacks:
| Attribute | Description |
| --- | --- |
| run_name | Name used for the given Runnable (not inherited) |
| run_id | Unique identifier for this call; sub-calls will get their own unique run IDs |
| tags | Tags for this call and any sub-calls |
| metadata | Metadata for this call and any sub-calls |
| callbacks | Callbacks for this call and any sub-calls |
| max_concurrency | Maximum number of parallel calls to make (e.g. used by batch) |
| recursion_limit | Maximum number of times a call can recurse (e.g. used by Runnables that return Runnables) |
| configurable | Runtime values for configurable attributes of the Runnable |
Why use Runnable

| Feature | Description |
| --- | --- |
| Standardized interface | Every component supports .invoke() / .stream() / .batch() |
| Strong composability | Components can be chained with the \| operator |
| Strong observability | Callbacks can be attached to any execution for debugging, tracing, and evaluation |
| Async support | Both sync and async execution: .ainvoke(), .astream() |
Main methods of Runnable

| Method | Kind | Description |
| --- | --- | --- |
| .invoke(input) | Synchronous | Takes one input, returns one output |
| .ainvoke(input) | Asynchronous | Executes asynchronously |
| .stream(input) | Streaming | Returns the response in pieces (e.g. chat message chunks) |
| .batch(inputs) | Batch | Takes a list of inputs, returns a list of outputs |
| .bind(config) | Bind configuration | Binds static arguments such as stop, temperature, etc. |
Which components are Runnable

| Component type | Implements the Runnable interface? |
| --- | --- |
| ChatOpenAI / ChatAnthropic | Yes |
| PromptTemplate / ChatPromptTemplate | Yes |
| StrOutputParser / JsonOutputParser | Yes |
| RunnableLambda / RunnableMap | Yes |
| Custom function wrappers, RunnableLambda(fn) | Yes |
Introduction to LCEL

LCEL stands for LangChain Expression Language.

To make building complex LLM applications simpler and faster, LangChain provides a "pipeline"-style way to declare them: components are connected with the `|` operator, so developers can wire different modules together in a simple declarative form. The syntax looks like this:

```
chain = prompt_template | model | output_parser
```
Benefits of LCEL

- Optimized parallel execution
- Reliable async support
- Simplified streaming
- Seamless LangSmith tracing
- A standard API
- Deployable with LangServe (FastAPI + LCEL), as sketched below
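A minimal LangServe sketch (added for illustration; assumes `pip install "langserve[all]"`, and the chain and route name are placeholders):

```python
# Serve an LCEL runnable over HTTP with LangServe + FastAPI.
from fastapi import FastAPI
from langchain_core.runnables import RunnableLambda
from langserve import add_routes

# a trivial runnable standing in for a real LCEL chain
chain = RunnableLambda(lambda topic: f"a joke about {topic}")

app = FastAPI(title="LCEL chain server")
add_routes(app, chain, path="/joke")  # exposes /joke/invoke, /joke/stream, /joke/batch

# run with: uvicorn server:app --port 8000
```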
What LCEL encapsulates

API reference for Runnables: https://python.langchain.com/api_reference/core/runnables.html

Before writing LCEL expressions, it helps to understand the elements they are built from, covered in the sections below.
When to use LCEL

- A single model call: no LCEL needed, just call the model directly (comparable to calling DeepSeek on its own)
- A simple chain (prompt + LLM + parser): the ideal scenario for LCEL
- Complex orchestration (branching, loops, multiple agents, etc.): use LangGraph
- LCEL can be embedded in most applications across the LangChain ecosystem
Prebuilt chains in older versions

- Version 0.0 shipped many prebuilt chains (LLMChain, ConversationChain, ...)
- They still work in 0.3 but are deprecated (their prompts are hidden and cannot be customized, among other issues)
- The recommendation is to rebuild them with LCEL or LangGraph
- Migration examples below: LLMChain and StuffDocumentsChain
Basic chain usage

- Build a chain quickly with the pipe operator
- Streaming calls on a chain
- Running multiple chains in parallel
- Debugging a chain
Build a chain quickly with the pipe operator

Example code

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")

# prompt -> model -> parser, wired together with the pipe operator
chain = prompt | llm | StrOutputParser()
res = chain.invoke({"topic": "鲜花"})
print(res)
```
Building the chain with .pipe()

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")

# .pipe() is the method equivalent of the | operator
chain = prompt.pipe(llm).pipe(StrOutputParser())
res = chain.invoke({"topic": "狗"})
print(res)
```
Streaming calls on a chain

- Asynchronous calls with astream
- Streaming JSON output
- Using the event stream

Note: not every component supports streaming output. However, once a non-streaming component is wrapped inside a chain, the chain's final result can still be streamed.
Asynchronous calls with astream

- stream: synchronous streaming call
- astream: asynchronous streaming call

Example code
```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话,不要有任何解释")
chain = prompt | llm | StrOutputParser()

# astream yields output chunks asynchronously as they are produced
async for chunk in chain.astream("小狗"):
    print(chunk, end="|", flush=True)
```
Streaming JSON output

Example code

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import JsonOutputParser

# JsonOutputParser can emit partially parsed JSON while the model streams
chain = llm | JsonOutputParser()

async for text in chain.astream(
    "以JSON格式输出法国、西班牙和日本的国家及其人口信息"
    "使用一个字典,包含外层键名为'countries'的国家列表"
    "每个国家应该有键名为'name'和'population'"
):
    print(text, flush=True)
Event stream

- This API is in beta
- It decomposes the streaming process into events, allowing much finer-grained control
- Requires langchain-core >= 0.2

Granularity of the event stream:
| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | [model name] | | {"input": "hello"} | |
| on_llm_stream | [model name] | 'Hello' | | |
| on_llm_end | [model name] | | 'Hello human!' | |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ...] |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages=[SystemMessage, ...]) |
Note: for langchain-core < 0.3.37, the event-stream version must be specified explicitly:

```python
events = []
async for event in llm.astream_events("hello", version="v2"):
    events.append(event)
```
Filtering events by name

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import JsonOutputParser

# give each step a run_name so its events can be filtered by name
chain = llm.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    include_names=["my_parser"],
    version="v2",
):
    print(event)
    max_events += 1
    if max_events > 10:
        print("...")
        break
```
Filtering events by tag

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import JsonOutputParser

# the tag set on the whole chain is inherited by its sub-calls
chain = (llm | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. '
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    'Each country should have the key `name` and `population`',
    include_tags=["my_chain"],
    version="v2",
):
    print(event)
    max_events += 1
    if max_events > 10:
        print("...")
        break
```
Filtering by event type

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import JsonOutputParser

chain = (llm | JsonOutputParser()).with_config({"tags": ["my_chain"]})

num_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    # dispatch on the event type ("stage") rather than on name or tag
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        print("...")
        break
```
Running multiple chains in parallel

Running chains in parallel builds this shape:

```
     Input
    /     \
Branch1   Branch2
    \     /
    Combine
```
Example code

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel

joke_chain = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话") | llm
poem_chain = ChatPromptTemplate.from_template("给我写一首关于{topic}的绝句") | llm

# RunnableParallel runs both branches concurrently on the same input
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
map_chain.invoke({"topic": "程序员"})
```
To print the graph, install the dependency first: pip install grandalf

```python
map_chain.get_graph().print_ascii()
```
Output:

```
            +--------------------------+
            | Parallel<joke,poem>Input |
            +--------------------------+
                  ***          ***
               ***                ***
             **                      **
+--------------------+        +--------------------+
| ChatPromptTemplate |        | ChatPromptTemplate |
+--------------------+        +--------------------+
           *                            *
           *                            *
           *                            *
    +------------+                +------------+
    | ChatOpenAI |                | ChatOpenAI |
    +------------+                +------------+
                  ***          ***
                     ***    ***
                        **
           +---------------------------+
           | Parallel<joke,poem>Output |
           +---------------------------+
```
Inspecting the prompts
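The original omits the call that produces the output below; presumably it is `Runnable.get_prompts()`, which collects every prompt template used anywhere in the chain:

```python
# Presumed call (the original snippet is missing): list the chain's prompts.
map_chain.get_prompts()
```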
Output:

```
[ChatPromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, template='给我讲一个关于{topic}的笑话'), additional_kwargs={})]),
 ChatPromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], input_types={}, partial_variables={}, template='给我写一首关于{topic}的绝句'), additional_kwargs={})])]
```
Migrating legacy chains to LCEL

Example 1: migrating from LLMChain

Deprecated usage

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain.chains import LLMChain
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("user", "Tell me a {adjective} joke"),
    ]
)

# LLMChain is deprecated; shown here only for comparison
legacy_chain = LLMChain(llm=llm, prompt=prompt)
legacy_chain.run({"adjective": "funny"})
legacy_chain
```
The LCEL way

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("user", "Tell me a {adjective} joke"),
    ]
)

chain = prompt | llm | StrOutputParser()
chain.invoke({"adjective": "funny"})
```
Example 2: migrating from StuffDocumentsChain

StuffDocumentsChain is a prebuilt chain for summarizing document content.

Deprecated usage

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.documents import Document

documents = [
    Document(page_content="Apples are red", metadata={"title": "apple_book"}),
    Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
    Document(page_content="Bananas are yellow", metadata={"title": "banana_book"}),
]

from langchain.chains import LLMChain, StuffDocumentsChain
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate

# how each document is rendered before being "stuffed" into the prompt
document_prompt = PromptTemplate(
    input_variables=["page_content"], template="{page_content}"
)
document_variable_name = "context"
prompt = ChatPromptTemplate.from_template("Summarize this content: {context}")

llm_chain = LLMChain(llm=llm, prompt=prompt)
chain = StuffDocumentsChain(
    llm_chain=llm_chain,
    document_prompt=document_prompt,
    document_variable_name=document_variable_name,
)

result = chain.invoke(documents)
result["output_text"]
```
The LCEL way

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.documents import Document
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

documents = [
    Document(page_content="Apples are red", metadata={"title": "apple_book"}),
    Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
    Document(page_content="Bananas are yellow", metadata={"title": "banana_book"}),
]

prompt = ChatPromptTemplate.from_template("Summarize this content: {context}")

# create_stuff_documents_chain is the LCEL replacement for StuffDocumentsChain
chain = create_stuff_documents_chain(llm, prompt)

result = chain.invoke({"context": documents})
result
```
Advanced chain usage

Advanced techniques for chains, including: turning a function into a chain with a decorator, using lambda functions inside a chain, writing custom functions that support streaming output, passing values through a chain (such as config and memory), and building routing chains and fallback mechanisms. The focus is on using functions flexibly inside chains, so developers can assemble complex application logic more conveniently.
Using functions in a chain

Turn a function into a chain with @chain

Example code

```python
import os

from langchain_openai import ChatOpenAI

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    # first call: generate a joke about the topic
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = llm.invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    # second call: ask the model what the joke is about
    chain2 = prompt2 | llm | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")
```
Using a lambda in a chain

Example code

```python
import os
from operator import itemgetter

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) + len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

# plain functions are wrapped in RunnableLambda so they compose with |
chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | llm
)

chain.invoke({"foo": "bar", "bar": "gah"})
```
Custom functions with streaming support inside a chain

- What happens when the chain is invoked with stream or astream
- How to add a custom function into the chain

A simple chain that supports streaming:

```python
import os

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate  # missing from the original snippet

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

prompt = ChatPromptTemplate.from_template(
    "请列出5个与以下动物相似的动物名称,用逗号分隔: {animal}, 不要包含数字。"
)
str_chain = prompt | llm | StrOutputParser()

# stream the chain with the input dict; the chain already contains the prompt,
# so passing prompt.invoke(...) here (as the original did) would fail
for chunk in str_chain.stream({"animal": "熊"}):
    print(chunk, end="", flush=True)
```
Adding a custom function that:

- buffers the output streamed so far
- emits the buffered text each time the next comma is produced
- note: it uses yield

Example code

```python
import os
from typing import Iterator, List

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

prompt = ChatPromptTemplate.from_template(
    "请列出5个与以下动物相似的动物名称,用逗号分隔: {animal}, 不要包含数字。"
)
str_chain = prompt | llm | StrOutputParser()


def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    """Buffer streamed text and emit a list item whenever a comma arrives."""
    buffer = ""
    for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1:]
    # flush whatever remains after the last comma
    yield [buffer.strip()]


# a generator function piped into a chain is coerced into a streaming Runnable
list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "熊"}):
    print(chunk, flush=True)
```
Aside: yield vs. return

- A return function computes and returns all of its results at once, while a yield function computes results on demand
- return hands back a data structure (such as a list); yield hands back a generator object
- A yield function can model potentially infinite sequences, while a return function must finish in finite time
- A generator is one-shot: once iterated to exhaustion it is consumed, whereas a returned data structure can be reused
- yield is especially suited to large datasets and streaming data, because it never loads everything into memory at once
Code using return

```python
def get_squares_return(n):
    """Return a list of the squares of 0..n-1, computed eagerly."""
    result = []
    for i in range(n):
        result.append(i * i)
    return result


squares = get_squares_return(5)
print("Result with return:", squares)
print("Type:", type(squares))
for square in squares:
    print(square)
```
Code using yield

```python
def get_squares_yield(n):
    """Yield the squares of 0..n-1, computed lazily."""
    for i in range(n):
        yield i * i


squares = get_squares_yield(5)
print("Result with yield:", squares)
print("Type:", type(squares))
for square in squares:
    print(square)

# a generator is exhausted after one pass: this second loop prints nothing
print("Iterating again:")
for square in squares:
    print(square)
```
Passing values through with RunnablePassthrough

Example code

```python
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel(
    passed=RunnablePassthrough(),      # forwards the input unchanged
    modified=lambda x: x["num"] + 1,   # derives a new value from the input
)

runnable.invoke({"num": 1})
# -> {'passed': {'num': 1}, 'modified': 2}
```
Adding chain configuration dynamically at runtime

Overriding the model temperature

Example code

```python
import os

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

# declare temperature as a field that can be overridden per call
llm = ChatOpenAI(
    temperature=0.0,
    base_url=os.getenv("OPENAI_API_BASE"),
    api_key=os.getenv("OPENAI_API_KEY"),
).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="Temperature for the model",
    )
)

llm.invoke("随意挑选一个随机数,输出为一个整数")

# override the temperature for a single call at runtime
llm.with_config(configurable={"llm_temperature": 0.8}).invoke("随意挑选一个随机数,输出为一个整数")
```
Switching prompts dynamically

```python
from langchain.runnables.hub import HubRunnable
from langchain_core.runnables import ConfigurableField  # missing from the original snippet

prompt = HubRunnable("rlm/rag-prompt").configurable_fields(
    owner_repo_commit=ConfigurableField(
        id="hub_commit",
        name="Hub Commit",
        description="The Hub commit to pull from",
    )
)

prompt.invoke({"question": "foo", "context": "bar"})

# pull a different hub prompt at runtime
prompt.with_config(configurable={"hub_commit": "rlm/rag-prompt-llama"}).invoke(
    {"question": "foo", "context": "bar"}
)
```
Adding memory to a chain (short-term memory with InMemoryHistory)

Note: for simple chains, memory can be added the v0.2 way; for complex cases the official recommendation is LangGraph.

- Short-term memory: InMemoryHistory
- Long-term memory: RunnableWithMessageHistory

InMemoryHistory

Example code
```python
from typing import List

from pydantic import BaseModel, Field
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import AIMessage, BaseMessage


class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """In-memory implementation of chat message history."""

    messages: List[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Append a batch of messages to the store."""
        self.messages.extend(messages)

    def clear(self) -> None:
        """Remove all messages from the store."""
        self.messages = []

    def get_messages(self) -> List[BaseMessage]:
        """Return all messages in the store."""
        return self.messages


store = {}


def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    """Get the chat history for a session ID, creating it if it doesn't exist."""
    if session_id not in store:
        store[session_id] = InMemoryHistory()
    return store[session_id]


history = get_by_session_id("1")
history.add_messages([AIMessage(content="你好")])
print(store)
```
Adding short-term memory to a chain

Example code

```python
import os

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "你是一个擅长{ability}的助手"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)

chain = prompt | llm

# wrap the chain so history is loaded/saved around every call
# (get_by_session_id and store come from the previous snippet)
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_by_session_id,
    input_messages_key="question",
    history_messages_key="history",
)

res = chain_with_history.invoke(
    {"question": "什么是余弦?", "ability": "解释科学概念"},
    config={"configurable": {"session_id": "foo"}},
)
print(res)
print("---------------")
print(store)

res = chain_with_history.invoke(
    {"question": "余弦的反函数是什么?", "ability": "解释科学概念"},
    config={"configurable": {"session_id": "foo"}},
)
print(res)
print("---------------")
print(store)
```
Adding user and conversation IDs for precise control over memory

Example code

```python
import os

from langchain_openai import ChatOpenAI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import ConfigurableFieldSpec
from langchain_core.runnables.history import RunnableWithMessageHistory

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

store = {}


def get_session_history(user_id: str, session_id: str) -> BaseChatMessageHistory:
    """Get the chat history for a (user_id, session_id) pair, creating it if missing.
    InMemoryHistory comes from the earlier snippet."""
    if (user_id, session_id) not in store:
        store[(user_id, session_id)] = InMemoryHistory()
    return store[(user_id, session_id)]


prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "你是一个擅长{ability}的助手"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{question}"),
    ]
)

chain = prompt | llm

_with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history=get_session_history,
    input_messages_key="question",
    history_messages_key="history",
    # declare the runtime config keys the history factory needs
    history_factory_config=[
        ConfigurableFieldSpec(
            id="user_id",
            annotation=str,
            name="User ID",
            description="Unique identifier for the user",
            default="",
            is_shared=True,
        ),
        ConfigurableFieldSpec(
            id="session_id",
            annotation=str,
            name="Session ID",
            description="Unique identifier for the conversation",
            default="",
            is_shared=True,
        ),
    ],
)

res = _with_message_history.invoke(
    {"question": "什么是余弦?", "ability": "解释科学概念"},
    config={"configurable": {"user_id": "123", "session_id": "1"}},
)
print(res)
print("---------------")
print(store)
```
Building long-term memory with Redis

- redis-stack is the recommended install
- Install the dependencies: pip install langchain-redis redis

Example code

```python
from langchain_redis import RedisChatMessageHistory

REDIS_URL = "redis://localhost:6379"
print(f"Connecting to Redis at: {REDIS_URL}")

# history is persisted in Redis under the given session ID
history = RedisChatMessageHistory(session_id="user_123", redis_url=REDIS_URL)
history.clear()

history.add_user_message("你好,AI助手!2222")
history.add_ai_message("你好!我今天能为你提供什么帮助?222")

print("Chat history:")
for message in history.messages:
    print(f"{type(message).__name__}: {message.content}")
```
Custom routing chains with LCEL

A custom routing chain classifies incoming requests and dispatches them to the right sub-chain; combined with a fallback mechanism for model calls, this improves both the robustness and the flexibility of an application.

Example code

```python
import os

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda

api_base = os.getenv("OPENAI_API_BASE")
api_key = os.getenv("OPENAI_API_KEY")

llm = ChatOpenAI(
    temperature=0.0,
    base_url=api_base,
    api_key=api_key,
)

# classifier chain: labels the question as LangChain / Anthropic / Other
chain = (
    PromptTemplate.from_template(
        """根据下面的用户问题,将其分类为 `LangChain`、`Anthropic` 或 `Other`。
请只回复一个词作为答案。

<question>
{question}
</question>

分类结果:"""
    )
    | llm
    | StrOutputParser()
)

langchain_chain = PromptTemplate.from_template(
    """你将扮演一位LangChain专家。请以他的视角回答问题。\
你的回答必须以"正如Harrison Chase告诉我的"开头,否则你会受到惩罚。\
请回答以下问题:

问题: {question}
回答:"""
) | llm

anthropic_chain = PromptTemplate.from_template(
    """你将扮演一位Anthropic专家。请以他的视角回答问题。\
你的回答必须以"正如Dario Amodei告诉我的"开头,否则你会受到惩罚。\
请回答以下问题:

问题: {question}
回答:"""
) | llm

general_chain = PromptTemplate.from_template(
    """请回答以下问题:

问题: {question}
回答:"""
) | llm


def route(info):
    """Pick the downstream chain based on the classifier's label."""
    print(info)
    if "anthropic" in info["topic"].lower():
        print("claude")
        return anthropic_chain
    elif "langchain" in info["topic"].lower():
        print("langchain")
        return langchain_chain
    else:
        print("general")
        return general_chain


full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(route)
full_chain.invoke({"question": "我该如何使用langchain?"})
```
Chain fallbacks

How to handle a model call that exceeds the rate limit.

Example code

```python
import os  # missing from the original snippet
from unittest.mock import patch

import httpx
from openai import RateLimitError

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI

# fabricate a RateLimitError to simulate hitting the OpenAI rate limit
request = httpx.Request("GET", "/")
response = httpx.Response(200, request=request)
error = RateLimitError("rate limit", response=response, body="")

openai_llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    api_key=os.environ.get("OPENAI_API_KEY"),
    base_url=os.environ.get("OPENAI_API_BASE"),
)

anthropic_llm = ChatAnthropic(
    model="claude-3-5-sonnet-latest",
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    base_url=os.environ.get("ANTHROPIC_BASE_URL"),
)

# if the OpenAI call fails, fall back to Anthropic
llm = openai_llm.with_fallbacks([anthropic_llm])

with patch("openai.resources.chat.completions.Completions.create", side_effect=error):
    try:
        print(llm.invoke("为什么程序员需要学会python?"))
    except RateLimitError:
        print("Hit error")
```