什么是Middleware(中间件) Langchain的v.0.3.x版本,推荐使用langgraph.prebuilt.create_react_agent来构建智能代理中会使用一些hook钩子函数(例如:prehook)进行大模型执行前后或者其中位置进行能力增强,在此基础上,Langchain的v1.x.x版本后,增加了中间件的机制,为 Agent 提供 可插拔的控制层 ,允许你在 每一次模型调用、每一次工具执行之前/之后 ,插入自定义逻辑。
官方文档介绍:https://docs.langchain.com/oss/javascript/langchain/middleware/overview
有主要以下用途:
应用场景
具体说明
追踪与监控
记录日志、进行行为分析和调试,便于观察(跟踪)代理的决策过程。
内容与流程转换
动态修改提示词、调整工具选择逻辑,或重新格式化最终输出。
增强鲁棒性
为核心流程添加重试机制、失败回退方案、异常兜底、提前终止的逻辑。
安全与合规
实施速率限制、设置安全护栏、检测个人敏感信息(PII)等。
中间件的执行位于Agent核心循环的关键节点,执行流程图如下:
graph TD
%% 节点定义(椭圆((xxx))、矩形[xxx],单独行无注释)
A((request)) --> B[model]
B -- action --> C[tools]
B --> D((result))
C -- observation --> B
%% linkStyle单独行,仅写样式+分号(无任何注释/额外内容)
linkStyle 1 stroke-dasharray:5,5;
linkStyle 2 stroke-dasharray:5,5;
中间件可在该循环的每个阶段插入钩子(hook)
graph TD
%% 节点定义
request([request])
before_agent[before_agent]
before_model[before_model]
%% 定义右侧分支
subgraph wrap_model_call [wrap_model_call]
model[model]
end
%% 定义左侧分支
subgraph wrap_tool_call [wrap_tool_call]
tools[tools]
end
after_model[after_model]
after_agent[after_agent]
result([result])
%% 流程连接
request --> before_agent
before_agent --> before_model
%% 核心循环与并列结构
before_model --> wrap_model_call
wrap_model_call --> after_model
%% 虚线分支
after_model -.-> wrap_tool_call
wrap_tool_call --> before_model
after_model -.-> after_agent
after_agent --> result
%% 样式美化:匹配原图紫色调
classDef purpleNode fill:#ede9fe,stroke:#a78bfa,stroke-width:1px
classDef container fill:#fff,stroke:#a78bfa,stroke-width:1px
classDef capsule fill:#f5f3ff,stroke:#a78bfa,stroke-width:1px
class request,result capsule
class before_agent,before_model,after_model,after_agent,tools,model purpleNode
class wrap_tool_call,wrap_model_call container
中间件的分类 节点式钩子(Node-style Hooks) 在固定事件点执行:
钩子
触发时机
before_agent
每次 agent 调用开始之前
before_model
每次模型调用之前
after_model
每次模型调用之后
after_agent
整个 agent 调用结束之后
主要用于 顺序逻辑、检查、修改状态 。
包装式钩子(Wrap-style Hooks) 直接包裹模型或工具调用:
钩子
用途
wrap_model_call
拦截模型调用,用于重试/回退/缓存/动态改 prompt
wrap_tool_call
拦截工具调用,用于监控/Mock/工具重试等
主要用于 控制流程、重试、模拟、替换、缓存等 。
如何使用中间件? 使用时只需在 create_agent() 里传入,官方示例如下:
1 2 3 4 5 6 7 8 9 10 11 from langchain.agents import create_agentfrom langchain.agents.middleware import SummarizationMiddleware, HumanInTheLoopMiddlewareagent = create_agent( model="gpt-4o" , tools=[...], middleware=[ SummarizationMiddleware(...), HumanInTheLoopMiddleware(...) ], )
内置中间件 基于LangChain的代理应用时,会遇到各种各样的需求和场景,例如对话管理、工具调用控制、数据安全等。为了帮助开发者更高效地应对这些常见问题,LangChain预先设计并实现了一系列中间件,这些中间件就像是一个个功能模块,可以直接被集成到代理应用中,从而节省了开发者从头开始编写代码的时间和精力。
官方文档:https://docs.langchain.com/oss/python/langchain/middleware/built-in
以下列出些常用的内置中间件,具体的内置中间件可到官方文档中查看说明及使用方式。
Summarization(摘要) :接近令牌(token)限制时,自动对对话历史进行总结。这有助于在长对话中保持上下文的连贯性,同时避免因令牌数量过多而导致的性能问题。
Human-in-the-loop(人工介入) :在工具调用之前暂停执行,等待人工批准。这对于需要人工监督或在高风险操作中确保安全和合规性非常有用。
Model call limit(模型调用限制) :限制模型调用的次数,以防止成本过高。这对于控制生产环境中的资源使用和成本非常有效
Tool call limit(工具调用限制) :通过限制调用次数来控制工具的执行。这有助于避免工具被过度使用,特别是在调用外部API或执行资源密集型任务时。
Tool retry(工具重试) :当工具调用失败时,自动以指数退避策略进行重试。这有助于处理临时性故障,提高工具调用的可靠性。
自定义中间件
实现方式
适用场景
特点
优势
典型使用场景
基于装饰器
- 只需要一个钩子 - 无需复杂配置
使用 @before_model/ @after_model / @wrap_model_call等装饰器
简单、快速、无类结构
- 打印日志 - 单次验证 - 简单重写提示词 - 单次响应校验
基于类的中间件
- 需要多个钩子 - 需要复杂配置 - 需要跨项目复用
继承 AgentMiddleware实现多个 Hook
功能强大、可配置、可复用
- 日志系统(前后钩子) - 重试 + 校验多组合 - 动态模型选择 - 状态统计 - 企业级中间件库
基于装饰器的中间件 可用装饰器 节点式 (在特定执行点运行)
@before_agent - 代理启动前(每次调用一次)
@before_model- 每次模型调用前
@after_model- 每次模型响应后
@after_agent - 代理完成时(每次调用一次)
包装式 (拦截和控制执行)
@wrap_model_call- 每次模型调用前后
@wrap_tool_call - 每次工具调用前后
便利装饰器 :
@dynamic_prompt - 生成动态系统提示(相当于修改@wrap_model_call中的系统提示词)
基于类的中间件 节点式钩子 在执行流中的特定点运行
before_agent - 代理启动前(每次调用一次)
before_model - 每次模型调用前
after_model - 每次模型响应后
after_agent - 代理完成时(每次调用最多一次)
包装式钩子 拦截执行并控制何时调用处理程序
wrap_model_call - 每次模型调用前后
wrap_tool_call - 每次工具调用前后
您可以决定处理程序是调用零次(短路)、一次(正常流程)还是多次(重试逻辑)。
示例代码 自定义中间件基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 """ 自定义中间件 在执行流中的特定点运行 - **before_agent** - 代理启动前(每次调用一次) - **before_model** - 每次模型调用前 - **after_model** - 每次模型响应后 - **after_agent** - 代理完成时(每次调用最多一次) """ from typing import Any from langchain.agents import AgentState, create_agentfrom langchain.agents.middleware import before_model, ModelRequest, ModelResponse, wrap_model_call, after_modelfrom langgraph.runtime import Runtimefrom src.core.llms import model_clientdef print_stream_result (response ): """流式结果的输出""" for item in response: for key, value in item.items(): if key == "model" : print ("-----执行步骤:调用大模型" ) if value['messages' ][0 ].content: print (f"-----大模型分析的结果:{value["messages" ][0 ].content} " ) elif value['messages' ][0 ].tool_calls: print ("-----大模型分析的结果为调用以下工具:" ) for tool_ in value['messages' ][0 ].tool_calls: print (f"工具名称:{tool_['name' ]} ,调用工具的入参:{tool_['args' ]} " ) elif key == "tools" : print (f"智能体执行工具:{value['messages' ][0 ].name} " ) print (f"工具执行结果:{value['messages' ][0 ].content} " ) @before_model def log_before_model (state: AgentState, runtime: Runtime ) -> dict [str , Any ] | None : """调用大模型之前的中间件:进行日志输出""" print (f"调用大模型之前{len (state['messages' ])} messages" ) return None @wrap_model_call def model_call (request: ModelRequest, handler ) -> ModelResponse: """模型调用时会执行的中间件""" print ("开始调用模型" ) return handler(request) @after_model def log_after_model (state: AgentState, runtime: Runtime ) -> dict [str , Any ] | None : """调用大模型之后的中间件:进行日志输出""" print (f"调用大模型之后{len (state['messages' ])} messages" ) return None agent = create_agent( model_client, tools=[], middleware=[log_before_model, model_call, log_after_model] ) res = agent.stream({"messages" : [{"role" : "user" , "content" : "你好" }]}) print_stream_result(res)
执行结果
1 2 3 4 5 6 7 8 9 10 调用大模型之前1 messages 开始调用模型 -----执行步骤:调用大模型 -----大模型分析的结果:你好!很高兴见到你!😊 我是DeepSeek,由深度求索公司创造的AI助手。无论你有什么问题、需要什么帮助,或者只是想聊聊天,我都很乐意为你提供支持! 我可以帮你解答各种问题,处理文本内容,进行创作和分析等等。虽然我不支持多模态识别,但我可以处理你上传的图像、txt、pdf、ppt、word、excel等文件,从中读取文字信息来帮助你。 有什么我可以为你做的吗?不管是学习、工作还是生活中的问题,我都很愿意帮助你!✨ 调用大模型之后2 messages
自定义中间件类的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 """ 自定义中间件,定义中间件类 在执行流中的特定点运行 - **before_agent** - 代理启动前(每次调用一次) - **before_model** - 每次模型调用前 - **after_model** - 每次模型响应后 - **after_agent** - 代理完成时(每次调用最多一次) """ from typing import Any , Callable from langchain.agents import create_agent, AgentStatefrom langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponsefrom langchain.agents.middleware.types import StateT, ModelCallResultfrom langchain.tools.tool_node import ToolCallRequest, ToolRuntimefrom langchain_core.messages import ToolMessagefrom langchain_core.tools import toolfrom langgraph.runtime import Runtimefrom langgraph.types import Commandfrom langgraph.typing import ContextTfrom src.core.llms import model_clientdef print_stream_result (response ): """流式结果的输出""" for item in response: for key, value in item.items(): if key == "model" : print ("-----执行步骤:调用大模型" ) if value['messages' ][0 ].content: print (f"-----大模型分析的结果:{value["messages" ][0 ].content} " ) elif value['messages' ][0 ].tool_calls: print ("-----大模型分析的结果为调用以下工具:" ) for tool_ in value['messages' ][0 ].tool_calls: print (f"工具名称:{tool_['name' ]} ,调用工具的入参:{tool_['args' ]} " ) elif key == "tools" : print (f"智能体执行工具:{value['messages' ][0 ].name} " ) print (f"工具执行结果:{value['messages' ][0 ].content} " ) class MyMiddleware (AgentMiddleware ): """自定义的agent中间件类,注意继承类需要重写对应方法""" def before_model (self, state: StateT, runtime: Runtime[ContextT] ) -> dict [str , Any ] | None : """调用大模型之前的中间件""" print ("调用大模型之前的中间件" ) print (f"当前state:{state} " ) print (f"传递给大模型的消息列表:{state['messages' ]} " ) def after_model (self, state: StateT, runtime: Runtime[ContextT] ) -> dict [str , Any ] | None : """调用大模型之后的中间件""" print ("调用大模型之后的中间件" ) print (f"当前state:{state} " ) print (f"传递给大模型的消息列表:{state['messages' ]} " ) def before_agent (self, state: StateT, runtime: Runtime[ContextT] ) -> dict [str , Any ] | None : """agent开始之前""" print ("agent开始之前" ) print (f"当前state:{state} " ) print (f"传递给大模型的消息列表:{state['messages' ]} " ) def after_agent (self, state: StateT, runtime: Runtime[ContextT] ) -> dict [str , Any ] | None : """agent结束之后""" print ("agent结束之后" ) print (f"当前state:{state} " ) print (f"传递给大模型的消息列表:{state['messages' ]} " ) def wrap_model_call ( self, request: ModelRequest, handler: Callable [[ModelRequest], ModelResponse], ) -> ModelCallResult: """大模型调用时执行""" print ("大模型调用时执行" ) res = "" try : res = handler(request) print ("大模型调用成功" ) except Exception as e: print (f"大模型调用错误:{str (e)} " ) else : print ("大模型调用失败" ) return res def wrap_tool_call ( self, request: ToolCallRequest, handler: Callable [[ToolCallRequest], ToolMessage | Command], ) -> ToolMessage | Command: """工具调用时执行""" print ("工具调用时执行" ) return handler(request) class UserInfo (AgentState ): user_id: str nickname: str @tool("get_user_info" , description="用户获取当前用户信息的工具" ) def get_user_info (runtime: ToolRuntime ): """获取用户信息""" return {"user_id" : runtime.state.get("user_id" ), "nickname" : runtime.state.get("nickname" ), "age" : 18 } agent = create_agent( model_client, tools=[get_user_info], state_schema=UserInfo, middleware=[MyMiddleware()] ) res = agent.stream({"messages" : [{"role" : "user" , "content" : "获取用户的信息" }], "user_id" : "9527" , "nickname" : "花花" }) print_stream_result(res)
自定义中间件工具执行失败重试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 """ 自定义中间件 在执行流中的特定点运行 - **before_agent** - 代理启动前(每次调用一次) - **before_model** - 每次模型调用前 - **after_model** - 每次模型响应后 - **after_agent** - 代理完成时(每次调用最多一次) """ from typing import Callable from langchain.agents import create_agent, AgentStatefrom langchain.agents.middleware import AgentMiddlewarefrom langchain.tools.tool_node import ToolCallRequest, ToolRuntimefrom langchain_core.messages import ToolMessagefrom langchain_core.tools import toolfrom langgraph.types import Commandfrom src.core.llms import model_clientdef print_stream_result (response ): """流式结果的输出""" for item in response: for key, value in item.items(): if key == "model" : print ("-----执行步骤:调用大模型" ) if value['messages' ][0 ].content: print (f"-----大模型分析的结果:{value["messages" ][0 ].content} " ) elif value['messages' ][0 ].tool_calls: print ("-----大模型分析的结果为调用以下工具:" ) for tool_ in value['messages' ][0 ].tool_calls: print (f"工具名称:{tool_['name' ]} ,调用工具的入参:{tool_['args' ]} " ) elif key == "tools" : print (f"智能体执行工具:{value['messages' ][0 ].name} " ) print (f"工具执行结果:{value['messages' ][0 ].content} " ) class ToolsCallMiddleware (AgentMiddleware ): """自定义的agent中间件类,注意继承类需要重写对应方法""" def __init__ (self, count: int = 3 ): self .count = count def wrap_tool_call ( self, request: ToolCallRequest, handler: Callable [[ToolCallRequest], ToolMessage | Command], ) -> ToolMessage | Command: """工具调用时执行""" print (f"开始调用工具{request} " ) for i in range (self .count): try : return handler(request) except Exception as e: print (f"调用工具出错了,错误信息是{str (e)} " ) if i == len (self .count - 1 ): raise e print (f"工具调用开始第{i + 1 } 轮调试!" ) class UserInfo (AgentState ): user_id: str nickname: str @tool("get_user_info" , description="用户获取当前用户信息的工具" ) def get_user_info (runtime: ToolRuntime ): """获取用户信息""" return {"user_id" : runtime.state.get("user_id" ), "nickname" : runtime.state.get("nickname" ), "age" : 18 } agent = create_agent( model_client, tools=[get_user_info], state_schema=UserInfo, middleware=[ToolsCallMiddleware()] ) res = agent.stream({"messages" : [{"role" : "user" , "content" : "获取用户的信息" }], "user_id" : "9527" , "nickname" : "花花" }) print_stream_result(res)
中间件执行顺序 前置钩子按顺序运行
middleware1.before_agent()
middleware2.before_agent()
middleware3.before_agent()
代理循环开始
middleware1.before_model()
middleware2.before_model()
middleware3.before_model()
包装钩子像函数调用一样嵌套
middleware1.wrap_model_call()
middleware2.wrap_model_call()
middleware3.wrap_model_call()
调用模型
后置钩子按相反顺序运行
middleware3.after_model()
middleware2.after_model()
middleware1.after_model()
代理循环结束
middleware3.after_agent()
middleware2.after_agent()
middleware1.after_agent()
关键规则
before_* 钩子:从头到尾
after_* 钩子:从尾到头(反向)
wrap_* 钩子:嵌套(第一个中间件包装所有其他中间件)
Agent跳转 要从中间件中提前退出,返回一个包含 jump_to 的字典
中间件(middleware)时,如何通过返回一个包含jump_to字段的字典来改变代理(agent)的执行流程。具体来说,它列出了可用的跳转目标(jump targets),以及每个目标对应的含义。以下是对这段内容的详细解释:
可用跳转目标
‘end’ : 跳转到代理执行的末尾(或者第一个after_agent钩子)。当在中间件中返回{“jump_to”: “end”}时,代理会跳过当前执行流程中剩余的步骤,直接执行after_agent钩子(如果有的话),然后结束执行。这通常用于在满足某些条件时提前终止代理的执行,例如达到对话限制或检测到不适当的内容。
‘tools’ : 跳转到工具节点(tools node)。当返回{“jump_to”: “tools”}时,代理会跳过当前的模型调用或其他中间件逻辑,直接进入工具调用阶段。这在需要直接调用工具而跳过模型推理时很有用,例如在某些情况下,根据上下文直接选择一个工具来执行。
‘model’ : 跳转到模型节点(model node)或者第一个before_model钩子。返回{“jump_to”: “model”}会使得代理跳过当前的中间件逻辑,直接进入模型调用阶段。这通常用于在中间件中进行了一些预处理后,直接将控制权交给模型,或者在某些情况下需要重新触发模型调用。
使用场景 这些跳转目标为开发者提供了灵活的控制手段,可以在中间件中根据不同的逻辑和条件动态地改变代理的执行流程。例如:
提前终止执行 :当检测到对话达到预设的限制或者出现不适当的内容时,使用jump_to: “end”来提前结束代理的执行,避免进一步的处理。
直接调用工具 :在某些情况下,根据上下文直接选择并调用工具,而跳过模型推理阶段,可以提高效率并简化执行流程。
重新触发模型调用 :在中间件中对输入数据进行了一些修改或调整后,使用jump_to: “model”来重新触发模型调用,确保模型能够基于最新的数据进行推理。
重要提示:
从 before_model 或 after_model 跳转时,跳转到 “model” 将导致所有before_model 中间件再次运行。
要启用跳转,请使用 @hook_config(can_jump_to=[…]) 装饰您的钩子
官方示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from langchain.agents.middleware import after_model, hook_config, AgentStatefrom langchain.messages import AIMessagefrom langgraph.runtime import Runtimefrom typing import Any @after_model @hook_config(can_jump_to=["end" ] ) def check_for_blocked (state: AgentState, runtime: Runtime ) -> dict [str , Any ] | None : last_message = state["messages" ][-1 ] if "BLOCKED" in last_message.content: return { "messages" : [AIMessage("I cannot respond to that request." )], "jump_to" : "end" } return None
练习代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 from typing import Any from langchain.agents import create_agent, AgentStatefrom langchain.agents.middleware import AgentMiddleware, hook_configfrom langchain.tools.tool_node import ToolRuntime, StateT, ContextTfrom langchain_core.messages import AIMessagefrom langchain_core.tools import toolfrom langgraph.runtime import Runtimefrom src.core.llms import model_clientdef print_stream_result (response ): """流式结果的输出""" for item in response: for key, value in item.items(): if key == "model" : print ("-----执行步骤:调用大模型" ) if value['messages' ][0 ].content: print (f"-----大模型分析的结果:{value["messages" ][0 ].content} " ) elif value['messages' ][0 ].tool_calls: print ("-----大模型分析的结果为调用以下工具:" ) for tool_ in value['messages' ][0 ].tool_calls: print (f"工具名称:{tool_['name' ]} ,调用工具的入参:{tool_['args' ]} " ) elif key == "tools" : print (f"智能体执行工具:{value['messages' ][0 ].name} " ) print (f"工具执行结果:{value['messages' ][0 ].content} " ) class ToolsCallMiddleware (AgentMiddleware ): """自定义的agent中间件类,注意继承类需要重写对应方法""" @hook_config(can_jump_to=["end" , "tools" ] ) def before_agent (self, state: StateT, runtime: Runtime[ContextT] ) -> dict [str , Any ] | None : """工具调用时执行""" print ("调用大模型之前的中间件" ) print (f"当前state:{state} " ) print (f"传递给大模型的消息列表:{state['messages' ]} " ) return { "jump_to" : "tools" , "messages" : [AIMessage( content="" , tool_calls=[{"name" : "get_user_info" , "args" : {}, "id" : "123" }] )] } class UserInfo (AgentState ): user_id: str nickname: str @tool("get_user_info" , description="用户获取当前用户信息的工具" ) def get_user_info (runtime: ToolRuntime ): """获取用户信息""" return {"user_id" : runtime.state.get("user_id" ), "nickname" : runtime.state.get("nickname" ), "age" : 18 } agent = create_agent( model_client, tools=[get_user_info], state_schema=UserInfo, middleware=[ToolsCallMiddleware()] ) res = agent.stream({"messages" : [{"role" : "user" , "content" : "获取用户的信息" }], "user_id" : "9527" , "nickname" : "花花" }) print_stream_result(res)
在中间件中修改 State(短期记忆) 中间件可以通过自定义属性扩展 Agent 的 State。定义一个自定义状态类型并将其设置为 state_schema
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from langchain.agents.middleware import AgentState, AgentMiddlewarefrom typing_extensions import NotRequiredfrom typing import Any class CustomState (AgentState ): model_call_count: NotRequired[int ] user_id: NotRequired[str ] class CallCounterMiddleware (AgentMiddleware[CustomState]): state_schema = CustomState def before_model (self, state: CustomState, runtime ) -> dict [str , Any ] | None : count = state.get("model_call_count" , 0 ) if count > 3 : return {"jump_to" : "end" } return None def after_model (self, state: CustomState, runtime ) -> dict [str , Any ] | None : return {"model_call_count" : state.get("model_call_count" , 0 ) + 1 }
中间件使用最佳实践
保持中间件的专注性——每个中间件只做好一件事
优雅地处理错误——不要让中间件错误导致代理崩溃
使用适当的钩子类型 :
节点式用于顺序逻辑(日志记录、验证)
包装式用于控制流(重试、回退、缓存)
清晰地记录所有自定义状态属性(State)
在集成之前独立进行中间件单元测试
考虑执行顺序——将关键中间件放在列表前面
尽可能使用内置中间件,不要重复造轮子