Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain核心组件Middleware

什么是Middleware(中间件)

Langchain的v.0.3.x版本,推荐使用langgraph.prebuilt.create_react_agent来构建智能代理中会使用一些hook钩子函数(例如:prehook)进行大模型执行前后或者其中位置进行能力增强,在此基础上,Langchain的v1.x.x版本后,增加了中间件的机制,为 Agent 提供 可插拔的控制层,允许你在 每一次模型调用、每一次工具执行之前/之后,插入自定义逻辑。

官方文档介绍:https://docs.langchain.com/oss/javascript/langchain/middleware/overview

有主要以下用途:

应用场景 具体说明
追踪与监控 记录日志、进行行为分析和调试,便于观察(跟踪)代理的决策过程。
内容与流程转换 动态修改提示词、调整工具选择逻辑,或重新格式化最终输出。
增强鲁棒性 为核心流程添加重试机制、失败回退方案、异常兜底、提前终止的逻辑。
安全与合规 实施速率限制、设置安全护栏、检测个人敏感信息(PII)等。

中间件的执行位于Agent核心循环的关键节点,执行流程图如下:

graph TD
    %% 节点定义(椭圆((xxx))、矩形[xxx],单独行无注释)
    A((request)) --> B[model]
    B -- action --> C[tools]
    B --> D((result))
    C -- observation --> B

    %% linkStyle单独行,仅写样式+分号(无任何注释/额外内容)
    linkStyle 1 stroke-dasharray:5,5;
    linkStyle 2 stroke-dasharray:5,5;

中间件可在该循环的每个阶段插入钩子(hook)

graph TD
    %% 节点定义
    request([request])
    before_agent[before_agent]
    before_model[before_model]

    %% 定义右侧分支
    subgraph wrap_model_call [wrap_model_call]
        model[model]
    end

    %% 定义左侧分支
    subgraph wrap_tool_call [wrap_tool_call]
        tools[tools]
    end

    after_model[after_model]
    after_agent[after_agent]
    result([result])

    %% 流程连接
    request --> before_agent
    before_agent --> before_model

    %% 核心循环与并列结构
    before_model --> wrap_model_call
    wrap_model_call --> after_model

    %% 虚线分支
    after_model -.-> wrap_tool_call
    wrap_tool_call --> before_model

    after_model -.-> after_agent
    after_agent --> result

    %% 样式美化:匹配原图紫色调
    classDef purpleNode fill:#ede9fe,stroke:#a78bfa,stroke-width:1px
    classDef container fill:#fff,stroke:#a78bfa,stroke-width:1px
    classDef capsule fill:#f5f3ff,stroke:#a78bfa,stroke-width:1px

    class request,result capsule
    class before_agent,before_model,after_model,after_agent,tools,model purpleNode
    class wrap_tool_call,wrap_model_call container
钩子点 执行时机 典型用途
before_agent 智能体开始前(每次调用一次) 初始化日志、验证用户输入、加载上下文
before_model 每次模型调用前 对输入进行预处理、添加系统提示、检查敏感词
after_model 每次模型响应后 解析模型输出、记录 Token 消耗、进行格式转换
after_agent 智能体完成后(每次调用一次) 生成最终报告、清理会话、发送通知

这些钩子在实际开发中非常有用,常见的应用包括:

  • 日志记录和监控:在 before_modelafter_model 中记录输入输出、耗时和 Token 使用情况,用于调试和分析。
  • 输入验证和清理:在 before_agent 中检查用户输入是否合规,过滤掉恶意内容或敏感信息。
  • 响应格式转换:在 after_model 中将模型的自然语言输出转换为结构化数据(如 JSON),方便后续处理。
  • 错误处理和重试:在 after_model 中检测模型输出是否有效,如果无效则触发重试或回退策略。
  • 安全检查(PII 检测):在 before_agentafter_model 中扫描并移除个人可识别信息(PII),确保数据安全。

中间件的分类

节点式钩子(Node-style Hooks)

在固定事件点执行:

钩子 触发时机
before_agent 每次 agent 调用开始之前
before_model 每次模型调用之前
after_model 每次模型调用之后
after_agent 整个 agent 调用结束之后

主要用于 顺序逻辑、检查、修改状态

包装式钩子(Wrap-style Hooks)

直接包裹模型或工具调用:

钩子 用途
wrap_model_call 拦截模型调用,用于重试/回退/缓存/动态改 prompt
wrap_tool_call 拦截工具调用,用于监控/Mock/工具重试等

主要用于 控制流程、重试、模拟、替换、缓存等

如何使用中间件?

使用时只需在 create_agent() 里传入,官方示例如下:

1
2
3
4
5
6
7
8
9
10
11
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware, HumanInTheLoopMiddleware

agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[
SummarizationMiddleware(...),
HumanInTheLoopMiddleware(...)
],
)

内置中间件

基于LangChain的代理应用时,会遇到各种各样的需求和场景,例如对话管理、工具调用控制、数据安全等。为了帮助开发者更高效地应对这些常见问题,LangChain预先设计并实现了一系列中间件,这些中间件就像是一个个功能模块,可以直接被集成到代理应用中,从而节省了开发者从头开始编写代码的时间和精力。

官方文档:https://docs.langchain.com/oss/python/langchain/middleware/built-in

以下列出些常用的内置中间件,具体的内置中间件可到官方文档中查看说明及使用方式。

  • Summarization(摘要):接近令牌(token)限制时,自动对对话历史进行总结。这有助于在长对话中保持上下文的连贯性,同时避免因令牌数量过多而导致的性能问题。
  • Human-in-the-loop(人工介入):在工具调用之前暂停执行,等待人工批准。这对于需要人工监督或在高风险操作中确保安全和合规性非常有用。
  • Model call limit(模型调用限制):限制模型调用的次数,以防止成本过高。这对于控制生产环境中的资源使用和成本非常有效
  • Tool call limit(工具调用限制):通过限制调用次数来控制工具的执行。这有助于避免工具被过度使用,特别是在调用外部API或执行资源密集型任务时。
  • Tool retry(工具重试):当工具调用失败时,自动以指数退避策略进行重试。这有助于处理临时性故障,提高工具调用的可靠性。

自定义中间件

自定义中间件是一种在 LangChain 代理执行流程的特定节点插入自定义逻辑的机制,目的是扩展代理的能力。

根据 LangChain 官方文档,它提供了两种拦截代理执行的钩子风格:

  • Node-style 钩子:以节点形式介入流程
  • Wrap-style 钩子:以包装函数形式包裹执行逻辑

中间件的核心功能

中间件主要用于在代理执行的关键点进行干预,核心功能包括:

功能 说明
执行流程控制 在代理执行的关键节点插入自定义逻辑,控制流程走向
请求 / 响应修改 在请求发送到模型前修改输入,或在响应返回给用户前修改输出
错误处理与重试 实现自定义的错误捕获、处理和重试策略
日志与监控 记录执行过程中的关键信息,用于调试和监控
状态管理 维护执行过程中的上下文状态,确保信息传递和持久化

自定义中间件的装饰器实现方式

实现方式 适用场景 特点 优势 典型使用场景
基于装饰器 - 只需要一个钩子
- 无需复杂配置
使用 @before_model/ @after_model / @wrap_model_call等装饰器 简单、快速、无类结构 - 打印日志
- 单次验证
- 简单重写提示词
- 单次响应校验
基于类的中间件 - 需要多个钩子
- 需要复杂配置
- 需要跨项目复用
继承 AgentMiddleware实现多个 Hook 功能强大、可配置、可复用 - 日志系统(前后钩子)
- 重试 + 校验多组合
- 动态模型选择
- 状态统计
- 企业级中间件库

装饰器方式(官方推荐)

这是 LangChain 官方推荐的一种简单快速的实现方式,特别适合只需要单个钩子的中间件场景。它通过 Python 装饰器语法,优雅地将一个普通函数包装成中间件。

装饰器方式的工作原理

装饰器方式的核心是利用 Python 的装饰器语法糖:

  • 将一个普通函数转换为中间件组件。
  • 当装饰器应用到函数上时,该函数会被自动注册为对应钩子的处理函数,在代理执行到该钩子时被调用。

装饰器优缺点对比

优点 缺点
代码简洁易读 每个函数只能关联一个钩子
实现快速,适合原型开发 状态管理较困难
不需要创建类和处理实例化 难以共享逻辑和配置
直观明了的钩子关联 不适合需要多个钩子的复杂中间件

类继承方式

这种方式通过继承 AgentMiddleware 基类来创建中间件,提供了比装饰器方式更大的灵活性和功能扩展性,特别适合以下场景:

  • 需要实现多个钩子
  • 需要进行复杂的状态管理
  • 需要复杂配置和逻辑复用的中间件

类继承工作原理

类继承方式的核心是面向对象的设计:

  1. 创建一个子类,继承自 AgentMiddleware 基类。
  2. 在子类中重写或实现特定的钩子方法(如 on_agent_start, on_agent_action 等)。
  3. 这种方式允许在单个中间件类中实现多个钩子,并且可以通过类的实例变量来管理和维护执行过程中的状态。

类继承优缺点对比

优点 缺点
可以在单个类中实现多个钩子 代码相对复杂
支持状态管理和实例变量 需要实例化后使用
便于配置和参数化 对于简单中间件可能过于繁琐
适合复杂的中间件逻辑 -
支持继承和代码复用 -

实现方式对比与选择指南

比较维度 装饰器方式 类继承方式
语法复杂度 简单,一行装饰器即可完成 中等,需要创建类和实现方法
钩子数量 每个函数只能关联一个钩子 一个类可以实现多个钩子
状态管理 困难,需要依赖外部变量或闭包 容易,直接使用类的实例变量
配置灵活性 有限,需要借助工厂函数 高,可通过构造函数灵活配置
代码复用 有限,主要通过函数组合实现 高,支持继承和方法复用
适用场景 简单中间件,仅需单个钩子逻辑 复杂中间件,多钩子协同工作

在实际开发中,可以遵循以下原则来选择实现方式:

  • 只需要一个钩子:优先使用装饰器方式,代码更简洁。
  • 需要多个钩子协同工作:必须使用类继承方式,这是唯一支持多钩子的方案。
  • 需要保存状态或配置:优先考虑类继承方式,其状态管理能力更强。
  • 简单原型或快速开发:使用装饰器方式,能快速实现功能。
  • 生产环境或复杂需求:使用类继承方式,更稳定、可维护。

简单来说,装饰器方式是轻量、快速的 “单钩子工具”,而类继承方式是强大、灵活的 “复杂场景解决方案”。

flowchart TD
    A[开始] --> B{需要多个钩子协同工作吗?}
    B -->|是| C[必须使用类继承方式]
    B -->|否| D{需要保存状态或复杂配置吗?}
    D -->|是| E[优先使用类继承方式]
    D -->|否| F{是生产环境还是快速原型?}
    F -->|生产环境/复杂需求| G[推荐使用类继承方式]
    F -->|快速原型/简单场景| H[使用装饰器方式]

Agent Hooks

Node-style 钩子(顺序执行型)

核心特点:在代理执行的特定节点顺序运行,适用于日志记录、验证和状态更新等场景。

可用的 Node-style 钩子

  • before_agent:在代理开始前运行(每次调用一次)
  • before_model:在每次模型调用前运行
  • after_model:在每次模型响应后运行
  • after_agent:在代理完成后运行(每次调用一次)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from langchain.agents.middleware import before_model, after_model, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any

# 在模型调用前检查消息长度,超过50条就直接结束对话
@before_model(can_jump_to=["end"])
def check_message_limit(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
if len(state["messages"]) >= 50:
return {
"messages": [AIMessage("会话限制已达到。")],
"jump_to": "end" # 直接跳转到结束节点
}
return None

# 在模型响应后打印返回内容,用于日志记录
@after_model
def log_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
print(f"模型返回: {state['messages'][-1].content}")
return None
  • check_message_limit:在每次调用大模型前,检查对话历史长度。如果超过 50 条,就直接返回提示并终止代理流程。
  • log_response:在每次模型响应后,自动打印返回的内容,方便调试和日志记录。

Wrap-style 钩子(拦截控制型)

核心特点:拦截并控制处理程序的调用,适用于重试、缓存和转换等场景。

可用的 Wrap-style 钩子

  • wrap_model_call:包装每次模型调用
  • wrap_tool_call:包装每次工具调用
  • 你可以决定处理程序被调用零次(短路)、一次(正常流程)或多次(重试逻辑)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable

# 为模型调用添加最多3次的自动重试逻辑
@wrap_model_call
def retry_model(
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
for attempt in range(3):
try:
return handler(request) # 执行原始的模型调用
except Exception as e:
if attempt == 2: # 如果是最后一次尝试,仍然失败则抛出异常
raise
print(f"重试 {attempt + 1}/3 发生错误: {e}")
  • retry_model:包装了所有的模型调用。当模型调用失败时,它会自动重试,最多尝试 3 次。如果前两次都失败,第三次会直接抛出异常。

两种钩子的核心区别

维度 Node-style 钩子 Wrap-style 钩子
执行时机 在代理执行的特定节点(如模型调用前后)按顺序触发 拦截并包裹整个处理函数的执行
主要用途 日志记录、状态验证、流程控制(如提前结束) 重试机制、缓存、请求 / 响应转换
控制能力 可以修改状态或决定流程跳转 可以完全控制处理函数的执行次数和方式

示例代码

自定义中间件基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# @Time:2025/12/30 15:15
# @Author:jinglv
"""
自定义中间件
在执行流中的特定点运行

- **before_agent** - 代理启动前(每次调用一次)
- **before_model** - 每次模型调用前
- **after_model** - 每次模型响应后
- **after_agent** - 代理完成时(每次调用最多一次)
"""
from typing import Any

from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import before_model, ModelRequest, ModelResponse, wrap_model_call, after_model
from langgraph.runtime import Runtime

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")


@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""调用大模型之前的中间件:进行日志输出"""
print(f"调用大模型之前{len(state['messages'])} messages")
return None


@wrap_model_call
def model_call(request: ModelRequest, handler) -> ModelResponse:
"""模型调用时会执行的中间件"""
print("开始调用模型")
return handler(request)


@after_model
def log_after_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""调用大模型之后的中间件:进行日志输出"""
print(f"调用大模型之后{len(state['messages'])} messages")
return None


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[],
middleware=[log_before_model, model_call, log_after_model]
)

res = agent.stream({"messages": [{"role": "user", "content": "你好"}]})
print_stream_result(res)

执行结果

1
2
3
4
5
6
7
8
9
10
调用大模型之前1 messages
开始调用模型
-----执行步骤:调用大模型
-----大模型分析的结果:你好!很高兴见到你!😊 我是DeepSeek,由深度求索公司创造的AI助手。无论你有什么问题、需要什么帮助,或者只是想聊聊天,我都很乐意为你提供支持!

我可以帮你解答各种问题,处理文本内容,进行创作和分析等等。虽然我不支持多模态识别,但我可以处理你上传的图像、txt、pdf、ppt、word、excel等文件,从中读取文字信息来帮助你。

有什么我可以为你做的吗?不管是学习、工作还是生活中的问题,我都很愿意帮助你!✨
调用大模型之后2 messages

自定义中间件类的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# @Time:2025/12/30 15:15
# @Author:jinglv
"""
自定义中间件,定义中间件类
在执行流中的特定点运行

- **before_agent** - 代理启动前(每次调用一次)
- **before_model** - 每次模型调用前
- **after_model** - 每次模型响应后
- **after_agent** - 代理完成时(每次调用最多一次)
"""
from typing import Any, Callable

from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.agents.middleware.types import StateT, ModelCallResult
from langchain.tools.tool_node import ToolCallRequest, ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.runtime import Runtime
from langgraph.types import Command
from langgraph.typing import ContextT

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")


# 定义中间件类
class MyMiddleware(AgentMiddleware):
"""自定义的agent中间件类,注意继承类需要重写对应方法"""

def before_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
"""调用大模型之前的中间件"""
print("调用大模型之前的中间件")
print(f"当前state:{state}")
print(f"传递给大模型的消息列表:{state['messages']}")

def after_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
"""调用大模型之后的中间件"""
print("调用大模型之后的中间件")
print(f"当前state:{state}")
print(f"传递给大模型的消息列表:{state['messages']}")

def before_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
"""agent开始之前"""
print("agent开始之前")
print(f"当前state:{state}")
print(f"传递给大模型的消息列表:{state['messages']}")

def after_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
"""agent结束之后"""
print("agent结束之后")
print(f"当前state:{state}")
print(f"传递给大模型的消息列表:{state['messages']}")

def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelCallResult:
"""大模型调用时执行"""
print("大模型调用时执行")
res = ""
try:
res = handler(request)
print("大模型调用成功")
except Exception as e:
print(f"大模型调用错误:{str(e)}")
else:
print("大模型调用失败")
return res

def wrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
"""工具调用时执行"""
print("工具调用时执行")
return handler(request)


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
middleware=[MyMiddleware()]
)

res = agent.stream({"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"})
print_stream_result(res)

自定义中间件工具执行失败重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# @Time:2025/12/30 15:15
# @Author:jinglv
"""
自定义中间件
在执行流中的特定点运行

- **before_agent** - 代理启动前(每次调用一次)
- **before_model** - 每次模型调用前
- **after_model** - 每次模型响应后
- **after_agent** - 代理完成时(每次调用最多一次)
"""
from typing import Callable

from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain.tools.tool_node import ToolCallRequest, ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.types import Command

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")


# 定义中间件类
class ToolsCallMiddleware(AgentMiddleware):
"""自定义的agent中间件类,注意继承类需要重写对应方法"""

def __init__(self, count: int = 3):
self.count = count

def wrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
"""工具调用时执行"""
print(f"开始调用工具{request}")
for i in range(self.count):
try:
return handler(request)
except Exception as e:
print(f"调用工具出错了,错误信息是{str(e)}")
if i == len(self.count - 1):
raise e
print(f"工具调用开始第{i + 1}轮调试!")


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
middleware=[ToolsCallMiddleware()]
)

res = agent.stream({"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"})
print_stream_result(res)

Agent跳转

要从中间件中提前退出,返回一个包含 jump_to 的字典

中间件(middleware)时,如何通过返回一个包含jump_to字段的字典来改变代理(agent)的执行流程。具体来说,它列出了可用的跳转目标(jump targets),以及每个目标对应的含义。以下是对这段内容的详细解释:

可用跳转目标

  • ‘end’: 跳转到代理执行的末尾(或者第一个after_agent钩子)。当在中间件中返回{“jump_to”: “end”}时,代理会跳过当前执行流程中剩余的步骤,直接执行after_agent钩子(如果有的话),然后结束执行。这通常用于在满足某些条件时提前终止代理的执行,例如达到对话限制或检测到不适当的内容。
  • ‘tools’: 跳转到工具节点(tools node)。当返回{“jump_to”: “tools”}时,代理会跳过当前的模型调用或其他中间件逻辑,直接进入工具调用阶段。这在需要直接调用工具而跳过模型推理时很有用,例如在某些情况下,根据上下文直接选择一个工具来执行。
  • ‘model’: 跳转到模型节点(model node)或者第一个before_model钩子。返回{“jump_to”: “model”}会使得代理跳过当前的中间件逻辑,直接进入模型调用阶段。这通常用于在中间件中进行了一些预处理后,直接将控制权交给模型,或者在某些情况下需要重新触发模型调用。

使用场景

这些跳转目标为开发者提供了灵活的控制手段,可以在中间件中根据不同的逻辑和条件动态地改变代理的执行流程。例如:

  • 提前终止执行:当检测到对话达到预设的限制或者出现不适当的内容时,使用jump_to: “end”来提前结束代理的执行,避免进一步的处理。
  • 直接调用工具:在某些情况下,根据上下文直接选择并调用工具,而跳过模型推理阶段,可以提高效率并简化执行流程。
  • 重新触发模型调用:在中间件中对输入数据进行了一些修改或调整后,使用jump_to: “model”来重新触发模型调用,确保模型能够基于最新的数据进行推理。

重要提示:

  • before_modelafter_model 跳转时,跳转到 “model” 将导致所有before_model中间件再次运行。
  • 要启用跳转,请使用 @hook_config(can_jump_to=[…])装饰您的钩子

官方示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain.agents.middleware import after_model, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
last_message = state["messages"][-1]
if "BLOCKED" in last_message.content:
return {
"messages": [AIMessage("I cannot respond to that request.")],
"jump_to": "end"
}
return None

练习代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# @Time:2025/12/30 15:15
# @Author:jinglv
from typing import Any

from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import AgentMiddleware, hook_config
from langchain.tools.tool_node import ToolRuntime, StateT, ContextT
from langchain_core.messages import AIMessage
from langchain_core.tools import tool
from langgraph.runtime import Runtime

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")


# 定义中间件类
class ToolsCallMiddleware(AgentMiddleware):
"""自定义的agent中间件类,注意继承类需要重写对应方法"""

@hook_config(can_jump_to=["end", "tools"])
def before_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None:
"""工具调用时执行"""
print("调用大模型之前的中间件")
print(f"当前state:{state}")
print(f"传递给大模型的消息列表:{state['messages']}")
return {
"jump_to": "tools",
"messages": [AIMessage(
content="",
tool_calls=[{"name": "get_user_info", "args": {}, "id": "123"}]
)]
}


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
middleware=[ToolsCallMiddleware()]
)

res = agent.stream({"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"})
print_stream_result(res)

在中间件中修改 State(短期记忆)

中间件可以通过自定义属性扩展 Agent 的 State。定义一个自定义状态类型并将其设置为 state_schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain.agents.middleware import AgentState, AgentMiddleware
from typing_extensions import NotRequired
from typing import Any

class CustomState(AgentState):
model_call_count: NotRequired[int]
user_id: NotRequired[str]

class CallCounterMiddleware(AgentMiddleware[CustomState]):
state_schema = CustomState

def before_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
# Access custom state properties
count = state.get("model_call_count", 0)

if count > 3:
return {"jump_to": "end"}

return None

def after_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
# Update custom state
return {"model_call_count": state.get("model_call_count", 0) + 1}

中间件使用最佳实践

  1. 保持中间件的专注性——每个中间件只做好一件事
  2. 优雅地处理错误——不要让中间件错误导致代理崩溃
  3. 使用适当的钩子类型:
    • 节点式用于顺序逻辑(日志记录、验证)
    • 包装式用于控制流(重试、回退、缓存)
  4. 清晰地记录所有自定义状态属性(State)
  5. 在集成之前独立进行中间件单元测试
  6. 考虑执行顺序——将关键中间件放在列表前面
  7. 尽可能使用内置中间件,不要重复造轮子