Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangGraph之中断恢复和人工审核机制

检查点

检查点(Checkpoint)

  • 作用:检查点是图执行过程中每个“超步骤”(superstep)时刻状态的快照。它使得以下功能成为可能:
    • 状态恢复
    • 重播执行
    • 容错与断点续跑
    • 人机交互与内存更新
  • 自动持久化:使用 LangGraph 时,无需手动保存状态,它会自动在后台处理所有检查点
1
2
3
4
5
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
# 对graph对象进行编译,并配置检查点
app = graph.compile(checkpointer=checkpointer)

检查点会自动在每个节点执行后保存当前状态,包括:

  • 当前节点名 : state.next
  • 当前值 : state.values
  • 当前执行上下文 : state.config

线程(Thread)

  • 每次运行图时,都必须指定一个唯一的 thread_id
  • 一个 thread 保存该次执行过程的所有检查点状态。
1
2
config = {"configurable": {"thread_id": "1"}}
graph.invoke(input_data, config)
  • 可使用 graph.get_state(config) 获取最新状态。
  • 可使用 graph.get_state_history(config) 获取全部历史状态。

Langgraph 中使用检查点(示例代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from typing import TypedDict


# 一、定义状态
class State(TypedDict):
"""
定义状态
"""
value_1: str
value_2: int
report: str
input_value: str


import time


# 二、定义工作节点(本质就是一个函数)

def step_1(state: State):
"""定义工作节点"""
print("执行step_1节点----------开始-----------")
time.sleep(2)
print("执行step_1节点----------结束-----------")
return {"value_1": 100}


def step_2(state: State):
"""定义工作节点"""
print("执行step_2节点----------开始-----------")
time.sleep(2)
print("执行step_2节点----------结束-----------")
return {"value_2": 100}


def generator_test_report(state: State):
"""定义工作节点"""
print("执行step_3节点----------开始-----------")
time.sleep(2)
print("执行step_3节点----------结束-----------")
return {"report": "这个是一个测试报告"}


# ===========================================================================
# 三、开发工作流
# 3.1 初始化工作流
graph = StateGraph(State)
# 3.2 把节点(node)添加到状态图(工作流)中
graph.add_node("步骤一", step_1)
graph.add_node("步骤二", step_2)
graph.add_node("步骤三", generator_test_report)
# 3.3 对工作节点进行编排

graph.add_edge(START, "步骤一")
graph.add_edge("步骤一", "步骤二")
graph.add_edge("步骤二", "步骤三")
graph.add_edge("步骤三", END)

# 3.4 创建一个状态检查点
print("=================添加检查点配置信息=================")
checkpointer = InMemorySaver()
# 4、对graph对象进行编译
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
app.invoke({"input_value": "你好,世界"}, config)
# 获取检查点

print("===============获取执行的历史检查点===================")
states = list(app.get_state_history(config))

agent 中使用检查点(示例代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent

checkpointer = InMemorySaver()
agent = create_react_agent(
model=self.llm,
tools=[mysql_executor],
prompt="""
你是一位资深的DBA,现在需要你根据用户的需求,编写对应的sql语句,调用数据库操作的工具,执行sql语句,并返回执行的结果,
每一步执行完都需要去分析当前的执行进度,以及规划下一步的任务执行
""",
# 配置启用检查点
checkpointer=checkpointer
)
# 运行agent时传入配置
response = agent.invoke(input={"messages": input},
config={"configurable": {"thread_id": "thread_1"}}
)

print("==================检查点====================")
# 执行完获去历史检查点状态
states = agent.get_state_history(config={"configurable": {"thread_id": "thread_1"}})
for state in states:
print(state.next) # 下一节点
print(state.config["configurable"]["checkpoint_id"])

持久化检查点

实现名称 说明
InMemorySaver 内存持久化
MongoDBSaver 使用 mongodb 持久化
RedisSaver 使用 Redis持久化

使用 redis 存储检查点

  • 安装依赖

    1
    pip install langgraph-checkpoint-redis
  • 使用案例

    1
    2
    3
    4
    from langgraph.checkpoint.redis import RedisSaver

    with RedisSaver.from_conn_string("redis://192.168.0.108:6379") as checkpointer:
    print(checkpointer)

使用 mongodb存储检查点

  • 安装依赖

    1
    pip install langgraph-checkpoint-mongodb
  • 使用案例

    1
    2
    3
    4
    from langgraph.checkpoint.mongodb import MongoDBSaver

    with MongoDBSaver.from_conn_string("localhost:27017") as checkpointer:
    print("checkpointer:", checkpointer)

重运行机制

在使用由大模型驱动的非确定性系统(例如 Agent)时,我们常常希望深入理解其决策过程。LangGraph 提供了 时间旅行功能(Time Travel) 来支持以下用途:

  • 🤔 理解推理过程:分析系统如何得出当前结果。
  • 🐞 调试错误:找出错误发生的位置和原因。
  • 🔍 探索备选路径:尝试不同的执行分支,寻找更优结果。

核心功能是:可以从某个历史检查点(checkpoint)恢复执行,你可以选择重放旧状态,或修改状态后探索新路径。每次恢复执行都会在执行历史中生成一个新的分支。

获取历史检查点

1
2
3
4
5
6
7
print("===============获取执行的历史检查点===================")
states = list(app.get_state_history(config))

for state in states:
print(state.next) # 下一节点
print(state.config["configurable"]["checkpoint_id"]) # 检查点 ID

选中某个重放的检查点进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
print("========================重放的检查点进行更新===========================")
# 获取特定的执行节点
selected_state = states[2]
# 输出节点信息
print(selected_state.next)
print(selected_state.values)

# 创建重新执行的节点数据
# 更新状态并创建新检查点(主题改为 "chickens")
new_config = app.update_state(
selected_state.config,
values={"input_value": "你好 python771"}
)
print("================新的执行配置信息===================")
print(new_config) # 打印新的 checkpoint 配置

从检查点恢复执行

注意:重放是输入的值直接传入 None 即可,然后传入更新后的检查点配置

1
2
3
print("================重新执行===================")
result = app.invoke(None, new_config)
print("重新直接的结果如下:", result)

完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from typing import TypedDict


# 一、定义状态
class State(TypedDict):
"""
定义状态
"""
value_1: str
value_2: int
report: str
input_value: str


# 二、定义工作节点(本质就是一个函数)

def step_1(state: State):
"""定义工作节点"""
print("执行step_1节点")
return {"value_1": 100}


def step_2(state: State):
"""定义工作节点"""
print("执行step_2节点")
return {"value_2": 100}


def generator_test_report(state: State):
"""定义工作节点"""
print("执行step_3节点")
return {"report": "这个是一个测试报告"}


# ===========================================================================
# 三、开发工作流
# 3.1 初始化工作流
graph = StateGraph(State)
# 3.2 把节点(node)添加到状态图(工作流)中
graph.add_node("生成测试用例", step_1)
graph.add_node("执行测试用例", step_2)
graph.add_node("生成测试报告", generator_test_report)
# 3.3 对工作节点进行编排

graph.add_edge(START, "生成测试用例")
graph.add_edge("生成测试用例", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")
graph.add_edge("生成测试报告", END)

# =============启用检查点的配置==========================
from langgraph.checkpoint.memory import InMemorySaver

checkpoint = InMemorySaver()
# 4、对graph对象进行编译
app = graph.compile(checkpointer=checkpoint)
config = {"configurable": {"thread_id": "1"}}
app.invoke(input={"input_value": "你好,pythonAI"}, config=config)

# ==================获取检查点信息=========================
result = list(app.get_state_history(config=config))[::-1]
# 使用检查点进行节点重复置
step3_checkpoint = result[3]
print("检查点的信息:", step3_checkpoint.next, "检查点id:", step3_checkpoint.config["configurable"]["checkpoint_id"])

# ===================实现节点重运行=========================
new_config = app.update_state(
step3_checkpoint.config,
values={"input_value": "你好,pythonAI"}
)
print("===============开始节点重复运行===================")
# 注意点:再进行节点重复时,input输出的值为None,第二个值为配置信息
app.invoke(None, config=new_config)

人工审核

LangGraph 支持强大的人工参与循环(HIL)工作流,允许在自动化过程中的任何环节进行人工干预。这在大型语言模型(LLM)驱动的应用程序中尤其有用,因为模型输出可能需要验证、更正或额外的上下文。

主要功能

  • 持久化执行状态:LangGraph 在每个步骤后都会检查图状态,允许在定义好的节点处无限期地暂停执行。这支持异步的人工审查或输入,不受时间限制。
  • 灵活的集成点:HIL 逻辑可以在工作流的任何点引入。这允许有针对性的人工参与,例如批准 API 调用、更正输出或引导对话

使用场景

  • 审查工具调用:在工具执行之前,人工可以审查、编辑或批准 LLM 请求的工具调用。
  • 验证 LLM 输出:人工可以审查、编辑或批准 LLM 生成的内容。
  • 提供上下文:使 LLM 能够明确请求人工输入以进行澄清或提供额外细节,或支持多轮对话。

核心点

  • interrupt(…) 暂停图执行,并返回需要人工处理的内容
  • Command(resume=…) 用于恢复图,并携带人工提供的输入

interrupt 的使用

1
2
3
4
5
6
7
8
9
10
11
from langgraph.types import interrupt, Command

def human_node(state):
value = interrupt({"text_to_revise": state["some_text"]})
return {"some_text": value}

result = graph.invoke({"some_text": "原始文本"}, config)
print(result["__interrupt__"]) # 图将在此中断,等待输入

# 恢复执行
graph.invoke(Command(resume="修改后的文本"), config)

注意:

  • __interrupt__ 是执行结果中保存中断上下文的关键字段。

  • 中断恢复时,会重新执行节点中从头到 interrupt() 的代码块。

Command 的使用

1
2
3
4
5
6
7
from langgraph.types import Command

Command(
goto: Optional[str] = None, # 指定跳转节点
update: Optional[dict] = None, # 更新状态值
resume: Optional[Any] = None # 恢复中断(仅适用于graph节点外部调用)
)

常见中断场景

审批 / 拒绝

1
2
3
4
5
6
def human_review(state: State):
action = interrupt("确认操作?")
if action == "确认":
return Command(goto="生成LLM", update={"messages": ["新的用户反馈"]})
else:
return Command(goto="下一个步骤")

根据人工输入,控制图表路径走向

数据修改

1
2
3
4
5
6
def human_editing(state: State):
result = interrupt({
"task": "编辑摘要",
"llm_generated_summary": state["llm_generated_summary"]
})
return {"llm_generated_summary": result}

适用人类对 LLM 输出进行修改。

信息补充

1
2
3
def request_input(state: State):
info = interrupt("请补充信息:")
return {"info": info}