Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangGraph之流式输出

LangGraph 实现了一个流式传输系统,用于实时更新信息。对于基于大型语言模型(LLMs)构建的应用程序来说,流式传输对于增强其响应性至关重要。这是因为大型语言模型在处理复杂的自然语言任务时,可能会存在一定的延迟。而通过流式传输,可以在模型还在处理数据的过程中,就将部分结果逐步展示给用户,而不是让用户等待一个完整的响应,从而让用户能够更快地获得反馈,提高了应用程序的交互性。

官方文档:https://docs.langchain.com/oss/python/langgraph/streaming

  • Stream graph state(流式传输图状态):可以通过“updates”和“values”模式获取状态更新/值。其中,“updates”模式会流式传输图在每一步之后的状态更新,如果在同一个步骤中进行了多次更新(例如,运行了多个节点),这些更新将分别流式传输;而“values”模式则会流式传输图在每一步之后的完整状态值。
  • Stream subgraph outputs(流式传输子图输出):包括父图和任何嵌套子图的输出。通过在父图的.stream()方法中设置subgraphs=True,可以实现这一点。输出将以(namespace, data)元组的形式流式传输,其中namespace是一个包含调用子图的节点路径的元组,例如(“parent_node:“, “child_node:“)。
  • Stream LLM tokens(流式传输LLM令牌):可以从任何地方捕获令牌流,包括节点内部、子图内部或工具内部。这使得能够实时获取大型语言模型(LLM)的输出令牌,从而提升应用的响应性和用户体验。
  • Stream custom data(流式传输自定义数据):可以直接从工具函数发送自定义更新或进度信号。这为开发者提供了更大的灵活性,可以根据需要自定义流式传输的数据内容,以满足特定的应用需求。
  • Use multiple streaming modes(使用多种流式传输模式):可以选择以下多种流式传输模式:
    • values(完整状态):流式传输每一步之后图的完整状态值。
    • updates(状态增量):流式传输每一步之后图的状态更新。
    • messages(LLM令牌+元数据):流式传输2元组(LLM令牌,元数据),这些元数据包含有关图节点和LLM调用的详细信息。
    • custom(任意用户数据):流式传输任意用户定义的数据。
    • debug(详细跟踪):在图执行过程中尽可能多地流式传输信息,包括节点名称和完整状态等详细信息。

支持的流模式(stream_mode)

将以下一种或多种流模式作为列表传递给 stream()astream() 方法:

模式 描述
values 每个图步骤后流式传输完整状态值。
updates 每个步骤后流式传输状态的增量更新。
custom 从图中节点流式传输自定义数据。
messages 从调用 LLM 的节点流式传输生成的 token 及元数据(元组)。
debug 尽可能多地流式传输调试信息。

支持将多种模式作为列表同时传递,如:

1
stream_mode=["updates", "messages", "custom"]

定义一个常见的图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from typing import TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
topic: str
joke: str


def refine_topic(state: State):
return {"topic": state["topic"] + "和小狗"}


def generate_joke(state: State):
return {"joke": f"这是一个关于{state['topic']}的笑话"}


graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)

流式传输stream_mode=”values”

在图表的每个步骤之后传输状态的完整值

1
2
3
4
5
for chunk in graph.stream(
{"topic": "冰激凌"},
stream_mode="values",
):
print(chunk)

执行结果

1
2
3
{'topic': '冰激凌'}
{'topic': '冰激凌和小狗'}
{'topic': '冰激凌和小狗', 'joke': '这是一个关于冰激凌和小狗的笑话'}

说明:values 则输出完整状态值。

流式传输stream_mode=”updates”

将图表每一步之后的更新流式传输到状态

1
2
3
4
5
for chunk in graph.stream(
{"topic": "冰激凌"},
stream_mode="updates",
):
print(chunk)

执行结果

1
2
{'refine_topic': {'topic': '冰激凌和小狗'}}
{'generate_joke': {'joke': '这是一个关于冰激凌和小狗的笑话'}}

说明:updates 模式输出节点的状态更新

流式传输stream_mode=”debug”

在图表的整个执行过程中传输尽可能多的信息

1
2
3
4
5
for chunk in graph.stream(
{"topic": "冰激凌"},
stream_mode="debug",
):
print(chunk)

执行结果

1
2
3
4
{'step': 1, 'timestamp': '2025-09-18T08:51:31.478123+00:00', 'type': 'task', 'payload': {'id': '671c3949-69b8-8e90-b997-e76a1f711f5f', 'name': 'refine_topic', 'input': {'topic': '冰激凌'}, 'triggers': ('branch:to:refine_topic',)}}
{'step': 1, 'timestamp': '2025-09-18T08:51:31.478680+00:00', 'type': 'task_result', 'payload': {'id': '671c3949-69b8-8e90-b997-e76a1f711f5f', 'name': 'refine_topic', 'error': None, 'result': [('topic', '冰激凌和小狗')], 'interrupts': []}}
{'step': 2, 'timestamp': '2025-09-18T08:51:31.478828+00:00', 'type': 'task', 'payload': {'id': 'b7f1816a-7dab-4847-e8e6-78ae4d691a8e', 'name': 'generate_joke', 'input': {'topic': '冰激凌和小狗'}, 'triggers': ('branch:to:generate_joke',)}}
{'step': 2, 'timestamp': '2025-09-18T08:51:31.479006+00:00', 'type': 'task_result', 'payload': {'id': 'b7f1816a-7dab-4847-e8e6-78ae4d691a8e', 'name': 'generate_joke', 'error': None, 'result': [('joke', '这是一个关于冰激凌和小狗的笑话')], 'interrupts': []}}

从以上结果来看,可以看打执行的详细信息,每一个步骤执行

LLM token 流式传输stream_mode=”messages”

为调用LLM的图形节点传输LLM令牌和元数据

需要接入大模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from langchain_deepseek import ChatDeepSeek
import os

llm = ChatDeepSeek(
model="deepseek-chat",
temperature=0,
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url=os.environ.get("DEEPSEEK_API_BASE"),
)


def generate_joke(state: State):
llm_response = llm.invoke(
[
{"role": "user", "content": f"生成一个关于 {state['topic']}的笑话"}
]
)
return {"joke": llm_response.content}


graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)

for message_chunk, metadata in graph.stream(
{"topic": "冰激凌"},
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)

执行结果(流式输出的)

1
2
3
4
5
6
小狗|走进|一家|冰|激|凌|店|,|店员|问|:“|想要|什么|口|味的|?”
|小狗|说|:“|汪|草|味的|!”
|店员|愣|住|:“|抱歉|…|我们没有|汪|草|口味|。”
|小狗|叹气|:“|那|好吧|,|给我|一个|‘|爪子|’|饼干|筒|装|香|草|味|——|但|记住|,|这次|别|再把|我的|球|藏|进|冰|激|凌|里|了|,|上次|我|挖|了|半小时|!”|🍦|🐶|

|(|注|:|谐|音|梗|:|汪|草|=|香|草|,|爪子|=|甜|筒|品牌|“|可爱|多|”|的|经典|筒|身|设计|)|

说明:每生成一个 token 就会立即输出,并附带上下文信息。

工具中自定义数据流式输出stream_mode=”custom”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# @Time:2025/9/30 14:48
# @Author:jinglv
from dataclasses import dataclass
from typing import TypedDict

from langchain_core.runnables import RunnableConfig
from langgraph.config import get_stream_writer
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

from app.agent.model.llms import qv_llm


# 定义状态
class State(TypedDict):
"""状态"""
user_input: str
test_cases: str
result: str


# 定义运行时的上下文参数
@dataclass
class RuntimeContext:
"""运行时上下文参数"""
test_env: str # 测试环境
tester_name: str # 测试人员名称


def generator_test_case(state: State):
"""生成测试用例"""
writer = get_stream_writer()
writer(f"开始执行生成测试用例的节点")
prompt = """
请帮我生成用户5条登录的测试用例,登录账号密码的长度限制为8到16位
"""
cases = qv_llm.invoke(prompt)
return {"test_cases": cases}


def run_test_cases(state: State, runtime: Runtime[RuntimeContext]):
"""执行测试用例"""
writer = get_stream_writer()
writer(f"开始执行【分析测试用例】的节点")
cases = state['test_cases']
prompt = f"""
请分析当前的五条测试用例,是否有缺陷,
用例数据如下:{cases}
"""
result = qv_llm.invoke(prompt)
return {"result": result}


def generator_test_report(state: State, config: RunnableConfig):
"""生成测试报告"""
writer = get_stream_writer()
writer(f"开始【生成测试报告】的节点运行")
return {"report": "这个是一个测试报告"}


# =================工作流的创建个编排===================
graph = StateGraph(State, context_schema=RuntimeContext)
graph.add_node("生成测试用例", generator_test_case)
graph.add_node("执行测试用例", run_test_cases)
graph.add_node("生成测试报告", generator_test_report)

# 设置起点
# graph.set_entry_point("生成测试用例")
graph.add_edge(START, "生成测试用例")
graph.add_edge("生成测试用例", "执行测试用例")
graph.add_edge("执行测试用例", "生成测试报告")
# graph.set_finish_point("生成测试报告")
graph.add_edge("生成测试报告", END)

app = graph.compile()

response = app.stream({"user_input": "测试项目A"},
config={"recursion_limit": 5},
context=RuntimeContext(test_env="测试环境A", tester_name="张三"),
stream_mode=['messages', 'custom']
)

for input_type, chunk in response:
if input_type == "messages":
# ai的输出内容
print(chunk[0].content, end="", flush=True)
elif input_type == "custom":
# 工具执行的输出内容
print(chunk)

注意:必须在 LangGraph 执行上下文中调用 get_stream_writer(),否则无效。