标准状态图构建 LangGraph 使用 StateGraph 来构建整个流程的执行图。下面是常用方法的分类和说明:
方法分类
方法名
功能描述
代码示例
节点管理
add_node
添加单个节点
builder.add_node("node_name", node_func)
边(流程)管理
add_edge
添加固定流程连接
builder.add_edge("node_a", "node_b")
add_conditional_edges
添加条件分支
builder.add_conditional_edges("start", router_func)
入口和出口
set_entry_point
设置流程起点
builder.set_entry_point("first_node")
set_finish_point
设置流程终点
builder.set_finish_point("last_node")
编译执行
compile
将构建图编译执行体
graph = builder.compile()
可视化图
get_graph
获取图对象绘制流程图
graph.get_graph().draw_mermaid_png()
LangGraph 流程构建标准步骤 状态定义 定义三种种方式:
TypedDict:属于 Python 标准库 typing 模块的一部分,仅提供静态类型检查,运行时不执行验证, 最轻量,适合大多数场景,首选常用 。
Pydantic:第三方库,需要单独安装,提供运行时数据验证和序列化功能,如果你需要数据校验 (例如确保某个字段必须是正整数、或者符合某种格式)。
Dataclasses:Python 原生,如果你希望给状态设置默认值 (TypedDict 本身不支持默认值),现在可以使用 Python 的 dataclass。
官方示例
1 2 3 4 5 6 7 8 from langchain.messages import AnyMessagefrom typing_extensions import TypedDict, Annotatedimport operatorclass MessagesState (TypedDict ): messages: Annotated[list [AnyMessage], operator.add] llm_calls: int
定义模型节点 定义模型节点,这是在构建整个流程或系统架构中的一个关键步骤。
“模型节点”是系统中的一个组成部分,它承担着重要的功能。“用于调用LLM(大型语言模型)”说明这个模型节点的主要作用之一是去触发、调用大型语言模型,让其参与到整个流程中来。例如,在一些智能问答系统或者自动化任务处理系统中,需要借助LLM强大的语言理解和生成能力来处理各种输入信息。
官方示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from langchain.messages import SystemMessagedef llm_call (state: dict ): """LLM decides whether to call a tool or not""" return { "messages" : [ model_with_tools.invoke( [ SystemMessage( content="You are a helpful assistant tasked with performing arithmetic on a set of inputs." ) ] + state["messages" ] ) ], "llm_calls" : state.get('llm_calls' , 0 ) + 1 }
定义工具节点
工具节点的作用 :工具节点的主要功能是调用之前定义好的工具(如multiply、add、divide等)并返回结果。这些工具是用于执行特定任务的函数,例如进行数学运算。
调用工具 :在工具节点中,会检查传入状态(state)中最后一条消息的tool_calls属性。如果存在工具调用请求,就会根据请求调用相应的工具。
返回结果 :工具节点会收集所有工具调用的结果,并将这些结果以ToolMessage的形式返回。这些结果会被添加到状态的messages列表中,供后续节点使用。
官方示例
1 2 3 4 5 6 7 8 9 10 11 12 from langchain.messages import ToolMessagedef tool_node (state: dict ): """Performs the tool call""" result = [] for tool_call in state["messages" ][-1 ].tool_calls: tool = tools_by_name[tool_call["name" ]] observation = tool.invoke(tool_call["args" ]) result.append(ToolMessage(content=observation, tool_call_id=tool_call["id" ])) return {"messages" : result}
定义函数节点 每一个节点就是一个处理函数,它接受一个State作为输入,并返回一个更新后的部分State(字典) 。
示例代码
1 2 3 4 5 6 7 8 9 10 11 def step_1 (state: State ) -> dict : """初始化 value_1 字段""" return {"value_1" : "a" } def step_2 (state: State ) -> dict : """增量更新 value_1 字段""" return {"value_1" : state["value_1" ] + " b" } def step_3 (state: State ) -> dict : """初始化 value_2 字段""" return {"value_2" : 10 }
定义条件边 定义了一个逻辑,用于决定是否继续循环或停止,具体取决于 LLM 是否进行了工具调用。
官方示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from typing import Literal from langgraph.graph import StateGraph, START, ENDdef should_continue (state: MessagesState ) -> Literal ["tool_node" , END]: """Decide if we should continue the loop or stop based upon whether the LLM made a tool call""" messages = state["messages" ] last_message = messages[-1 ] if last_message.tool_calls: return "tool_node" return END
构建智能体 StateGraph类提供了一个框架,用于定义和管理代理的各个节点(nodes)和它们之间的连接(edges)。在这个例子中,代理的构建过程包括以下几个步骤:
Add nodes :添加节点到代理中。节点可以是执行特定功能的函数,比如llm_call节点用于调用语言模型(LLM),tool_node节点用于调用工具(如加法、乘法等)。
Add edges to connect nodes :添加边来连接这些节点,定义节点之间的执行顺序。例如,从起点(START)到llm_call节点,以及从llm_call节点根据条件到tool_node节点或结束(END)。
构建完成后,使用compile方法来编译代理。编译过程是将定义好的节点和边组合起来,形成一个可以运行的代理。编译后的代理可以根据输入的消息(如用户的问题)来执行一系列的操作,包括调用语言模型和工具,最终生成输出结果。
官方示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 agent_builder = StateGraph(MessagesState) agent_builder.add_node("llm_call" , llm_call) agent_builder.add_node("tool_node" , tool_node) agent_builder.add_edge(START, "llm_call" ) agent_builder.add_conditional_edges( "llm_call" , should_continue, ["tool_node" , END] ) agent_builder.add_edge("tool_node" , "llm_call" ) agent = agent_builder.compile () from IPython.display import Image, displaydisplay(Image(agent.get_graph(xray=True ).draw_mermaid_png())) from langchain.messages import HumanMessagemessages = [HumanMessage(content="Add 3 and 4." )] messages = agent.invoke({"messages" : messages}) for m in messages["messages" ]: m.pretty_print()
查看节点与图结构(内置的方法) Mermaid 是一种基于文本的图表和可视化工具,它允许用户通过简单的文本语法来创建复杂的图表和流程图。它特别适合开发者、文档编写者和技术人员在文档、代码库或网页中嵌入可视化内容。
示例代码
1 2 3 from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))
执行结果展示
调用 示例代码
1 2 3 4 from langchain_core.messages import HumanMessageresult = graph.invoke({"messages" : [HumanMessage("你好啊,我是花花!" )]}) print (result)
执行结果
1 2 3 {'messages' : [HumanMessage(content='你好啊,我是花花!' , additional_kwargs={}, response_metadata={}), AIMessage(content='你好!我是一个节点' , additional_kwargs={}, response_metadata={})], 'extra_field' : 10}
使用 pretty_print 来格式化显示 输出结果更清晰
示例代码
1 2 3 4 5 from langchain_core.messages import HumanMessageresult = graph.invoke({"messages" : [HumanMessage("你好啊,我是花花!" )]}) for message in result["messages" ]: message.pretty_print()
执行结果
1 2 3 4 5 6 ================================ Human Message ================================= 你好啊,我是花花! ================================== Ai Message ================================== 你好!我是一个节点
Node(节点) 节点参数 节点是图形的核心组成部分,可以使用 add_node 方法将这些节点添加到图形中
在 LangGraph 中,节点 可以 接受以下参数的 Python 函数(同步或异步):
state:图形的状态
config:包含配置信息(如)和跟踪信息(如RunnableConfig``thread_id``tags
runtime:包含运行时上下文和其他信息(如 Runtime``store``stream_writer
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 """ 节点函数可以接受以下三种类型的参数 1. state:图形的状态 继承typeDict的类 2. config:包含配置信息(如)和跟踪信息(如RunnableConfigthread_idtags 3. runtime:包含运行时上下文和其他信息(如 Runtimestorestream_writer """ from typing import TypedDictfrom langgraph.graph import StateGraphfrom langgraph.constants import START, ENDfrom dataclasses import dataclassfrom langchain_core.runnables import RunnableConfigfrom langgraph.runtime import Runtimeclass State (TypedDict ): """状态""" user_input: str test_cases: list @dataclass class RuntimeContext : """运行时上下文参数""" test_env: str tester_name: str def generator_test_case (state: State ): """生成测试用例""" print (f"用户输入的需求是:{state.get('user_input' )} ,开始进行测试用例生成" ) print ("测试用例已经生成" , ) return {"test_cases" : ["测试用例1" , "测试用例2" ]} def run_test_cases (state: State, runtime: Runtime[RuntimeContext] ): """执行测试用例""" print ("正在执行测试用例:" , state['test_cases' ]) print ("当前执行的测试环境" , runtime.context.get("test_env" )) print ("执行人" , runtime.context.get('tester_name' )) return {"report" : "这个是一个测试报告" } def generator_test_report (state: State, config: RunnableConfig ): """生成测试报告""" print ("执行generator_test_report节点" ) print ("配置信息:" , config) return {"report" : "这个是一个测试报告" } graph = StateGraph(State, context_schema=RuntimeContext) graph.add_node("生成测试用例" , generator_test_case) graph.add_node("执行测试用例" , run_test_cases) graph.add_node("生成测试报告" , generator_test_report) graph.add_edge(START, "生成测试用例" ) graph.add_edge("生成测试用例" , "执行测试用例" ) graph.add_edge("执行测试用例" , "生成测试报告" ) graph.add_edge("生成测试报告" , END) app = graph.compile () res = app.invoke({"user_input" : "测试项目A" }, config={"recursion_limit" : 5 }, context={"test_env" : "测试环境A" , "tester_name" : "张三" } ) print (res)
特殊节点 START节点节点是一个特殊节点,表示将用户输入发送到图表的节点。引用此节点的主要目的是确定应该首先调用哪些节点。
1 2 3 from langgraph.graph import STARTgraph.add_edge(START, "node_a" )
END节点节点是一个表示终端节点的特殊节点。当您想要指示哪些边在完成后没有作时,将引用此节点。
1 2 3 from langgraph.graph import ENDgraph.add_edge("node_a" , END)
Edges(边) Edges (边) 定义逻辑的路由方式以及图形决定停止的方式。这是代理工作方式以及不同节点之间如何通信的重要组成部分。边缘有几种关键类型:
法线边:直接从一个节点转到下一个节点。
条件边:调用函数来确定下一步要转到哪个节点。
入口点:当用户输入到达时首先调用哪个节点。
条件入口点:调用一个函数来确定在用户输入到达时首先调用哪个节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,则所有这些 目标节点将作为下一个超级步骤的一部分并行执行。
切入点 入口点是图形启动时运行的第一个节点。您可以使用从虚拟 START 节点到第一个要执行的节点的 add_edge 方法来指定进入图形的位置。
1 2 3 from langgraph.graph import STARTgraph.add_edge(START, "node_a" )
法线边(Normal Edges) 如果想从节点A转到节点B,可以直接使用add_edge 方法。
1 graph.add_edge("node_a" , "node_b" )
条件边 如果要选择性地 路由到 1 个或多个边(或选择性地终止),可以使用 add_conditional_edges 方法。此方法接受节点的名称和在执行该节点后调用的“路由函数”:
1 graph.add_conditional_edges("node_a" , routing_function)
条件入口点 条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点中的add_conditional_edges 来完成此作
1 graph.add_conditional_edges(START, routing_function, {True : "node_b" , False : "node_c" })
Send(节点并发执行) 核心作用 :在运行时动态生成多条边,适用于处理未知数量的并行任务(如批量生成测试用例)。测试场景 :一个测试生成节点产生多个测试用例,需要分发到不同的测试执行节点。
向哪个节点发送数据 (目标节点名称)
发送什么数据 (携带的状态内容)
基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 from langgraph.types import Senddef run_tasks (state ): return [ Send("task" , {"id" : 1 }), Send("task" , {"id" : 2 }), Send("task" , {"id" : 3 }) ] graph.add_conditional_edges( "start" , run_tasks, ["task" ] )
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 from typing import TypedDictfrom langgraph.graph import StateGraphfrom langgraph.constants import START, ENDfrom langgraph.types import Send""" # 并发执行特定节点: 使用Send去实现: 输入一个接口文档: 生成测试点 :10个测试点 10个测试点--> 10条可以执行的用例 # 将测试点--->转换为可执行用例的节点 (调用用大模型去实现) """ from typing import Annotatedimport operatorclass State (TypedDict ): """状态""" user_input: str test_cases: list runnable_api_case: Annotated[list [dict ], operator.add] case : str def generator_test_case (state: State ): """生成测试用例""" print ("开始执行生成用例的节点,用户输入的需求是:" , state.get("user_input" )) import random if random.randint(0 , 5 ) > 2 : return {"test_cases" : ["测试用例1" , "测试用例2" ]} else : return {"test_cases" : ["测试用例1" , "测试用例2" , "测试用例3" ]} def run_test_cases (state: State ): """执行测试用例""" print ("开始执行测试用例:" , state.get("test_cases" )) return {"report" : "这个是一个测试报告" } def generator_test_report (state: State, ): """生成测试报告""" print ("执行generator_test_report节点,生成测试报告" ) return {"report" : "这个是一个测试报告" } def generator_runnable_api_case (state: State ): """生成可执行的接口用例""" print ("正在生成可执行的接口用例" , state.get("test_cases" )) result = [] for case in state.get("test_cases" ): print ("case:" , case ) result.append(Send("api用例生成" , {"case" : case })) return result def runnable_api (state: State ): print ("正在执行api用例生成,生成可执行的接口用例:" , state.get('case' )) return { "runnable_api_case" : [{"api_name" : state.get('case' ), "api_url" : "http://127.0.0.1:8000/api/a" , "api_method" : "POST" }] } graph = StateGraph(State) graph.add_node("生成测试用例点" , generator_test_case) graph.add_node("生成可执行接口用例" , generator_runnable_api_case) graph.add_node("api用例生成" , runnable_api) graph.add_node("执行测试用例" , run_test_cases) graph.add_node("生成测试报告" , generator_test_report) graph.add_edge(START, "生成测试用例点" ) graph.add_conditional_edges("生成测试用例点" , generator_runnable_api_case) graph.add_edge("api用例生成" , "执行测试用例" ) graph.add_edge("执行测试用例" , "生成测试报告" ) graph.add_edge("生成测试报告" , END) app = graph.compile () res = app.invoke({"user_input" : "测试项目A" }) print (res)
状态合并机制 在并行任务中(如批量测试、分布式处理),多个节点可能同时修改共享状态。LangGraph 通过 Annotated 类型注解和运算符(如 operator.add )定义状态字段的合并规则,确保并发更新时数据正确聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from typing import TypedDict, Annotatedimport operatorfrom langgraph.graph import StateGraphclass OverallState (TypedDict ): cases: Annotated[list [str ], operator.add] cases2: Annotated[dict , operator.or_] def task_a (state: OverallState ) -> dict : return {"cases" : ["结果A" ]} def task_b (state: OverallState ) -> dict : return {"cases" : ["结果B" ]} builder = StateGraph(OverallState) builder.add_node("task_a" , task_a) builder.add_node("task_b" , task_b) builder.add_edge("task_a" , "task_b" ) workflow = builder.compile () result = workflow.invoke({"cases" : []}) print (result)
Annotated 类型注解
支持的运算符
基本控制:串行控制 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from typing_extensions import TypedDictfrom IPython.display import Image, displayfrom langgraph.graph import START, END, StateGraphclass State (TypedDict ): value_1: str value_2: str def step_1 (state: State ): return {"value_1" : "a" } def step_2 (state: State ): current_value_1 = state["value_1" ] return {"value_1" : f"{current_value_1} + b" } def step_3 (state: State ): return {"value_2" : 10 } graph_builder = StateGraph(State) graph_builder.add_node(step_1) graph_builder.add_node(step_2) graph_builder.add_node(step_3) graph_builder.add_edge(START, "step_1" ) graph_builder.add_edge("step_1" , "step_2" ) graph_builder.add_edge("step_2" , "step_3" ) graph_builder.add_edge("step_3" , END) graph = graph_builder.compile () display(Image(graph.get_graph().draw_mermaid_png())) res = graph.invoke({"value_1" : "c" }) print (res)
查看节点与图结构
执行结果:
1 {'value_1' : 'a + b' , 'value_2' : 10}
基本控制:分支控制 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import operatorfrom typing import Any , Annotatedfrom typing_extensions import TypedDictfrom IPython.display import Image, displayfrom langgraph.graph import START, END, StateGraphclass State (TypedDict ): aggregate: Annotated[list , operator.add] def a (state: State ): print (f"添加'A'到{state['aggregate' ]} " ) return {"aggregate" : ["A" ]} def b (state: State ): print (f"添加'B'到{state['aggregate' ]} " ) return {"aggregate" : ["B" ]} def c (state: State ): print (f"添加'C'到{state['aggregate' ]} " ) return {"aggregate" : ["C" ]} def d (state: State ): print (f"添加'D'到{state['aggregate' ]} " ) return {"aggregate" : ["D" ]} graph_builder = StateGraph(State) graph_builder.add_node(a) graph_builder.add_node(b) graph_builder.add_node(c) graph_builder.add_node(d) graph_builder.add_edge(START, "a" ) graph_builder.add_edge("a" , "b" ) graph_builder.add_edge("a" , "c" ) graph_builder.add_edge("b" , "d" ) graph_builder.add_edge("c" , "d" ) graph_builder.add_edge("d" , END) graph = graph_builder.compile () display(Image(graph.get_graph().draw_mermaid_png())) res = graph.invoke({"aggregate" : []}, {"configurable" : {"thread_id" : "foo" }}) print (res)
查看节点与图结构
执行结果:
1 2 3 4 5 6 添加'A' 到[] 添加'B' 到['A' ] 添加'C' 到['A' ] 添加'D' 到['A' , 'B' , 'C' ] {'aggregate' : ['A' , 'B' , 'C' , 'D' ]}
基本控制:条件分支与循环 分支条件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import operatorfrom typing import Annotated, Literal from typing_extensions import TypedDictfrom IPython.display import Image, displayfrom langgraph.graph import StateGraph, START, ENDclass State (TypedDict ): aggregate: Annotated[list , operator.add] def a (state: State ): print (f'Node A sees {state["aggregate" ]} ' ) return {"aggregate" : ["A" ]} def b (state: State ): print (f'Node B sees {state["aggregate" ]} ' ) return {"aggregate" : ["B" ]} builder = StateGraph(State) builder.add_node(a) builder.add_node(b) def route (state: State ) -> Literal ["b" , END]: if len (state["aggregate" ]) < 7 : return "b" else : return END builder.add_edge(START, "a" ) builder.add_conditional_edges("a" , route) builder.add_edge("b" , "a" ) graph = builder.compile () display(Image(graph.get_graph().draw_mermaid_png())) res = graph.invoke({"aggregate" : []}) print (res)
查看节点与图结构
执行结果:
1 2 3 4 5 6 7 8 9 Node A sees [] Node B sees ['A' ] Node A sees ['A' , 'B' ] Node B sees ['A' , 'B' , 'A' ] Node A sees ['A' , 'B' , 'A' , 'B' ] Node B sees ['A' , 'B' , 'A' , 'B' , 'A' ] Node A sees ['A' , 'B' , 'A' , 'B' , 'A' , 'B' ] {'aggregate' : ['A' , 'B' , 'A' , 'B' , 'A' , 'B' , 'A' ]}
注意:使用递归限制recursion_limit,防止异常情况下的大量无用调用
1 2 3 4 5 6 from langgraph.errors import GraphRecursionErrortry : graph.invoke({"aggregate" : []}, {"recursion_limit" : 4 }) except GraphRecursionError: print ("Recursion Error" )
执行结果
1 2 3 4 5 Node A sees [] Node B sees ['A' ] Node A sees ['A' , 'B' ] Node B sees ['A' , 'B' , 'A' ] Recursion Error
循环 示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import operatorfrom typing import Annotated, Literal from typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom IPython.display import Image, displayclass State (TypedDict ): aggregate: Annotated[list , operator.add] def a (state: State ): print (f'Node A sees {state["aggregate" ]} ' ) return {"aggregate" : ["A" ]} def b (state: State ): print (f'Node B sees {state["aggregate" ]} ' ) return {"aggregate" : ["B" ]} def c (state: State ): print (f'Node C sees {state["aggregate" ]} ' ) return {"aggregate" : ["C" ]} def d (state: State ): print (f'Node D sees {state["aggregate" ]} ' ) return {"aggregate" : ["D" ]} builder = StateGraph(State) builder.add_node(a) builder.add_node(b) builder.add_node(c) builder.add_node(d) def route (state: State ) -> Literal ["b" , END]: if len (state["aggregate" ]) < 7 : return "b" else : return END builder.add_edge(START, "a" ) builder.add_conditional_edges("a" , route) builder.add_edge("b" , "c" ) builder.add_edge("b" , "d" ) builder.add_edge(["c" , "d" ], "a" ) graph = builder.compile () display(Image(graph.get_graph().draw_mermaid_png())) res = graph.invoke({"aggregate" : []}) print (res)
查看节点与图结构
执行结果
1 2 3 4 5 6 7 8 9 10 11 Node A sees [] Node B sees ['A' ] Node C sees ['A' , 'B' ] Node D sees ['A' , 'B' ] Node A sees ['A' , 'B' , 'C' , 'D' ] Node B sees ['A' , 'B' , 'C' , 'D' , 'A' ] Node C sees ['A' , 'B' , 'C' , 'D' , 'A' , 'B' ] Node D sees ['A' , 'B' , 'C' , 'D' , 'A' , 'B' ] Node A sees ['A' , 'B' , 'C' , 'D' , 'A' , 'B' , 'C' , 'D' ] {'aggregate' : ['A' , 'B' , 'C' , 'D' , 'A' , 'B' , 'C' , 'D' , 'A' ]}