Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangGraph之工作流(图)

工作流核心概念

LangGraph 的核心是将 Agent 工作流建模为图形。您可以使用三个关键组件定义代理的行为:

  1. State(状态):表示应用程序当前快照的共享数据结构。它可以是任何 Python 类型,但通常是 a 或 Pydantic 。TypedDict BaseModel
  2. Node(节点):对代理的逻辑进行编码的 Python 函数。它们接收电流作为输入,执行一些计算或副作用,并返回更新的 State State
  3. Edges (边):Python 函数,根据当前它们可以是条件分支或固定过渡。Node State

标准状态图构建

StateGraph 核心方法

LangGraph 使用 StateGraph 来构建整个流程的执行图。下面是常用方法的分类和说明:

方法分类 方法名 功能描述 代码示例
节点管理 add_node 添加单个节点 builder.add_node("node_name", node_func)
边(流程)管理 add_edge 添加固定流程连接 builder.add_edge("node_a", "node_b")
add_conditional_edges 添加条件分支 builder.add_conditional_edges("start", router_func)
入口和出口 set_entry_point 设置流程起点 builder.set_entry_point("first_node")
set_finish_point 设置流程终点 builder.set_finish_point("last_node")
编译执行 compile 将构建图编译执行体 graph = builder.compile()
可视化图 get_graph 获取图对象绘制流程图 graph.get_graph().draw_mermaid_png()

LangGraph 流程构建标准步骤

通过 LangGraph,你可以非常清晰地分步骤搭建测试用例生成或处理流程。

1. 状态定义

在 LangGraph 中,状态可以是 TypedDictPydantic 模型,保证每个节点都能正确读取与更新数据。

  • TypedDict:属于 Python 标准库 typing 模块的一部分,仅提供静态类型检查,运行时不执行验证
  • Pydantic:第三方库,需要单独安装,提供运行时数据验证和序列化功能
1
2
3
4
5
from typing import TypedDict

class State(TypedDict):
value_1: str # 字符串类型的状态值
value_2: int # 整型的状态值

2. 节点函数开发

每一个节点就是一个处理函数,它接受一个State作为输入,并返回一个更新后的部分State(字典)

1
2
3
4
5
6
7
8
9
10
11
def step_1(state: State) -> dict:
"""初始化 value_1 字段"""
return {"value_1": "a"}

def step_2(state: State) -> dict:
"""增量更新 value_1 字段"""
return {"value_1": state["value_1"] + " b"}

def step_3(state: State) -> dict:
"""初始化 value_2 字段"""
return {"value_2": 10}

注意

  • 必须返回字典
  • 可以只返回局部更新字段,LangGraph自动合并回完整状态

3. 标准流程构建方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langgraph.graph import START, END, StateGraph

# 1. 初始化 StateGraph 构建器
builder = StateGraph(State)

# 2. 添加节点
builder.add_node("init_step", step_1)
builder.add_node("update_step", step_2)
builder.add_node("final_step", step_3)

# 3. 建立节点之间的连接关系
builder.add_edge(START, "init_step") # 从 START 到第一个节点
builder.add_edge("init_step", "update_step")
builder.add_edge("update_step", "final_step")
graph_builder.add_edge("step_3", END) # 结束的边

# 4. 编译为可执行图
graph = builder.compile()

提示

  • START 是固定起点
  • 不显式设置出口时,默认最后一个节点执行完即结束,也可以使用END设置结束边

4. 流程执行与调试

执行流程时,需要传入一个初始状态:

1
2
3
4
5
6
# 执行
result = graph.invoke({"value_1": "initial"})

print(result)
# 输出: {'value_1': 'a b', 'value_2': 10}

注意

  • 初始传入的数据字典,必须与 State 类型定义一致。s

5. 获取并绘制执行流程图

流程图有助于快速理解当前系统的执行路径。

1
2
3
4
5
6
7
8
9
# 获取 mermaid 格式流程图
res = graph.get_graph().draw_mermaid()
print(res) # 输出二进制编码


# 获取执行流程图写入文件
with open('graph.png','wb') as f:
content = app.get_graph().draw_mermaid_png()
f.write(content)

总结

  • StateGraph 是 LangGraph 流程构建的基础
  • 需要先定义状态(TypedDict)
  • 每个节点处理输入并返回部分更新
  • 流程控制可以串行、并行、条件跳转
  • 支持流程图可视化,方便理解和调试

Node(节点)

节点的参数

节点是图形的核心组成部分,可以使用 add_node 方法将这些节点添加到图形中

在 LangGraph 中,节点 可以 接受以下参数的 Python 函数(同步或异步):

  1. state:图形的状态
  2. config:包含配置信息(如)和跟踪信息(如RunnableConfig thread_id tags
  3. runtime:包含运行时上下文和其他信息(如 Runtime store stream_writer

注意:需要将 langgraph 升级到0.6.0a2 的版本,langgraph.runtime模块最新的版本中才加入的,之前的版本中没有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# @Time:2025/9/29 14:40
# @Author:jinglv
"""
节点函数可以接受以下三种类型的参数
1. state:图形的状态
继承typeDict的类
2. config:包含配置信息(如)和跟踪信息(如RunnableConfigthread_idtags)
3. runtime:包含运行时上下文和其他信息(如 Runtimestorestream_writer)
"""
from dataclasses import dataclass
from typing import TypedDict

from langchain_core.runnables import RunnableConfig
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime


# 定义状态
class State(TypedDict):
"""状态"""
user_input: str
test_cases: list


# 定义运行时的上下文参数
@dataclass
class RuntimeContext:
"""运行时上下文参数"""
test_env: str # 测试环境
tester_name: str # 测试人员名称


# ================定义运行节点函数=================

def generator_test_case(state: State):
"""生成测试用例"""
print(f"用户输入的需求是:{state.get('user_input')},开始进行测试用例生成")
# 这里核心的功能实现需要调用llm进行生成,暂时跳过
print("测试用例已经生成", )
return {"test_cases": ["测试用例1", "测试用例2"]}


def run_test_cases(state: State, runtime: Runtime[RuntimeContext]):
"""执行测试用例"""
print("正在执行测试用例:", state['test_cases'])
print("当前执行的测试环境", runtime.context.test_env)
print("执行人", runtime.context.tester_name)
return {"report": "这个是一个测试报告"}


def generator_test_report(state: State, config: RunnableConfig):
"""生成测试报告"""
print("执行generator_test_report节点")
print("配置信息:", config)
return {"report": "这个是一个测试报告"}


# =================工作流的创建个编排===================
graph = StateGraph(State, context_schema=RuntimeContext)
graph.add_node("生成测试用例", generator_test_case)
graph.add_node("执行测试用例", run_test_cases)
graph.add_node("生成测试报告", generator_test_report)

# 设置起点
# graph.set_entry_point("生成测试用例")
graph.add_edge(START, "生成测试用例")
graph.add_edge("生成测试用例", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")
# graph.set_finish_point("生成测试报告")
graph.add_edge("生成测试报告", END)

app = graph.compile()
res = app.invoke({"user_input": "测试项目A"},
config={"recursion_limit": 5},
context={"test_env": "测试环境A", "tester_name": "阿花"}
)

print(res)

特殊的节点

START节点

节点是一个特殊节点,表示将用户输入发送到图表的节点。引用此节点的主要目的是确定应该首先调用哪些节点。

1
2
3
from langgraph.graph import START

graph.add_edge(START, "node_a")

END节点

节点是一个表示终端节点的特殊节点。当您想要指示哪些边在完成后没有作时,将引用此节点。

1
2
3
from langgraph.graph import END

graph.add_edge("node_a", END)

Edges(边)

Edges (边)定义逻辑的路由方式以及图形决定停止的方式。这是代理工作方式以及不同节点之间如何通信的重要组成部分。边缘有几种关键类型:

  • 法线边:直接从一个节点转到下一个节点。
  • 条件边:调用函数来确定下一步要转到哪个节点。
  • 入口点:当用户输入到达时首先调用哪个节点。
  • 条件入口点:调用一个函数来确定在用户输入到达时首先调用哪个节点。

一个节点可以有多个传出边。如果一个节点有多个传出边,则所有这些目标节点将作为下一个超级步骤的一部分并行执行。

切入点

入口点是图形启动时运行的第一个节点。您可以使用从虚拟 START 节点到第一个要执行的节点的 add_edge 方法来指定进入图形的位置。

1
2
3
from langgraph.graph import START

graph.add_edge(START, "node_a")

法线边(Normal Edges)

如果想从节点A转到节点B,可以直接使用add_edge方法。

1
graph.add_edge("node_a", "node_b")

条件边

如果要选择性地路由到 1 个或多个边(或选择性地终止),可以使用 add_conditional_edges 方法。此方法接受节点的名称和在执行该节点后调用的“路由函数”:

1
graph.add_conditional_edges("node_a", routing_function)

条件入口点

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点中的add_conditional_edges来完成此作

1
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

Send(节点并发执行)

核心作用:在运行时动态生成多条边,适用于处理未知数量的并行任务(如批量生成测试用例)。
测试场景:一个测试生成节点产生多个测试用例,需要分发到不同的测试执行节点。

  • 向哪个节点发送数据(目标节点名称)
  • 发送什么数据(携带的状态内容)

基本使用(示例代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langgraph.types import Send

def run_tasks(state):
return [
Send("task", {"id": 1}),
Send("task", {"id": 2}),
Send("task", {"id": 3})
]

graph.add_conditional_edges(
"start",
run_tasks, # 返回[Send(...), Send(...), ...]
["task"]
)

案例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# @Time:2025/9/29 15:16
# @Author:jinglv
from typing import TypedDict

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import Send

"""
# 并发执行特定节点:
使用Send去实现:

输入一个接口文档:
生成测试点 :10个测试点
10个测试点--> 10条可以执行的用例
# 将测试点--->转换为可执行用例的节点 (调用用大模型去实现)
"""
from typing import Annotated
import operator


# 定义状态
class State(TypedDict):
"""状态"""
user_input: str
test_cases: list
# 如果多个节点需求去修改某个状态的值,那么可以使用Annotated
runnable_api_case: Annotated[list[dict], operator.add]
case: str


# ================定义运行节点函数=================
def generator_test_case(state: State):
"""生成测试用例"""
# 实际在使用的时候,是使用大模型去生成的
print("开始执行生成用例的节点,用户输入的需求是:", state.get("user_input"))
import random
if random.randint(0, 5) > 2:
return {"test_cases": ["测试用例1", "测试用例2"]}
else:
return {"test_cases": ["测试用例1", "测试用例2", "测试用例3"]}


def run_test_cases(state: State):
"""执行测试用例"""
print("开始执行测试用例:", state.get("test_cases"))
return {"report": "这个是一个测试报告"}


def generator_test_report(state: State, ):
"""生成测试报告"""
print("执行generator_test_report节点,生成测试报告")
return {"report": "这个是一个测试报告"}


def generator_runnable_api_case(state: State):
"""生成可执行的接口用例"""
print("正在生成可执行的接口用例", state.get("test_cases"))
result = []
for case in state.get("test_cases"):
print("case:", case)
result.append(Send("api用例生成", {"case": case}))
return result


def runnable_api(state: State):
print("正在执行api用例生成,生成可执行的接口用例:", state.get('case'))
return {
"runnable_api_case": [{"api_name": state.get('case'), "api_url": "http://127.0.0.1:8000/api/a",
"api_method": "POST"}]
}


# =================工作流的创建个编排===================
graph = StateGraph(State)
graph.add_node("生成测试用例点", generator_test_case)
graph.add_node("生成可执行接口用例", generator_runnable_api_case)
graph.add_node("api用例生成", runnable_api)
graph.add_node("执行测试用例", run_test_cases)
graph.add_node("生成测试报告", generator_test_report)

# 设置起点
graph.add_edge(START, "生成测试用例点")
# 设置一个边进行并发执行
graph.add_conditional_edges("生成测试用例点", generator_runnable_api_case)

graph.add_edge("api用例生成", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")

graph.add_edge("生成测试报告", END)

app = graph.compile()
res = app.invoke({"user_input": "测试项目A"})

print(res)

状态合并机制

在并行任务中(如批量测试、分布式处理),多个节点可能同时修改共享状态。LangGraph 通过 Annotated 类型注解和运算符(如 operator.add)定义状态字段的合并规则,确保并发更新时数据正确聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# @Time:2025/9/29 15:20
# @Author:jinglv
import operator
from typing import TypedDict, Annotated

from langgraph.constants import START
from langgraph.graph import StateGraph


# 定义状态:使用 Annotated 声明 cases 字段的合并规则(通过 + 操作符合并列表)
class OverallState(TypedDict):
cases: Annotated[list[str], operator.add] # 关键点:并发时自动合并列表
cases2: Annotated[dict, operator.or_] # 并发时自动合并字典


# 模拟并行任务节点
def task_a(state: OverallState) -> dict:
return {"cases": ["结果A"]}


def task_b(state: OverallState) -> dict:
return {"cases": ["结果B"]}


# 构建并行流程图
builder = StateGraph(OverallState)
builder.add_edge(START, "task_a")
builder.add_node("task_a", task_a)
builder.add_node("task_b", task_b)
builder.add_edge("task_a", "task_b") # 实际场景中可能是并行分支

workflow = builder.compile()
result = workflow.invoke({"cases": []})
print(result) # 输出: {'cases': ['结果A', '结果B'], 'cases2': {}}

  1. Annotated 类型注解
    • 语法:Annotated[类型, 操作符]
    • 作用:声明字段的合并行为。例如 operator.add 表示用 + 操作符合并数据。
  2. 支持的运算符
    • operator.add:合并列表/数值(list1 + list2)
    • operator.or_:字典合并(dict1 | dict2)

分支和循环流程

条件分支流程

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# @Time:2025/9/29 15:25
# @Author:jinglv
from typing import TypedDict

from langgraph.constants import START, END
from langgraph.graph import StateGraph


# 定义状态
class State(TypedDict):
"""状态"""
user_input: str
test_cases: list
check_result: bool


# ================定义运行节点函数=================

def generator_test_case(state: State):
"""生成测试用例"""
print("开始执行生成用例的节点,用户输入的需求是:", state.get("user_input"))
return {"test_cases": ["测试用例1", "测试用例2", '测试用例3']}


def check_test_case_router(state: State):
"""检查测试用例是否可用"""
print("校验测试用例是否可用,当前的用例为:", state.get("test_cases"))
# 校验逻辑后续讲项目的时候完善,校验用例的数量是否大于3
if len(state.get("test_cases")) >= 3:
# return "执行测试用例"
return "a"
else:
# return "补充生成测试用例"
return "b"


def generator_test_case2(state: State):
"""补充生成测试用例的节点"""
print("正在补充生成测试用例的节点")
return {"test_cases": state.get("test_cases") + ['测试用例3']}


def run_test_cases(state: State):
"""执行测试用例"""
print("开始执行测试用例:", state.get("test_cases"))
return {"report": "这个是一个测试报告"}


def generator_test_report(state: State, ):
"""生成测试报告"""
print("执行generator_test_report节点,生成测试报告")
return {"report": "这个是一个测试报告"}


# =================工作流的创建个编排===================
graph = StateGraph(State)
graph.add_node("生成测试用例", generator_test_case)
graph.add_node("检查测试用例", check_test_case_router)
graph.add_node("补充生成测试用例", generator_test_case2)
graph.add_node("执行测试用例", run_test_cases)
graph.add_node("生成测试报告", generator_test_report)

# 设置起点
graph.add_edge(START, "生成测试用例")
# 设置一个控制节点执行分支的条件
# graph.add_conditional_edges("生成测试用例", check_test_case_router, ["补充生成测试用例", "执行测试用例"])
graph.add_conditional_edges("生成测试用例", check_test_case_router, {
"a": "执行测试用例",
"b": "补充生成测试用例"
})
graph.add_edge("补充生成测试用例", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")
graph.add_edge("生成测试报告", END)

app = graph.compile()
res = app.invoke({"user_input": "测试项目A"})

print(res)

循环流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# @Time:2025/9/29 15:28
# @Author:jinglv
from typing import TypedDict

from langgraph.constants import START, END
from langgraph.graph import StateGraph


# 定义状态
class State(TypedDict):
"""状态"""
user_input: str
test_cases: list
check_result: bool


# ================定义运行节点函数=================
def generator_test_case(state: State):
"""生成测试用例"""
# 实际在使用的时候,是使用大模型去生成的
print("开始执行生成用例的节点,用户输入的需求是:", state.get("user_input"))
import random
if random.randint(0, 5) > 2:
return {"test_cases": ["测试用例1", "测试用例2"]}
else:
return {"test_cases": ["测试用例1", "测试用例2", "测试用例3"]}


def check_test_case_router(state: State):
"""检查测试用例是否可用"""
print("校验测试用例是否可用,当前的用例为:", state.get("test_cases"))
# 校验逻辑后续讲项目的时候完善,校验用例的数量是否大于3
if len(state.get("test_cases")) >= 3:
# return "执行测试用例"
return "a"
else:
# return "补充生成测试用例"
return "b"


def run_test_cases(state: State):
"""执行测试用例"""
print("开始执行测试用例:", state.get("test_cases"))
return {"report": "这个是一个测试报告"}


def generator_test_report(state: State, ):
"""生成测试报告"""
print("执行generator_test_report节点,生成测试报告")
return {"report": "这个是一个测试报告"}


# =================工作流的创建个编排===================
graph = StateGraph(State)
graph.add_node("生成测试用例", generator_test_case)
graph.add_node("检查测试用例", check_test_case_router)
graph.add_node("执行测试用例", run_test_cases)
graph.add_node("生成测试报告", generator_test_report)

# 设置起点
graph.add_edge(START, "生成测试用例")
# 设置一个控制节点执行分支的条件
graph.add_conditional_edges("生成测试用例", check_test_case_router, {
"a": "执行测试用例",
"b": "生成测试用例"
})
graph.add_edge("执行测试用例", "生成测试报告")

graph.add_edge("生成测试报告", END)

app = graph.compile()
res = app.invoke({"user_input": "测试项目A"})

print(res)