检查点 检查点(Checkpoint)
作用:检查点是图执行过程中每个“超步骤”(superstep)时刻状态的快照。它使得以下功能成为可能:
状态恢复
重播执行
容错与断点续跑
人机交互与内存更新
自动持久化:使用 LangGraph 时,无需手动保存状态 ,它会自动在后台处理所有检查点
1 2 3 4 5 from langgraph.checkpoint.memory import InMemorySavercheckpointer = InMemorySaver() app = graph.compile (checkpointer=checkpointer)
检查点会自动在每个节点执行后保存当前状态,包括:
当前节点名 : state.next
当前值 : state.values
当前执行上下文 : state.config
线程(Thread)
每次运行图时,都必须指定一个唯一的 thread_id。
一个 thread 保存该次执行过程的所有检查点状态。
1 2 config = {"configurable" : {"thread_id" : "1" }} graph.invoke(input_data, config)
可使用 graph.get_state(config) 获取最新状态。
可使用 graph.get_state_history(config) 获取全部历史状态。
Langgraph 中使用检查点(示例代码) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.constants import START, ENDfrom langgraph.graph import StateGraphfrom typing import TypedDictclass State (TypedDict ): """ 定义状态 """ value_1: str value_2: int report: str input_value: str import timedef step_1 (state: State ): """定义工作节点""" print ("执行step_1节点----------开始-----------" ) time.sleep(2 ) print ("执行step_1节点----------结束-----------" ) return {"value_1" : 100 } def step_2 (state: State ): """定义工作节点""" print ("执行step_2节点----------开始-----------" ) time.sleep(2 ) print ("执行step_2节点----------结束-----------" ) return {"value_2" : 100 } def generator_test_report (state: State ): """定义工作节点""" print ("执行step_3节点----------开始-----------" ) time.sleep(2 ) print ("执行step_3节点----------结束-----------" ) return {"report" : "这个是一个测试报告" } graph = StateGraph(State) graph.add_node("步骤一" , step_1) graph.add_node("步骤二" , step_2) graph.add_node("步骤三" , generator_test_report) graph.add_edge(START, "步骤一" ) graph.add_edge("步骤一" , "步骤二" ) graph.add_edge("步骤二" , "步骤三" ) graph.add_edge("步骤三" , END) print ("=================添加检查点配置信息=================" )checkpointer = InMemorySaver() app = graph.compile (checkpointer=checkpointer) config = {"configurable" : {"thread_id" : "1" }} app.invoke({"input_value" : "你好,世界" }, config) print ("===============获取执行的历史检查点===================" )states = list (app.get_state_history(config))
agent 中使用检查点(示例代码) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.prebuilt import create_react_agentcheckpointer = InMemorySaver() agent = create_react_agent( model=self .llm, tools=[mysql_executor], prompt=""" 你是一位资深的DBA,现在需要你根据用户的需求,编写对应的sql语句,调用数据库操作的工具,执行sql语句,并返回执行的结果, 每一步执行完都需要去分析当前的执行进度,以及规划下一步的任务执行 """ , checkpointer=checkpointer ) response = agent.invoke(input ={"messages" : input }, config={"configurable" : {"thread_id" : "thread_1" }} ) print ("==================检查点====================" )states = agent.get_state_history(config={"configurable" : {"thread_id" : "thread_1" }}) for state in states: print (state.next ) print (state.config["configurable" ]["checkpoint_id" ])
持久化检查点
实现名称
说明
InMemorySaver
内存持久化
MongoDBSaver
使用 mongodb 持久化
RedisSaver
使用 Redis持久化
使用 redis 存储检查点
安装依赖
1 pip install langgraph-checkpoint-redis
使用案例
1 2 3 4 from langgraph.checkpoint.redis import RedisSaverwith RedisSaver.from_conn_string("redis://192.168.0.108:6379" ) as checkpointer: print (checkpointer)
使用 mongodb存储检查点
安装依赖
1 pip install langgraph-checkpoint-mongodb
使用案例
1 2 3 4 from langgraph.checkpoint.mongodb import MongoDBSaverwith MongoDBSaver.from_conn_string("localhost:27017" ) as checkpointer: print ("checkpointer:" , checkpointer)
重运行机制 在使用由大模型驱动的非确定性系统(例如 Agent)时,我们常常希望深入理解其决策过程。LangGraph 提供了 时间旅行功能(Time Travel) 来支持以下用途:
🤔 理解推理过程:分析系统如何得出当前结果。
🐞 调试错误:找出错误发生的位置和原因。
🔍 探索备选路径:尝试不同的执行分支,寻找更优结果。
核心功能是:可以从某个历史检查点(checkpoint)恢复执行,你可以选择重放旧状态,或修改状态后探索新路径。每次恢复执行都会在执行历史中生成一个新的分支。
获取历史检查点 1 2 3 4 5 6 7 print ("===============获取执行的历史检查点===================" )states = list (app.get_state_history(config)) for state in states: print (state.next ) print (state.config["configurable" ]["checkpoint_id" ])
选中某个重放的检查点进行更新 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 print ("========================重放的检查点进行更新===========================" )selected_state = states[2 ] print (selected_state.next )print (selected_state.values)new_config = app.update_state( selected_state.config, values={"input_value" : "你好 python771" } ) print ("================新的执行配置信息===================" )print (new_config)
从检查点恢复执行 注意:重放是输入的值直接传入 None 即可,然后传入更新后的检查点配置
1 2 3 print ("================重新执行===================" )result = app.invoke(None , new_config) print ("重新直接的结果如下:" , result)
完整示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 from langgraph.constants import START, ENDfrom langgraph.graph import StateGraphfrom typing import TypedDictclass State (TypedDict ): """ 定义状态 """ value_1: str value_2: int report: str input_value: str def step_1 (state: State ): """定义工作节点""" print ("执行step_1节点" ) return {"value_1" : 100 } def step_2 (state: State ): """定义工作节点""" print ("执行step_2节点" ) return {"value_2" : 100 } def generator_test_report (state: State ): """定义工作节点""" print ("执行step_3节点" ) return {"report" : "这个是一个测试报告" } graph = StateGraph(State) graph.add_node("生成测试用例" , step_1) graph.add_node("执行测试用例" , step_2) graph.add_node("生成测试报告" , generator_test_report) graph.add_edge(START, "生成测试用例" ) graph.add_edge("生成测试用例" , "执行测试用例" ) graph.add_edge("执行测试用例" , "生成测试报告" ) graph.add_edge("生成测试报告" , END) from langgraph.checkpoint.memory import InMemorySavercheckpoint = InMemorySaver() app = graph.compile (checkpointer=checkpoint) config = {"configurable" : {"thread_id" : "1" }} app.invoke(input ={"input_value" : "你好,pythonAI" }, config=config) result = list (app.get_state_history(config=config))[::-1 ] step3_checkpoint = result[3 ] print ("检查点的信息:" , step3_checkpoint.next , "检查点id:" , step3_checkpoint.config["configurable" ]["checkpoint_id" ])new_config = app.update_state( step3_checkpoint.config, values={"input_value" : "你好,pythonAI" } ) print ("===============开始节点重复运行===================" )app.invoke(None , config=new_config)
人工审核 LangGraph 支持强大的人工参与循环(HIL) 工作流,允许在自动化过程中的任何环节进行人工干预。这在大型语言模型(LLM)驱动的应用程序中尤其有用,因为模型输出可能需要验证、更正或额外的上下文。
主要功能
持久化执行状态:LangGraph 在每个步骤后都会检查图状态,允许在定义好的节点处无限期地暂停执行。这支持异步的人工审查或输入,不受时间限制。
灵活的集成点:HIL 逻辑可以在工作流的任何点引入。这允许有针对性的人工参与,例如批准 API 调用、更正输出或引导对话
使用场景 :
审查工具调用:在工具执行之前,人工可以审查、编辑或批准 LLM 请求的工具调用。
验证 LLM 输出:人工可以审查、编辑或批准 LLM 生成的内容。
提供上下文:使 LLM 能够明确请求人工输入以进行澄清或提供额外细节,或支持多轮对话。
核心点 :
interrupt(…) 暂停图执行,并返回需要人工处理的内容
Command(resume=…) 用于恢复图,并携带人工提供的输入
interrupt 的使用 1 2 3 4 5 6 7 8 9 10 11 from langgraph.types import interrupt, Commanddef human_node (state ): value = interrupt({"text_to_revise" : state["some_text" ]}) return {"some_text" : value} result = graph.invoke({"some_text" : "原始文本" }, config) print (result["__interrupt__" ]) graph.invoke(Command(resume="修改后的文本" ), config)
注意:
Command 的使用 1 2 3 4 5 6 7 from langgraph.types import CommandCommand( goto: Optional [str ] = None , update: Optional [dict ] = None , resume: Optional [Any ] = None )
常见中断场景 审批 / 拒绝 1 2 3 4 5 6 def human_review (state: State ): action = interrupt("确认操作?" ) if action == "确认" : return Command(goto="生成LLM" , update={"messages" : ["新的用户反馈" ]}) else : return Command(goto="下一个步骤" )
根据人工输入,控制图表路径走向
数据修改 1 2 3 4 5 6 def human_editing (state: State ): result = interrupt({ "task" : "编辑摘要" , "llm_generated_summary" : state["llm_generated_summary" ] }) return {"llm_generated_summary" : result}
适用人类对 LLM 输出进行修改。
信息补充 1 2 3 def request_input (state: State ): info = interrupt("请补充信息:" ) return {"info" : info}