Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangGraph之中断恢复和人工审核机制

检查点

检查点(Checkpoint)

  • 作用:检查点是图执行过程中每个“超步骤”(superstep)时刻状态的快照。它使得以下功能成为可能:
    • 状态恢复
    • 重播执行
    • 容错与断点续跑
    • 人机交互与内存更新
  • 自动持久化:使用 LangGraph 时,无需手动保存状态,它会自动在后台处理所有检查点
1
2
3
4
5
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
# 对graph对象进行编译,并配置检查点
app = graph.compile(checkpointer=checkpointer)

检查点会自动在每个节点执行后保存当前状态,包括:

  • 当前节点名 : state.next
  • 当前值 : state.values
  • 当前执行上下文 : state.config

线程(Thread)

  • 每次运行图时,都必须指定一个唯一的 thread_id
  • 一个 thread 保存该次执行过程的所有检查点状态。
1
2
config = {"configurable": {"thread_id": "1"}}
graph.invoke(input_data, config)
  • 可使用 graph.get_state(config) 获取最新状态。
  • 可使用 graph.get_state_history(config) 获取全部历史状态。

Langgraph 中使用检查点(示例代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from typing import TypedDict


# 一、定义状态
class State(TypedDict):
"""
定义状态
"""
value_1: str
value_2: int
report: str
input_value: str


import time


# 二、定义工作节点(本质就是一个函数)

def step_1(state: State):
"""定义工作节点"""
print("执行step_1节点----------开始-----------")
time.sleep(2)
print("执行step_1节点----------结束-----------")
return {"value_1": 100}


def step_2(state: State):
"""定义工作节点"""
print("执行step_2节点----------开始-----------")
time.sleep(2)
print("执行step_2节点----------结束-----------")
return {"value_2": 100}


def generator_test_report(state: State):
"""定义工作节点"""
print("执行step_3节点----------开始-----------")
time.sleep(2)
print("执行step_3节点----------结束-----------")
return {"report": "这个是一个测试报告"}


# ===========================================================================
# 三、开发工作流
# 3.1 初始化工作流
graph = StateGraph(State)
# 3.2 把节点(node)添加到状态图(工作流)中
graph.add_node("步骤一", step_1)
graph.add_node("步骤二", step_2)
graph.add_node("步骤三", generator_test_report)
# 3.3 对工作节点进行编排

graph.add_edge(START, "步骤一")
graph.add_edge("步骤一", "步骤二")
graph.add_edge("步骤二", "步骤三")
graph.add_edge("步骤三", END)

# 3.4 创建一个状态检查点
print("=================添加检查点配置信息=================")
checkpointer = InMemorySaver()
# 4、对graph对象进行编译
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
app.invoke({"input_value": "你好,世界"}, config)
# 获取检查点

print("===============获取执行的历史检查点===================")
states = list(app.get_state_history(config))

agent 中使用检查点(示例代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent

checkpointer = InMemorySaver()
agent = create_react_agent(
model=self.llm,
tools=[mysql_executor],
prompt="""
你是一位资深的DBA,现在需要你根据用户的需求,编写对应的sql语句,调用数据库操作的工具,执行sql语句,并返回执行的结果,
每一步执行完都需要去分析当前的执行进度,以及规划下一步的任务执行
""",
# 配置启用检查点
checkpointer=checkpointer
)
# 运行agent时传入配置
response = agent.invoke(input={"messages": input},
config={"configurable": {"thread_id": "thread_1"}}
)

print("==================检查点====================")
# 执行完获去历史检查点状态
states = agent.get_state_history(config={"configurable": {"thread_id": "thread_1"}})
for state in states:
print(state.next) # 下一节点
print(state.config["configurable"]["checkpoint_id"])

持久化检查点

实现名称 说明
InMemorySaver 内存持久化
MongoDBSaver 使用 mongodb 持久化
RedisSaver 使用 Redis持久化

使用 redis 存储检查点

  • 安装依赖

    1
    pip install langgraph-checkpoint-redis
  • 使用案例

    1
    2
    3
    4
    from langgraph.checkpoint.redis import RedisSaver

    with RedisSaver.from_conn_string("redis://192.168.0.108:6379") as checkpointer:
    print(checkpointer)

使用 mongodb存储检查点

  • 安装依赖

    1
    pip install langgraph-checkpoint-mongodb
  • 使用案例

    1
    2
    3
    4
    from langgraph.checkpoint.mongodb import MongoDBSaver

    with MongoDBSaver.from_conn_string("localhost:27017") as checkpointer:
    print("checkpointer:", checkpointer)

重运行机制

在使用由大模型驱动的非确定性系统(例如 Agent)时,我们常常希望深入理解其决策过程。LangGraph 提供了 时间旅行功能(Time Travel) 来支持以下用途:

  • 🤔 理解推理过程:分析系统如何得出当前结果。
  • 🐞 调试错误:找出错误发生的位置和原因。
  • 🔍 探索备选路径:尝试不同的执行分支,寻找更优结果。

核心功能是:可以从某个历史检查点(checkpoint)恢复执行,你可以选择重放旧状态,或修改状态后探索新路径。每次恢复执行都会在执行历史中生成一个新的分支。

获取历史检查点

1
2
3
4
5
6
7
print("===============获取执行的历史检查点===================")
states = list(app.get_state_history(config))

for state in states:
print(state.next) # 下一节点
print(state.config["configurable"]["checkpoint_id"]) # 检查点 ID

选中某个重放的检查点进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
print("========================重放的检查点进行更新===========================")
# 获取特定的执行节点
selected_state = states[2]
# 输出节点信息
print(selected_state.next)
print(selected_state.values)

# 创建重新执行的节点数据
# 更新状态并创建新检查点(主题改为 "chickens")
new_config = app.update_state(
selected_state.config,
values={"input_value": "你好 python771"}
)
print("================新的执行配置信息===================")
print(new_config) # 打印新的 checkpoint 配置

从检查点恢复执行

注意:重放是输入的值直接传入 None 即可,然后传入更新后的检查点配置

1
2
3
print("================重新执行===================")
result = app.invoke(None, new_config)
print("重新直接的结果如下:", result)

完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from typing import TypedDict


# 一、定义状态
class State(TypedDict):
"""
定义状态
"""
value_1: str
value_2: int
report: str
input_value: str


# 二、定义工作节点(本质就是一个函数)

def step_1(state: State):
"""定义工作节点"""
print("执行step_1节点")
return {"value_1": 100}


def step_2(state: State):
"""定义工作节点"""
print("执行step_2节点")
return {"value_2": 100}


def generator_test_report(state: State):
"""定义工作节点"""
print("执行step_3节点")
return {"report": "这个是一个测试报告"}


# ===========================================================================
# 三、开发工作流
# 3.1 初始化工作流
graph = StateGraph(State)
# 3.2 把节点(node)添加到状态图(工作流)中
graph.add_node("生成测试用例", step_1)
graph.add_node("执行测试用例", step_2)
graph.add_node("生成测试报告", generator_test_report)
# 3.3 对工作节点进行编排

graph.add_edge(START, "生成测试用例")
graph.add_edge("生成测试用例", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")
graph.add_edge("生成测试报告", END)

# =============启用检查点的配置==========================
from langgraph.checkpoint.memory import InMemorySaver

checkpoint = InMemorySaver()
# 4、对graph对象进行编译
app = graph.compile(checkpointer=checkpoint)
config = {"configurable": {"thread_id": "1"}}
app.invoke(input={"input_value": "你好,pythonAI"}, config=config)

# ==================获取检查点信息=========================
result = list(app.get_state_history(config=config))[::-1]
# 使用检查点进行节点重复置
step3_checkpoint = result[3]
print("检查点的信息:", step3_checkpoint.next, "检查点id:", step3_checkpoint.config["configurable"]["checkpoint_id"])

# ===================实现节点重运行=========================
new_config = app.update_state(
step3_checkpoint.config,
values={"input_value": "你好,pythonAI"}
)
print("===============开始节点重复运行===================")
# 注意点:再进行节点重复时,input输出的值为None,第二个值为配置信息
app.invoke(None, config=new_config)

人工干预机制

LangGraph 支持强大的人工参与循环(HIL)工作流,允许在自动化过程中的任何环节进行人工干预。这在大型语言模型(LLM)驱动的应用程序中尤其有用,因为模型输出可能需要验证、更正或额外的上下文。

主要功能

  • 持久化执行状态:LangGraph 在每个步骤后都会检查图状态,允许在定义好的节点处无限期地暂停执行。这支持异步的人工审查或输入,不受时间限制。
  • 灵活的集成点:HIL 逻辑可以在工作流的任何点引入。这允许有针对性的人工参与,例如批准 API 调用、更正输出或引导对话

使用场景

  • 审查工具调用:在工具执行之前,人工可以审查、编辑或批准 LLM 请求的工具调用。
  • 验证 LLM 输出:人工可以审查、编辑或批准 LLM 生成的内容。
  • 提供上下文:使 LLM 能够明确请求人工输入以进行澄清或提供额外细节,或支持多轮对话。

核心点

  • interrupt(…) 暂停图执行,并返回需要人工处理的内容
  • Command(resume=…) 用于恢复图,并携带人工提供的输入

interrupt 的使用

1
2
3
4
5
6
7
8
9
10
11
from langgraph.types import interrupt, Command

def human_node(state):
value = interrupt({"text_to_revise": state["some_text"]})
return {"some_text": value}

result = graph.invoke({"some_text": "原始文本"}, config)
print(result["__interrupt__"]) # 图将在此中断,等待输入

# 恢复执行
graph.invoke(Command(resume="修改后的文本"), config)

注意:

  • __interrupt__ 是执行结果中保存中断上下文的关键字段。

  • 中断恢复时,会重新执行节点中从头到 interrupt() 的代码块。

Command 的使用

1
2
3
4
5
6
7
from langgraph.types import Command

Command(
goto: Optional[str] = None, # 指定跳转节点
update: Optional[dict] = None, # 更新状态值
resume: Optional[Any] = None # 恢复中断(仅适用于graph节点外部调用)
)

常见中断场景

审批 / 拒绝

1
2
3
4
5
6
def human_review(state: State):
action = interrupt("确认操作?")
if action == "确认":
return Command(goto="生成LLM", update={"messages": ["新的用户反馈"]})
else:
return Command(goto="下一个步骤")

根据人工输入,控制图表路径走向

数据修改

1
2
3
4
5
6
def human_editing(state: State):
result = interrupt({
"task": "编辑摘要",
"llm_generated_summary": state["llm_generated_summary"]
})
return {"llm_generated_summary": result}

适用人类对 LLM 输出进行修改。

信息补充

1
2
3
def request_input(state: State):
info = interrupt("请补充信息:")
return {"info": info}

实战案例

强调在AI系统中加入人工审核环节以提升系统成熟度和稳定性,特别是在对错误容忍度低的场景下,通过人工审批、状态编辑、工具调用审查等方式保障决策正确性与系统安全。

  • 审查工具调用情况(是否正确等)
  • 审查和验证LLM输出
  • 人工提供更好的上下文背景

人机交互的场景:

image-20250918090250968

基本运用:等待用户数据

等待用户输入的本质是在节点间增加人类反馈节点,定义包含input和user feedback属性的状态对象,引入interrupt组件来打断流程并等待用户反馈,通过Command组件恢复被打断的流程

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display

# 定义节点间通讯的消息类型
class State(TypedDict):
input: str
user_feedback: str

# 定义节点
def step_1(state):
print("---Step 1---")
pass

# 用户反馈节点
def human_feedback(state):
print("---human_feedback---")
feedback = interrupt("Please provide feedback:")
return {"user_feedback": feedback}


def step_3(state):
print("---Step 3---")
pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# 设计记忆内存
memory = MemorySaver()

# 图的编译
graph = builder.compile(checkpointer=memory)

# 查看节点与图结构
display(Image(graph.get_graph().draw_mermaid_png()))

# 调用
# Input
initial_input = {"input": "你好"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="updates"):
print(event)
print("\n")

# 添加human反馈
from langgraph.types import Command

for event in graph.stream(
# 人类反馈内容:resume="go to step 3!"
Command(resume="go to step 3!"), thread, stream_mode="updates"
):
print(event)
print("\n")

在智能体中引入人工介入环节,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
from langchain_core.messages import ToolMessage
from pydantic import BaseModel
from IPython.display import Image, display

from langgraph.graph import MessagesState, START, END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek

# --- 1. 设置状态 ---
# MessagesState 是一个内置的状态类型,它简单地将所有消息累加起来。
# 这对于大多数聊天机器人应用来说都非常方便。


# --- 2. 设置工具 ---

@tool
def search(query: str):
"""调用此函数来浏览网络以查找信息。"""
# 这是一个实际工具实现的占位符
print(f"---正在执行搜索: {query}---")
return f"我查询了:{query}。结果:北京天气晴朗,温度25度。"


# 我们将有一个真实工具和一个"假"工具"ask_human"
tools = [search]
tool_node = ToolNode(tools)


# 我们为"ask_human"工具定义一个Pydantic模型,以便模型知道它的签名
class AskHuman(BaseModel):
"""当你需要用户的澄清或额外信息时,调用此工具向人类提问。"""
question: str


# --- 3. 设置模型 ---

# 初始化模型
model = ChatDeepSeek(
model="deepseek-chat",
temperature=0,
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url=os.environ.get("DEEPSEEK_API_BASE"),
)

# 将所有工具(真实的和模拟的)绑定到模型上
model = model.bind_tools(tools + [AskHuman])


# --- 4. 定义图的节点和边 ---

# 定义调用模型的函数
def call_model(state: MessagesState):
print("---调用大模型---")
messages = state["messages"]
response = model.invoke(messages)
# 我们返回一个列表,因为这将被添加到现有列表中
return {"messages": [response]}


# 我们定义一个节点来处理"ask_human"工具调用
# 这个节点会暂停流程并等待用户输入
def interrupt(question: str) -> str:
"""一个简单的函数,用于在命令行中暂停并向用户提问。"""
print(f"\n[需要人类输入] 问题: {question}")
answer = input("你的回答: ")
return answer

def ask_human(state: MessagesState):
print("---等待人类输入---")
# 获取最后一条消息中的工具调用信息
last_message = state["messages"][-1]
tool_call = last_message.tool_calls[0]
tool_call_id = tool_call["id"]

# 解析工具调用的参数
ask_args = AskHuman.model_validate(tool_call["args"])

# 调用interrupt函数暂停并获取用户输入
user_response = interrupt(ask_args.question)

# 将用户的回答构造成一个ToolMessage
tool_message = ToolMessage(content=user_response, tool_call_id=tool_call_id)

return {"messages": [tool_message]}


# ★★★ 修改点 1: 修改条件函数 ★★★
# 定义决定流程走向的函数
def should_continue(state: MessagesState):
last_message = state["messages"][-1]

# 如果模型没有进行工具调用,则流程结束
if not last_message.tool_calls:
return "__end__"

# 如果模型调用的是 "AskHuman" 工具
if last_message.tool_calls[0]["name"] == "AskHuman":
return "ask_human"

# 否则,执行常规工具调用
return "action"


# --- 5. 构建图 ---

# 定义一个新图
workflow = StateGraph(MessagesState)

# 定义我们将循环的节点
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# 设置入口点
workflow.add_edge(START, "agent")


# ★★★ 修改点 2: 添加显式的路径映射 ★★★
# 添加条件边,这是图的核心逻辑
workflow.add_conditional_edges(
"agent",
should_continue,
{
"action": "action",
"ask_human": "ask_human",
"__end__": END,
},
)

# 添加从工具执行节点返回到agent节点的边
workflow.add_edge("action", "agent")
# 添加从人类输入节点返回到agent节点的边
workflow.add_edge("ask_human", "agent")

# 设置内存检查点
memory = MemorySaver()

# 编译图,使其成为可运行的应用
app = workflow.compile(checkpointer=memory)


# --- 6. 可视化和运行 ---
display(Image(app.get_graph().draw_mermaid_png()))

# 调用
# 创建一个线程ID,用于保持对话状态
config = {"configurable": {"thread_id": "2"}}

for event in app.stream(
{
"messages": [
(
"user",
"询问用户他们在哪里,然后查询那里的天气",
)
]
},
config,
stream_mode="values",
):
event["messages"][-1].pretty_print()
# 注意:会出现一个输入用户区域地方,然后等待输入,输入信息后才会进行接续内容,输入内容:我在北京

执行查看节点与图结构

image-20250918100254757

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
---Step 1---
{'step_1': None}


---human_feedback---
{'__interrupt__': (Interrupt(value='Please provide feedback:', id='e8b5902129c677d43844e5aff7857cc4'),)}


---human_feedback---
{'human_feedback': {'user_feedback': 'go to step 3!'}}


---Step 3---
{'step_3': None}


================================ Human Message =================================

询问用户他们在哪里,然后查询那里的天气
---调用大模型---
================================== Ai Message ==================================
Tool Calls:
AskHuman (call_00_TpzUc16Jdly70xacNN1qq9PW)
Call ID: call_00_TpzUc16Jdly70xacNN1qq9PW
Args:
question: 请问您在哪里?我想为您查询当地的天气信息。
---等待人类输入---

[需要人类输入] 问题: 请问您在哪里?我想为您查询当地的天气信息。
================================= Tool Message =================================

我在北京
---调用大模型---
================================== Ai Message ==================================
Tool Calls:
search (call_00_RlGPZ0yP4eioQByUs8rzdZe2)
Call ID: call_00_RlGPZ0yP4eioQByUs8rzdZe2
Args:
query: 北京天气
---正在执行搜索: 北京天气---
================================= Tool Message =================================
...
- 天气状况:晴朗
- 温度:25度

这是一个相当宜人的天气,适合外出活动!

基本运用:审查工具调用

通过人机协作审查智能体的工具调用,包括设置审查节点、判断人类反馈、执行工具及结果插入,并展示了在不同反馈(继续、更新、反馈)下流程的导航与处理方式。

根据不同的人类反馈(continue、update、feedback)导航至不同节点。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt
from langchain_core.tools import tool
from langchain_core.messages import AIMessage
from IPython.display import Image, display


@tool
def weather_search(city: str):
"""搜索天气"""
print("----")
print(f"正在搜索:{city}")
print("----")
return "晴朗!"


# 设置模型
from langchain_deepseek import ChatDeepSeek
import os

deepseek = ChatDeepSeek(
model="deepseek-chat",
temperature=0,
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url=os.environ.get("DEEPSEEK_API_BASE"),
)

model = deepseek.bind_tools(
[weather_search]
)


class State(MessagesState):
"""简单状态。"""


def call_llm(state):
return {"messages": [model.invoke(state["messages"])]}


def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
last_message = state["messages"][-1]
tool_call = last_message.tool_calls[-1]

# 这是我们将通过Command(resume=<human_review>)提供的值
human_review = interrupt(
{
"question": "这是正确的吗?",
# 显示工具调用以供审核
"tool_call": tool_call,
}
)

review_action = human_review["action"]
review_data = human_review.get("data")

# 如果批准,调用工具
if review_action == "continue":
return Command(goto="run_tool")

# 更新AI消息并调用工具
elif review_action == "update":
updated_message = {
"role": "ai",
"content": last_message.content,
"tool_calls": [
{
"id": tool_call["id"],
"name": tool_call["name"],
# 这是人类提供的更新
"args": review_data,
}
],
# 这很重要 - 这需要与你替换的消息相同!
# 否则,它将显示为一个单独的消息
"id": last_message.id,
}
return Command(goto="run_tool", update={"messages": [updated_message]})

# 向LLM提供反馈
elif review_action == "feedback":
# 注意:我们将反馈消息添加为ToolMessage
# 以保持消息历史中的正确顺序
# (带有工具调用的AI消息需要后跟工具调用消息)
tool_message = {
"role": "tool",
# 这是我们的自然语言反馈
"content": review_data,
"name": tool_call["name"],
"tool_call_id": tool_call["id"],
}
return Command(goto="call_llm", update={"messages": [tool_message]})

# 得到工具的允许结果,然后插入到消息里面去
def run_tool(state):
new_messages = []
tools = {"weather_search": weather_search}
tool_calls = state["messages"][-1].tool_calls
for tool_call in tool_calls:
tool = tools[tool_call["name"]]
result = tool.invoke(tool_call["args"])
new_messages.append(
{
"role": "tool",
"name": tool_call["name"],
"content": result,
"tool_call_id": tool_call["id"],
}
)
return {"messages": new_messages}


def route_after_llm(state) -> Literal[END, "human_review_node"]:
if len(state["messages"][-1].tool_calls) == 0: # 拿到最新一条数据的tool_calls
return END
else:
return "human_review_node"


builder = StateGraph(State)
builder.add_node(call_llm)
builder.add_node(run_tool)
builder.add_node(human_review_node)
builder.add_edge(START, "call_llm")
builder.add_conditional_edges("call_llm", route_after_llm)
builder.add_edge("run_tool", "call_llm")

# 设置内存
memory = MemorySaver()

# 添加
graph = builder.compile(checkpointer=memory)

# 查看节点与图结构
display(Image(graph.get_graph().draw_mermaid_png()))

执行查看节点与图结构

image-20250918100530221

  • 当不涉及工具调用的时候,不会触发人工审核

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Input
    initial_input = {"messages": [{"role": "user", "content": "你好!"}]}

    # Thread
    thread = {"configurable": {"thread_id": "1"}}

    for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")

    执行结果

    1
    {'call_llm': {'messages': [AIMessage(content='你好!很高兴为您服务。我可以帮您查询天气信息,如果您需要了解某个城市的天气情况,请告诉我城市名称,我会为您查询最新的天气信息。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 33, 'prompt_tokens': 144, 'total_tokens': 177, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}, 'prompt_cache_hit_tokens': 0, 'prompt_cache_miss_tokens': 144}, 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_08f168e49b_prod0820_fp8_kvcache', 'id': 'd95943e0-85bc-4aae-a037-393a500e8c7a', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--b490a400-5a86-403d-b459-3d972a6b1474-0', usage_metadata={'input_tokens': 144, 'output_tokens': 33, 'total_tokens': 177, 'input_token_details': {'cache_read': 0}, 'output_token_details': {}})]}}
  • 一旦涉及到工具调用 就会触发人工介入

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Input,提问到天气相关的问题
    initial_input = {"messages": [{"role": "user", "content": "北京的天气如何?"}]}

    # Thread
    thread = {"configurable": {"thread_id": "2"}}

    for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")

    执行结果

    1
    2
    3
    4
    {'call_llm': {'messages': [AIMessage(content='我来帮您查询北京的天气情况。', additional_kwargs={'tool_calls': [{'id': 'call_00_nrZyXDGPbp73qAdYJ7IpMxHv', 'function': {'arguments': '{"city": "\\u5317\\u4eac"}', 'name': 'weather_search'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 146, 'total_tokens': 173, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 128}, 'prompt_cache_hit_tokens': 128, 'prompt_cache_miss_tokens': 18}, 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_08f168e49b_prod0820_fp8_kvcache', 'id': '03bd6516-9777-4042-b4ed-02ea6d6d004f', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--b4693d60-2881-4ab6-9e96-4670276ec95a-0', tool_calls=[{'name': 'weather_search', 'args': {'city': '北京'}, 'id': 'call_00_nrZyXDGPbp73qAdYJ7IpMxHv', 'type': 'tool_call'}], usage_metadata={'input_tokens': 146, 'output_tokens': 27, 'total_tokens': 173, 'input_token_details': {'cache_read': 128}, 'output_token_details': {}})]}}


    {'__interrupt__': (Interrupt(value={'question': '这是正确的吗?', 'tool_call': {'name': 'weather_search', 'args': {'city': '北京'}, 'id': 'call_00_nrZyXDGPbp73qAdYJ7IpMxHv', 'type': 'tool_call'}}, id='183597080f78b79c566b124a34223821'),)}

    使用Command进行人机交互

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from langgraph.types import Command

    for event in graph.stream(
    # 输入值
    Command(resume={"action": "continue"}),
    thread,
    stream_mode="updates",
    ):
    print(event)
    print("\n")

    执行结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    {'human_review_node': None}


    ----
    正在搜索:北京
    ----
    {'run_tool': {'messages': [{'role': 'tool', 'name': 'weather_search', 'content': '晴朗!', 'tool_call_id': 'call_00_nrZyXDGPbp73qAdYJ7IpMxHv'}]}}


    {'call_llm': {'messages': [AIMessage(content='根据查询结果,北京目前的天气是晴朗的!天气状况很好,适合外出活动。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 19, 'prompt_tokens': 172, 'total_tokens': 191, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 128}, 'prompt_cache_hit_tokens': 128, 'prompt_cache_miss_tokens': 44}, 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_08f168e49b_prod0820_fp8_kvcache', 'id': 'cd0943c6-f6d0-4228-aa4e-2be23df3a798', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--d5f3dae7-d3f1-4ee5-98e3-98abc54e7071-0', usage_metadata={'input_tokens': 172, 'output_tokens': 19, 'total_tokens': 191, 'input_token_details': {'cache_read': 128}, 'output_token_details': {}})]}}


  • 更进一步,对智能体调用的工具进行参数编辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # Input
    initial_input = {"messages": [{"role": "user", "content": "深圳的天气如何?"}]}

    # Thread
    thread = {"configurable": {"thread_id": "3"}}

    for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")

    执行结果

    1
    2
    3
    4
    5
    6
    {'call_llm': {'messages': [AIMessage(content='我来帮您查询深圳的天气情况。', additional_kwargs={'tool_calls': [{'id': 'call_00_VFXEbMl2mMOo0WXRr3BdKjWP', 'function': {'arguments': '{"city": "\\u6df1\\u5733"}', 'name': 'weather_search'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 147, 'total_tokens': 175, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 128}, 'prompt_cache_hit_tokens': 128, 'prompt_cache_miss_tokens': 19}, 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_08f168e49b_prod0820_fp8_kvcache', 'id': 'b944f986-9c42-4d99-ac66-1e90ac5d8514', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--2d7d12ff-727e-4649-a735-44aac3346a5b-0', tool_calls=[{'name': 'weather_search', 'args': {'city': '深圳'}, 'id': 'call_00_VFXEbMl2mMOo0WXRr3BdKjWP', 'type': 'tool_call'}], usage_metadata={'input_tokens': 147, 'output_tokens': 28, 'total_tokens': 175, 'input_token_details': {'cache_read': 128}, 'output_token_details': {}})]}}


    {'__interrupt__': (Interrupt(value={'question': '这是正确的吗?', 'tool_call': {'name': 'weather_search', 'args': {'city': '深圳'}, 'id': 'call_00_VFXEbMl2mMOo0WXRr3BdKjWP', 'type': 'tool_call'}}, id='0f55d9ce7b22cdd30d7d25afd89223ae'),)}


    1
    2
    3
    4
    5
    6
    7
    8
    # 直接对工具的参数进行编辑
    for event in graph.stream(
    Command(resume={"action": "update", "data": {"city": "上海,中国"}}),
    thread,
    stream_mode="updates",
    ):
    print(event)
    print("\n")

    执行结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    {'human_review_node': {'messages': [{'role': 'ai', 'content': '我来帮您查询深圳的天气情况。', 'tool_calls': [{'id': 'call_00_VFXEbMl2mMOo0WXRr3BdKjWP', 'name': 'weather_search', 'args': {'city': '上海,中国'}}], 'id': 'run--2d7d12ff-727e-4649-a735-44aac3346a5b-0'}]}}


    ----
    正在搜索:上海,中国
    ----
    {'run_tool': {'messages': [{'role': 'tool', 'name': 'weather_search', 'content': '晴朗!', 'tool_call_id': 'call_00_VFXEbMl2mMOo0WXRr3BdKjWP'}]}}


    {'call_llm': {'messages': [AIMessage(content='让我重新查询深圳的天气:', additional_kwargs={'tool_calls': [{'id': 'call_00_eY6qquS2SjREeAhv2oD2Q9hL', 'function': {'arguments': '{"city": "深圳"}', 'name': 'weather_search'}, 'type': 'function', 'index': 0}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 176, 'total_tokens': 196, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 128}, 'prompt_cache_hit_tokens': 128, 'prompt_cache_miss_tokens': 48}, 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_08f168e49b_prod0820_fp8_kvcache', 'id': '73984e27-0ca3-421b-9d35-348907eabdf3', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--ae52f621-4c85-4e13-95a4-673939846f23-0', tool_calls=[{'name': 'weather_search', 'args': {'city': '深圳'}, 'id': 'call_00_eY6qquS2SjREeAhv2oD2Q9hL', 'type': 'tool_call'}], usage_metadata={'input_tokens': 176, 'output_tokens': 20, 'total_tokens': 196, 'input_token_details': {'cache_read': 128}, 'output_token_details': {}})]}}


    {'__interrupt__': (Interrupt(value={'question': '这是正确的吗?', 'tool_call': {'name': 'weather_search', 'args': {'city': '深圳'}, 'id': 'call_00_eY6qquS2SjREeAhv2oD2Q9hL', 'type': 'tool_call'}}, id='5d83228c73ed5e60fe886c76117d2402'),)}


基本使用:编辑图的状态

通过设置中断点和人工干预,可以在流程执行中修改状态值,实现对智能体工具选择或动作的审核编辑。

image-20250918101102384

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
input: str


def step_1(state):
print("---Step 1---")
pass


def step_2(state):
print("---Step 2---")
pass


def step_3(state):
print("---Step 3---")
pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Add 注意interrupt_before
graph = builder.compile(checkpointer=memory, interrupt_before=["step_2"])

# View
display(Image(graph.get_graph().draw_mermaid_png()))

# 调用
# Input
initial_input = {"input": "你好"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
print(event)

执行查看节点与图结构

image-20250918101243460

执行结果

1
2
{'input': '你好'}
---Step 1---

执行到节点Step 1,就中断了,在代码中使用interrupt_before对Step 2设置了断点,此时我们可以进行人工干预,更新流状态并传入新值

1
2
3
graph.update_state(thread, {"input": "你好 everybody!"})
print("---\n---\nUpdated state!")
print(graph.get_state(thread).values)

执行结果

1
2
3
4
---
---
Updated state!
{'input': '你好 everybody!'}
1
2
3
# 继续执行
for event in graph.stream(None, thread, stream_mode="values"):
print(event)

执行结果

1
2
3
{'input': '你好 everybody!'}
---Step 2---
---Step 3---