Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain自定义中间件的开发

中间件的章节中已经介绍了自定义中间件的内容,这章节主要详细说明自定义中间件的开发。

@before_agent

@before_agent 装饰器,它是在代理执行流程开始前运行的钩子函数,主要用于初始化、数据准备或权限验证等操作。

模块 内容
作用 在代理处理用户请求前执行,用于拦截初始状态,进行初始化、数据准备或权限验证。
装饰器参数 can_jump_to:可选参数,字符串列表,指定钩子函数可合法跳转到的工作流节点。
回调函数参数 - state: AgentState:代理完整状态对象,包含 messages(消息历史)、user_id(用户标识)、workflow_state(工作流状态)及其他自定义字段。
- runtime: Runtime:运行时实例,提供执行环境、配置信息和可用工具。
返回值 - None:不修改状态,继续正常流程。
- dict[str, Any]:修改后的状态字典。若包含 jump_to 键且值在 can_jump_to 列表中,则系统会跳转到指定节点,绕过正常流程。

工作流程解析

  1. 用户发起请求,代理开始执行。
  2. 所有被 @before_agent 装饰的函数,会按照注册顺序依次执行。
  3. 每个钩子函数都可以修改代理状态,或者触发流程跳转。
  4. 经过所有钩子处理后的最终状态,会被传递给代理的主处理逻辑。

适用场景

场景 说明
会话初始化 添加系统提示、初始化会话上下文、设置默认参数等。
用户身份验证 验证用户凭证、检查访问权限、记录用户活动等。
输入预处理 规范化输入格式、补充缺失信息、验证必要参数等。
条件拦截 根据特定条件提前终止流程,或跳转到其他处理路径。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain.agents.middleware import before_agent, AgentState
from langchain.messages import SystemMessage
from langgraph.runtime import Runtime
from typing import Any


@before_agent(can_jump_to=["end"])
def initialize_session(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
# 检查是否为首次调用,添加系统提示
if not any(isinstance(msg, SystemMessage) for msg in state["messages"]):
return {
"messages": [
SystemMessage(content="你是一个有帮助的AI助手。"),
*state["messages"]
]
}

if not state.get("user_id"):
# 如果没有用户ID,拒绝处理
return {
"messages": state["messages"] + [SystemMessage(content="缺少必要的用户信息。")],
"jump_to": "end"
}

return None

这段代码实现了一个 @before_agent 中间件,主要做了两件事:

  1. 会话初始化:在首次调用时,自动添加系统提示,设定 AI 助手的角色。
  2. 用户身份验证:检查 user_id 是否存在,如果不存在则添加错误提示,并直接跳转到流程结束节点 end,拒绝后续处理。

@after_agent

@after_agent 是在代理执行完成后执行的钩子函数,主要用于清理工作、结果汇总等后置处理场景。

  1. 核心作用

    • 执行时机:在代理的主处理逻辑执行完毕后触发。

    • 主要用途

      • 清理临时资源、关闭连接。

      • 对代理的输出结果进行格式化、汇总或日志记录。

      • 进行后置的权限检查或审计。

  2. 装饰器参数

    | 参数名 | 类型 | 说明 |
    | :—————— | :————————- | :—————————————————————————————- |
    | can_jump_to | list [str](可选) | 指定当前钩子函数可以合法跳转到的节点名称列表。当钩子返回包含 jump_to 键的字典时,系统会验证目标节点是否在此列表中。 |

  3. 回调函数参数

    | 参数名 | 类型 | 说明 |
    | :———— | :—————- | :—————————————————————————————- |
    | state | AgentState | 当前代理的完整状态对象,包含消息历史、用户 ID、工作流状态等所有信息。 |
    | runtime | Runtime | 当前运行时实例,提供执行环境上下文,包括配置信息、可用工具等。 |

  4. 返回值

    | 返回类型 | 说明 |
    | :———————- | :—————————————————————————————- |
    | None | 表示不修改状态,继续执行后续的正常流程。 |
    | dict[str, Any] | 返回修改后的状态字典。可以更新现有状态字段,或添加新字段。如果字典中包含 jump_to 键,且其值在 can_jump_to 列表中,系统会跳转到指定节点。 |

工作流程解析

@after_agent 钩子的执行流程如下:

  1. 触发时机:代理的主处理逻辑执行完成后,自动触发此钩子。
  2. 参数传入:系统会将当前代理的完整状态对象(state)和运行时实例(runtime)传入回调函数。
  3. 逻辑处理:回调函数根据业务逻辑处理状态,并决定是否对其进行修改。
  4. 状态更新
    • 返回 None:保持原状态不变,流程继续。
    • 返回修改后的状态字典:系统将使用新的状态覆盖原有状态。

适用场景

场景 说明
会话总结和记录 收集会话统计信息,生成会话摘要,用于后续复盘或存档。
结果优化 对代理生成的最终结果进行增强、修正或格式化,提升输出质量。
资源清理 释放会话中使用的临时文件、数据库连接或内存缓存等资源。
日志记录 记录会话完成情况、耗时、用户 ID 等信息,便于分析和调试。
状态重置 清空或重置部分状态字段,为下一次用户交互准备干净的初始环境。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from langchain.agents.middleware import after_agent, AgentState
from langchain.messages import SystemMessage
from langgraph.runtime import Runtime
from typing import Any
import time


@after_agent
def session_summary(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
# 收集会话统计信息
human_messages = sum(1 for msg in state["messages"] if msg.type == "human")
ai_messages = sum(1 for msg in state["messages"] if msg.type == "ai")

# 创建会话摘要
summary = f"会话已完成。\n" \
f"- 人类消息:{human_messages}\n" \
f"- AI消息:{ai_messages}\n" \
f"- 总消息数:{len(state['messages'])}"

# 保存会话记录
timestamp = time.strftime("%Y%m%d-%H%M%S")
session_id = f"session_{timestamp}"

# 在实际应用中,这里会将会话信息保存到数据库或文件系统
print(f"会话 {session_id} 已保存:{summary}")

# 添加会话结束消息
return {
"messages": state["messages"] + [
SystemMessage(content=summary)
]
}

@before_model

@before_model,这是一个专门的装饰器,用于标记后续定义的函数是模型调用前的钩子。在每次调用语言模型生成响应之前执行。作为代理决策调用模型前的最后一道关卡,用于对输入进行处理、优化、过滤,是实现输入质量控制和安全检查的关键。

  1. 装饰器参数 @before_model

    | 项目 | 内容详情 |
    | :————- | :————————————————————————— |
    | 装饰器名称 | @before_model |
    | 作用 | 标记函数为模型调用前钩子,在 LLM 生成响应前执行 |
    | 核心参数 | can_jump / can_jump_to |
    | 参数类型 | List[str](字符串列表) |
    | 参数含义 | 声明当前钩子允许跳转到的工作流节点 |
    | 使用规则 | 跳转目标必须是已定义的合法节点 |
    | 核心价值 | 实现流程控制、中断模型调用、分支跳转 |

  2. 回调函数参数(钩子固定入参)

    钩子函数被调用时,自动传入这两个固定参数,无需手动传参:

    | 参数名 | 类型 | 核心作用 | 包含关键内容 |
    | :———— | :————————— | :——————————- | :—————————————————————————————- |
    | state | AgentState / 字典 | 存储会话全量状态 | 1. messages:对话历史列表2. model_input:模型入参3. 会话上下文数据 |
    | runtime | Runtime | 提供运行时环境能力 | 1. 配置信息2. 可用工具 / 接口3. 环境实例 |

  3. 回调函数返回值(决定执行流程)

    | 返回值类型 | 返回值示例 | 执行逻辑 | 业务场景 |
    | :———————- | :——————————————— | :—————————————————- | :——————————————— |
    | None | return None | 不修改状态,直接调用模型 | 无需处理,直接放行 |
    | 普通字典 dict | return {"model_input": "xxx"} | 修改输入状态,再调用模型 | 数据清洗、优化 Prompt、过滤内容 |
    | 带跳转指令的字典 | return {"jump_to": "节点名"} | 中断模型调用,强制跳转到指定节点 | 输入违规、流程分支、异常拦截 |

工作流程解析

这是 @before_model 钩子的完整执行链路,清晰描述了从 “代理决定调用模型” 到 “最终执行” 的全过程:

  1. 触发起点:代理判断当前需要调用语言模型来生成响应。

  2. 钩子执行:所有被 @before_model 装饰的函数,会按照注册的先后顺序依次执行,形成一个处理流水线。

  3. 状态干预:每个钩子函数都可以对输入状态做两件事:

    • 修改输入内容(如清洗数据、优化提示词)
    • 触发流程跳转(返回 jump_to 指令)
  1. 正常流程:如果所有钩子都没有触发跳转,最终处理后的状态会被传递给语言模型,执行正常的生成逻辑。

  2. 跳转流程:如果任意一个钩子函数返回了 jump_to 指令,则立即中断后续钩子执行和模型调用,直接跳转到工作流中指定的节点去处理。

适用场景

@before_model 钩子的核心价值在于前置干预,以下是它的典型业务场景:

场景分类 核心作用 具体实现示例
1. 内容安全过滤 防止有害输入进入模型 检测敏感词、不当言论,直接拦截或替换后再发送
2. 输入优化与规范化 提升模型输入质量 格式化用户输入、修正拼写错误、自动补充上下文信息
3. 上下文管理 控制对话历史长度与质量 截断过长的对话历史、优化提示词工程、注入系统指令
4. 条件控制 实现灵活的流程分支 根据输入内容判断是否需要调用模型,或切换到其他处理路径
5. 成本控制 降低模型调用开销 限制输入长度、合并相似请求、拦截重复查询以减少无效调用
6. 业务规则集成 嵌入业务逻辑 验证业务规则(如权限校验、格式校验)、添加业务专属处理逻辑

核心价值总结 ✨

  • 流程可控:通过多钩子顺序执行和跳转机制,实现高度灵活的工作流控制。
  • 安全前置:把安全检查、内容过滤放在模型调用前,从源头规避风险。
  • 性能优化:通过上下文裁剪、请求合并等手段,有效降低模型调用成本。
  • 业务解耦:将业务逻辑(如权限、规则)与模型调用解耦,便于维护和扩展。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langchain.agents.middleware import before_model, AgentState
from langchain.messages import HumanMessage, SystemMessage
from langgraph.runtime import Runtime
from typing import Any

@before_model(can_jump_to=["end"])
def filter_sensitive_content(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
# 检查是否包含敏感内容
sensitive_words = ["敏感词1", "敏感词2"]

for msg in state["messages"]:
if isinstance(msg, HumanMessage):
for word in sensitive_words:
if word in msg.content.lower():
return {
"messages": [SystemMessage(content="检测到敏感内容,请重新输入。")],
"jump_to": "end"
}

# 截断过长的消息
processed_messages = []
for msg in state["messages"]:
if len(msg.content) > 1000:
msg.content = msg.content[:990] + "...[内容已截断]"
processed_messages.append(msg)

return {"messages": processed_messages}

导入模块

  • langchain.agents.middleware 导入核心装饰器 before_model 和状态类型 AgentState
  • 导入消息类型 HumanMessageSystemMessage 用于构造对话
  • 导入 Runtime 获取运行时环境信息
  • 导入 Any 用于类型注解

装饰器定义

  • @before_model(can_jump_to=["end"]):将函数标记为模型调用前钩子,并声明允许跳转到 end 节点

敏感内容过滤逻辑

  • 遍历会话消息,仅检查用户消息(HumanMessage
  • 若检测到敏感词,返回包含系统提示和 jump_to: "end" 的字典,直接中断流程并跳转至结束节点

消息截断逻辑

  • 遍历所有消息,将长度超过 1000 字符的内容截断为前 990 字符并添加截断提示
  • 返回处理后的消息列表,继续正常模型调用流程

返回值逻辑

  • 检测到敏感内容:返回带 jump_to 的字典,中断模型调用
  • 未检测到敏感内容:返回处理后的消息字典,将截断后的消息传递给模型

@after_model

@after_model模型生成响应后、返回给用户前执行的钩子函数,它是对模型输出进行后处理、优化、格式化和分析的关键环节,用于实现输出质量控制、内容增强与结果优化。

装饰器参数:can_jump_to

项目 详情
参数类型 可选参数,字符串列表(List[str]
作用 声明当前钩子函数允许跳转到的工作流节点名称
约束 跳转目标必须是工作流中已定义的有效节点
使用场景 当检测到模型输出需要重新处理、纠错或特殊操作时,返回包含 'jump_to' 键的字典,实现流程跳转(如重跑模型、切换处理分支)

回调函数参数(钩子固定入参)

钩子执行时会自动传入两个核心参数,提供模型调用后的完整上下文:

参数名 类型 核心作用 包含关键内容
state AgentState(字典类型) 存储模型调用后的完整会话状态 1. messages:包含模型最新响应的完整消息历史2. model_response:模型的原始响应数据3. 当前会话的上下文信息
runtime Runtime 提供运行时环境能力 1. 运行环境信息2. 可用工具与配置参数3. 用于访问外部服务或执行额外操作

返回值:决定后续流程

返回值类型 含义 执行逻辑 典型场景
None 不修改状态 直接将模型原始响应传递给后续处理(如返回给用户) 无需后处理,直接放行
dict[str, Any] 修改后的状态字典 对模型输出进行干预:・修改响应内容・添加额外信息・格式化输出・纠正错误若包含 'jump_to' 键,则中断当前流程,跳转到指定节点 内容格式化、结果增强、错误修正、触发重试逻辑

@before_model 的核心区别 ✨

维度 @before_model @after_model
触发时机 模型调用 模型生成响应
核心目标 输入质量控制、安全检查、流程前置拦截 输出质量控制、内容增强、结果后处理
状态内容 待发送给模型的输入消息与参数 已生成的模型响应与完整消息历史
典型用途 敏感词过滤、输入规范化、上下文裁剪 输出格式化、错误修正、内容补充、结果分析

核心价值总结 💡

  • 输出可控:在返回用户前对模型结果做最后一层校验与优化,提升输出质量。
  • 灵活流程:通过 jump_to 实现输出异常时的重试、分支处理或特殊业务逻辑。
  • 业务增强:可集成业务规则(如格式校验、数据补充),让模型输出更贴合业务需求。

工作流程解析

这是 @after_model 钩子的完整执行链路,清晰描述了从 “模型生成响应” 到 “最终返回用户” 的全过程:

  1. 触发起点:语言模型生成响应并返回给代理。
  2. 钩子执行:所有被 @after_model 装饰的函数,会按照注册的先后顺序依次执行,形成一条后处理流水线。
  3. 状态干预:每个钩子函数都可以对响应做两件事:
    • 处理响应内容(如格式化、增强、纠错)
    • 触发流程跳转(返回 jump_to 指令)
  4. 正常流程:如果所有钩子都没有触发跳转,最终处理后的响应会被返回给用户。
  5. 跳转流程:如果任意一个钩子函数返回了 jump_to 指令,则立即中断后续钩子执行和返回流程,直接跳转到工作流中指定的节点去执行后续逻辑(如重新调用模型、调用工具、追问用户等)。

适用场景

@after_model 钩子的核心价值在于后置干预,以下是它的典型业务场景:

场景分类 核心作用 具体实现示例
1. 输出格式化 美化输出样式,适配不同终端 将 Markdown 转为 HTML、添加代码高亮、生成表格、适配聊天框样式
2. 内容增强 补充信息,提升回答完整性 给专业术语加注释、插入资料来源链接、补充背景知识说明
3. 响应监控与日志 记录与分析模型行为 记录响应耗时、内容长度、用户反馈,统计调用频次,分析回答质量
4. 结果验证与修正 校验输出准确性,修正错误 检查事实错误、补充缺失数据、纠正逻辑偏差,必要时触发模型重试
5. 条件处理 实现灵活的后续流程 判断是否需要调用工具(如查数据库)、是否需要追问用户获取更多信息
6. 多语言处理 适配多语言环境 将模型输出翻译为目标语言、处理中英混合内容、根据区域设置调整表述

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from langchain.agents.middleware import after_model, AgentState
from langchain.messages import AIMessage, SystemMessage
from langgraph.runtime import Runtime
from typing import Any
import json

@after_model
def format_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
# 分析最后一条AI消息
if state["messages"] and isinstance(state["messages"][-1], AIMessage):
last_message = state["messages"][-1]

# 记录响应内容到日志
with open("model_responses.log", "a") as f:
log_entry = {
"timestamp": "2023-05-15T12:00:00Z",
"response_length": len(last_message.content),
"has_code": "```" in last_message.content
}
f.write(json.dumps(log_entry) + "\n")

# 为包含代码的响应添加提示
if "```" in last_message.content:
enhanced_content = last_message.content + "\n\n提示:以上代码仅供参考,请根据实际需求进行调整。"

# 创建新的消息列表,替换最后一条消息
new_messages = state["messages"][:-1] + [
AIMessage(content=enhanced_content, name=last_message.name)
]

return {"messages": new_messages}

return None

导入模块

  • langchain.agents.middleware 导入 after_model 装饰器和 AgentState 状态类型
  • 导入消息类型 AIMessageSystemMessage
  • 导入 Runtime 运行时实例和 Any 类型注解
  • 导入 json 用于日志格式化

装饰器定义

  • @after_model:将函数标记为模型响应后的钩子函数,无跳转节点配置

消息判断与日志记录

  • 检查消息列表是否存在且最后一条为 AIMessage
  • 提取最后一条模型响应消息,记录时间戳、响应长度、是否包含代码到日志文件 model_responses.log

内容增强与替换

  • 若响应包含代码块(`````),在末尾添加提示语
  • 构造新的消息列表,替换原模型响应为增强后的内容
  • 返回修改后的消息状态,传递给后续处理

默认返回

  • 若未触发任何处理逻辑,返回 None,保持原始响应不变

@wrap_model_call

@wrap_model_call包装并拦截模型调用的钩子,属于 Wrap-style 钩子,核心能力是对模型请求 / 响应做底层拦截与控制,适用于重试、缓存、修改请求 / 响应等场景。

回调函数参数详解

参数名 类型 核心作用 关键信息
request ModelRequest 封装模型请求的完整数据 包含发送给模型的所有参数:messages(对话历史)、model(模型名称)、temperature(采样温度)等
handler Callable[[ModelRequest], ModelResponse] 原始模型调用处理函数 调用 handler(request) 会执行真实的模型请求,是被包装的核心逻辑

返回值要求

必须返回 ModelResponse 类型的对象,且必须包含:

  • content:模型生成的响应内容
  • model:调用的模型标识
  • 其他必要字段(如 usage token 统计等)

注意:不能返回 None 或字典,必须返回完整的 ModelResponse 对象,否则会中断流程。

工作流程解析

触发时机:当系统准备调用模型时,自动触发 @wrap_model_call 包装器。

参数传入:系统自动传入 request(模型请求对象)和 handler(原始处理函数)。

自定义逻辑:回调函数可以在 handler(request) 之前 / 之后插入自定义逻辑:

  • 前置:修改请求参数、记录日志、权限校验、缓存检查
  • 核心:调用 handler(request) 执行模型请求
  • 后置:修改响应内容、缓存结果、异常重试、统计耗时

返回响应:回调函数必须返回处理后的 ModelResponse 对象。

流程继续:系统使用返回的 ModelResponse 继续后续 Agent 流程。

适用场景

@wrap_model_call 最核心的业务场景,每个场景都对应了底层拦截与控制的能力:

场景 核心作用 实现思路
请求 / 响应缓存 缓存常见查询,减少 API 调用、提升响应速度 调用前根据 request 生成缓存键(如消息哈希),命中则直接返回缓存的 ModelResponse;未命中则调用 handler 并缓存结果
重试机制 为失败的模型调用添加自动重试逻辑 捕获 handler(request) 抛出的异常(如网络错误、限流),在异常后重试调用,直到成功或达到最大重试次数
请求修改 发送前动态调整模型参数 在调用 handler 前修改 request 对象,如调整 temperaturemax_tokensmodel名称等
响应增强 对模型原始输出做后处理与优化 调用 handler 获取响应后,修改 ModelResponse.content,如格式转换、内容补充、错误修正
监控和日志 记录调用详情,用于性能分析与问题排查 在调用前后记录时间戳、参数、耗时、token 用量、异常信息等,输出到日志或监控系统
请求限流 限制调用频率,避免超出 API 配额 在调用前检查频率阈值,超出则等待或抛出限流异常;也可实现滑动窗口、令牌桶等限流算法

场景背后的核心逻辑 💡

这些场景都依赖 @wrap_model_call 的两个关键能力:

  1. 完全控制调用流程:可以决定是否调用 handler(如缓存命中时直接返回)、何时调用(如重试、限流等待)。
  2. 直接操作请求 / 响应对象:可以读写 ModelRequest 的所有参数,也能修改或替换 ModelResponse 的内容。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable, Dict, Any
import hashlib
import time

# 简单的内存缓存
response_cache: Dict[str, Dict[str, Any]] = {}

@wrap_model_call
def cached_model_call(
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
# 生成更可靠的缓存键,只使用消息内容和模型名称
messages_content = "".join([msg.content for msg in request.messages])
cache_key = hashlib.md5(f"{messages_content}:{request.model}".encode()).hexdigest()

# 检查缓存
if cache_key in response_cache:
cached = response_cache[cache_key]
if time.time() - cached["timestamp"] < 3600: # 1小时缓存
print("从缓存获取响应")
# 直接返回缓存的响应对象,避免创建新的ModelResponse
return cached["response"]

# 执行原始调用并添加重试逻辑
for attempt in range(3):
try:
response = handler(request)
# 缓存响应对象本身,而不是尝试访问其内部属性
response_cache[cache_key] = {
"response": response, # 缓存完整响应对象
"timestamp": time.time()
}
return response
except Exception as e:
if attempt == 2:
raise
print(f"模型调用失败,第 {attempt + 1} 次重试...")
time.sleep(2) # 等待后重试

代码完整解读 📝

  1. 导入与全局缓存

    • 导入 wrap_model_call 装饰器和模型请求 / 响应类型,用于实现模型调用拦截。

    • 定义全局字典 response_cache,用于存储缓存的响应对象和时间戳。

  2. 缓存键生成

    • 拼接所有消息内容和模型名称,生成 MD5 哈希作为缓存键,保证相同请求命中同一缓存。
  3. 缓存检查逻辑

    • 若缓存键存在且未过期(1 小时内),直接返回缓存的 ModelResponse,避免重复调用模型。
  4. 重试 + 缓存写入逻辑

    • 最多重试 3 次模型调用,捕获异常后等待 2 秒重试,最后一次失败则抛出异常。

    • 调用成功后,将完整的 ModelResponse 对象和当前时间戳存入缓存,供后续请求复用。

@wrap_tool_call

@wrap_tool_call包装并拦截工具调用的钩子,属于 Wrap-style 钩子,核心能力是对工具调用做底层拦截与控制,适用于工具调用前检查、权限控制、参数校验等场景。

回调函数参数详解

参数名 类型 核心作用 关键信息
request Any 封装工具调用的完整请求数据 包含 tool_call 属性,其中有工具名称调用参数工具调用 ID 等核心信息
handler Callable[[Any], Any] 原始工具调用处理函数 调用 handler(request) 会执行真实的工具调用逻辑,是被包装的核心执行单元

返回值要求

必须返回一个有效的响应对象,通常是 ToolMessage 类型,且需包含:

  • content:工具执行后的结果内容
  • tool_call_id:与本次工具调用对应的 ID,用于关联请求
  • 其他必要字段(如工具名称、执行状态等)

注意:不能返回 None,必须返回可被系统识别的响应对象,否则会中断流程。

工作流程解析

触发时机:当代理决定调用工具时,自动触发 @wrap_tool_call 包装器。

参数传入:系统自动传入 request(工具请求对象)和 handler(原始处理函数)。

自定义逻辑:回调函数可以在 handler(request) 之前 / 之后插入自定义逻辑:

  • 前置:权限校验、参数合法性检查、工具调用频率限制、日志记录
  • 核心:调用 handler(request) 执行真实工具调用
  • 后置:结果格式化、错误处理、结果缓存、审计日志

返回响应:回调函数必须返回处理后的响应对象(通常是 ToolMessage)。

流程继续:系统使用返回的响应对象继续后续 Agent 流程(如将工具结果喂给模型生成最终回答)。

适用场景

@wrap_tool_call 最核心的业务场景,每个场景都对应了底层拦截与控制的能力:

场景 核心作用 实现思路
权限控制 验证用户是否有权限调用特定工具 调用前从请求或上下文获取用户权限信息,判断是否允许调用该工具;无权限则直接返回拒绝的 ToolMessage,不执行工具
参数校验 检查工具参数的有效性和安全性 解析工具调用参数,验证格式、数值范围、敏感内容等;非法参数则返回错误提示,避免工具执行失败
调用监控 记录工具调用详情,用于审计与分析 调用前后记录工具名称、参数、执行耗时、结果状态等,输出到日志或审计系统
错误处理 为工具调用失败提供自定义错误响应 捕获工具执行异常,返回友好的错误信息(如 “工具暂时不可用,请稍后再试”),避免异常向上层扩散
结果过滤 对工具返回结果进行过滤或增强 工具执行成功后,修改 ToolMessage.content,如过滤敏感信息、补充说明、格式化输出
限流保护 限制工具调用频率,防止滥用 调用前检查调用频次,超出阈值则等待或返回限流提示;可实现滑动窗口、令牌桶等限流算法

场景背后的核心逻辑 💡

这些场景都依赖 @wrap_tool_call 的两个关键能力:

  1. 完全控制调用流程:可以决定是否调用 handler(如无权限 / 限流时直接拒绝)、何时调用(如限流等待)。
  2. 直接操作请求 / 响应对象:可以读写工具调用的参数、工具名称,也能修改或替换工具返回的 ToolMessage 内容。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from langchain.agents.middleware import wrap_tool_call
from typing import Any, Callable
from langchain_core.messages import ToolMessage

@wrap_tool_call
def tool_permission_check(
request: Any,
handler: Callable[[Any], Any],
) -> Any:
# 获取工具名称和参数
tool_name = request.tool_call.get("name")
tool_args = request.tool_call.get("args", {})

# 工具权限控制
admin_tools = ["get_weather"]
restricted_tools = ["payment_processor"]

# 获取用户角色(在实际应用中,这会从状态或上下文获取)
user_role = "user" # 示例值,实际使用时需要从上下文获取

# 检查是否有权限调用该工具
if tool_name in admin_tools and user_role != "admin":
return ToolMessage(
content=f"权限不足:需要管理员权限才能使用 {tool_name} 工具。",
tool_call_id=request.tool_call.get("id")
)

# 执行原始工具调用
return handler(request)

代码完整解读 📝

  1. 导入部分

    • 导入 wrap_tool_call 装饰器,用于拦截工具调用。

    • 导入 ToolMessage,用于返回权限不足的错误响应。

    • 导入类型注解 AnyCallable 保证类型安全。

  2. 工具信息提取

    • request.tool_call 中获取工具名称 tool_name 和参数 tool_args
  3. 权限控制逻辑

    • 定义两类工具:admin_tools(仅管理员可用)和 restricted_tools(受限工具)。

    • 模拟用户角色 user_role = "user",实际场景中应从 Agent 状态或上下文获取。

    • 若用户尝试调用管理员工具但角色不是 admin,直接返回 ToolMessage,提示权限不足,不执行工具调用

  4. 正常流程

    • 权限校验通过后,调用 handler(request) 执行原始工具调用,并返回结果。

@wrap_model_call 的核心区别 ⚡

维度 @wrap_tool_call @wrap_model_call
拦截目标 工具调用(ToolCall) 模型调用(ModelRequest)
操作对象 工具请求 / 响应(ToolCallRequest/ToolMessage) 模型请求 / 响应(ModelRequest/ModelResponse)
典型场景 权限控制、参数校验、工具调用审计 请求缓存、重试、模型参数调整
返回值 ToolMessage(工具结果) ModelResponse(模型回答)

@dynamic_prompt

@dynamic_prompt动态生成系统提示的钩子,属于 Node-style 钩子,核心能力是根据对话上下文实时生成个性化系统提示,适用于个性化提示、上下文感知提示、角色化交互等场景。

回调函数参数详解

参数名 类型 核心作用 关键信息
request Any 封装模型请求的完整上下文 包含要发送给模型的消息历史、模型配置(如模型名称、temperature)等信息,是生成动态提示的依据

返回值要求

必须返回 str 类型的文本:

  • 该文本会被系统自动包装为 SystemMessage,添加到模型输入的系统提示部分。
  • 用于替换或增强原有的静态系统提示,实现个性化引导。

注意:返回值必须是字符串,不能返回 None 或其他类型,否则无法生成有效系统提示。

工作流程解析

  1. 触发时机:当代理准备构建模型提示时,自动触发 @dynamic_prompt 钩子。
  2. 上下文传入:系统传入包含完整对话历史的 request 对象,作为分析依据。
  3. 上下文分析:回调函数分析消息历史,识别用户的语言类型(如中文 / 英文)、对话主题(如技术问题 / 生活咨询)、用户意图等。
  4. 动态提示生成:根据分析结果,选择合适的角色(如 “技术专家”/“生活顾问”)、专业领域知识或个性化规则,生成对应的系统提示文本。
  5. 提示注入:系统将生成的文本作为 SystemMessage 添加 / 替换到模型输入中。
  6. 模型执行:模型基于增强后的个性化提示生成回答,输出更贴合用户需求的结果。

适用场景

@dynamic_prompt 最强大的业务场景,每个场景都对应了提示词的动态注入与定制

场景 核心作用 实现思路
角色化交互 给 AI 设定特定身份,使其回答更专业 分析用户问题主题,生成对应的角色指令。例:用户问代码→生成 “你是资深 Python 专家”;用户问数学→生成 “你是数学大师”。
上下文感知提示 确保 AI 回复与当前话题高度相关 分析对话历史,根据上下文动态调整提示。例:用户前句聊 “Python 异步”,后句问 “如何处理”→自动在提示中加入异步相关背景。
领域专家模式 提供专业、深入的领域回答 识别用户输入的专业领域,注入领域知识和规范。例:医疗问题→注入医疗术语规范;法律问题→注入法律问答逻辑。
多语言智能切换 自动适配语言环境 检测用户输入语言,切换提示语和回答风格。例:用户输入英文→生成英文系统提示;输入日文→切换日文风格。
个性化体验定制 定制专属的 AI 助手 根据用户交互习惯(如偏好简洁 / 详细 / 幽默)生成对应风格的提示。
任务特定指令增强 针对特定任务添加专属指导 识别任务类型,插入任务专用指令。例:编程→添加 “代码需符合 PEP8 规范”;写作→添加 “需分点清晰、逻辑严密”。
跨文化交流支持 确保交流的文化适配性 在多语言环境中,添加文化背景相关的表达,避免文化冲突。

背后的核心逻辑 💡

这些场景都依赖 @dynamic_prompt 的两个核心能力:

  1. 上下文分析:能读取 request 中的对话历史,识别主题、语言、领域、任务类型
  2. 动态生成:根据分析结果,程序化生成系统提示文本(str),实现一劳永逸的 “动态 Prompt 工程”。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from langchain.agents.middleware import dynamic_prompt
from typing import Any
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage, ToolMessage
import re

# 定义正则表达式模式,用于识别用户意图
# 预编译正则表达式,提升匹配效率
re_code = re.compile(r'代码|编程|开发|python|javascript|java|c\+\+|html|css', re.IGNORECASE)
re_math = re.compile(r'数学|计算|算法|统计', re.IGNORECASE)
re_history = re.compile(r'历史|科学|文化|艺术', re.IGNORECASE)

@dynamic_prompt
def generate_contextual_prompt(request: Any) -> str:
# 基础提示模板
base_prompt = "你是一个有帮助的AI助手。"

# 分析上下文,确定对话主题和领域
last_user_message = None

# 从 request 对象中获取最后一条用户消息
# 兼容不同结构的 request 对象
if hasattr(request, 'messages'):
# 反向遍历消息列表,寻找最近的 HumanMessage
for msg in reversed(request.messages):
if isinstance(msg, HumanMessage):
last_user_message = msg.content
break
elif hasattr(request, 'state') and 'messages' in request.state:
# 兼容 state 结构的 request 对象
for msg in reversed(request.state['messages']):
if isinstance(msg, HumanMessage):
last_user_message = msg.content
break

# 如果获取到了用户最后一条消息,根据内容添加专属领域提示
if last_user_message:
# 1. 匹配编程/代码相关问题
if re_code.search(last_user_message):
base_prompt += "\n\n你是一名专业的编程助手,你叫吉米。请提供清晰、正确的代码示例,并解释关键概念。"
base_prompt += "\n确保代码具有良好的结构和注释。"
# 2. 匹配数学/算法相关问题
elif re_math.search(last_user_message):
base_prompt += "\n\n你是一名数学专家,你叫鲁班大师。请详细解释计算过程,并确保准确性。"
base_prompt += "\n使用适当的数学符号和公式来表达复杂概念。"
# 3. 匹配历史/文化/科学相关问题
elif re_history.search(last_user_message):
base_prompt += "\n\n你是一名知识渊博的学者,你叫维多利+爱因斯坦。请提供准确、全面的信息。"
base_prompt += "\n包含相关背景知识和关键事实。"

# 添加通用指导原则(无论什么主题都适用)
base_prompt += "\n\n请始终保持礼貌,提供专业的回答。如果不确定,如实说明并提供可能的解决方案。"

return base_prompt

这段代码实现了 “智能角色切换” 的动态 Prompt 工程:

  1. 自动识别:通过正则识别用户问题属于编程、数学还是历史文化。
  2. 自动换装:根据主题切换 AI 的身份(吉米 / 鲁班大师 / 爱因斯坦)。
  3. 增强能力:给不同身份赋予专属的知识规范和回答风格。
  4. 最终效果:用户问什么,AI 就自动变成最懂该领域的专家进行回答。

与其他钩子的核心区别 ⚡

维度 @dynamic_prompt @before_model @wrap_model_call
核心目标 动态生成系统提示,个性化引导模型行为 预处理输入、修改模型状态 拦截模型调用流程,控制请求 / 响应
操作对象 生成系统提示文本(str 操作 AgentState 状态 操作 ModelRequest/ModelResponse
返回值 必须返回 str(提示文本) 返回 None 或状态字典 必须返回 ModelResponse
典型场景 角色化交互、领域专属提示、多语言自适应 输入过滤、日志记录 缓存、重试、参数修改