Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain流式输出模式

LangChain 设计了一个特殊的机制,能够将数据以连续不断的“流”的形式传递和处理,而不是等待所有数据都准备好后再一次性展示。

官方介绍地址:https://docs.langchain.com/oss/python/langchain/streaming

流式处理对于提升基于 LLM 的应用程序的响应能力至关重要。

  • 基于 LLM 的应用程序:LLM(Large Language Models,大型语言模型)是现代人工智能应用的核心组件,如聊天机器人、智能助手等。这些应用依赖 LLM 来生成文本、回答问题等。
  • 响应能力:应用程序能够快速响应用户操作的能力。如果响应延迟过高,用户体验会大打折扣。
  • 流式处理的重要性:由于 LLM 通常需要一定时间来生成完整的回答(存在延迟),流式处理可以在生成过程中逐步展示部分结果,让用户感觉应用更加“响应迅速”。

通过逐步显示输出,即使在完整响应尚未准备好之前,流式处理也能显著提升用户体验(UX),尤其是在处理 LLM 的延迟时。

  • 逐步显示输出:不是等到 LLM 生成完整回答后再显示,而是随着生成过程逐步展示部分结果。
  • 用户体验(UX):用户在使用应用程序时的整体感受,包括操作的便捷性、等待时间的长短等。
  • 处理 LLM 的延迟:LLM 生成回答可能需要几秒甚至更长时间,流式处理可以在这段时间内持续提供反馈,让用户知道系统正在工作,而不是无响应地等待。

LangChain 流式处理的功能

  • 流式代理进度:在每个代理步骤之后获取状态更新。
    • 代理步骤:代理执行任务时的各个阶段,如调用工具、处理数据等。
    • 状态更新:每个步骤完成后,系统会发送一个更新,告知当前的执行状态。
  • 流式 LLM 令牌:随着生成过程流式传输语言模型令牌。
    • LLM 令牌:LLM 在生成文本时,会将文本分解为一个个“令牌”(token),这些令牌可以是单词、短语或符号等。
    • 流式传输:随着 LLM 生成令牌的过程,实时地将这些令牌发送出去。
  • 流式自定义更新:发出用户定义的信号(例如,“已获取 10/100 条记录”)。
    • 用户定义的信号:开发者可以根据需要定义特定的更新信息,如任务进度、特定事件的发生等。
  • 流式多种模式:可以选择更新(代理进度)、消息(LLM 令牌 + 元数据)或自定义(任意用户数据)。
    • 更新模式:主要关注代理的执行进度。
    • 消息模式:除了 LLM 令牌外,还会包含一些额外的元数据,如令牌的来源、生成时间等。
    • 自定义模式:允许开发者发送任意类型的数据,以满足特定的应用需求。

支持的流模式

当你使用LangChain的stream方法时,可以将以下一种或多种流模式以列表的形式传递给它,以实现不同的数据流功能

Mode Description 应用场景
updates 在每个代理步骤之后流式传输状态更新。如果在同一个步骤中进行了多次更新(例如,运行了多个节点),那么这些更新将分别进行流式传输。 当你需要实时了解代理的执行进度,例如代理调用了哪些工具、每个工具的执行结果等,就可以使用这种模式。这样可以让你的应用程序及时向用户反馈代理的运行状态,提高用户体验。
messages 从任何调用了LLM的图节点流式传输(token,metadata)元组。 当你想要获取LLM生成的token以及相关的元数据时,这种模式非常有用。你可以通过这些token逐步构建出最终的响应内容,并且可以根据元数据了解token的来源等信息,从而更好地处理和展示LLM的输出。
custom 使用流式写入器从你的图节点内部流式传输自定义数据。 如果你有一些特定的数据需要实时传输,比如工具执行过程中的中间结果、日志信息等,就可以通过这种模式自定义要流式传输的内容。这为开发者提供了很大的灵活性,可以根据自己的需求传输各种类型的数据。

流式处理案例

流式传输 Agent 进度(Agent progress)

使用stream方法和streamMode: “updates”:当你想流式传输代理的进度时,需要在调用stream方法时指定streamMode为”updates”。这样,代理在执行的每一步都会发出一个事件,告知当前的执行状态。

每一步的更新内容

  • LLM节点:工具调用请求的AIMessage:当代理执行到需要调用某个工具(tool)时,会首先生成一个AIMessage,其中包含了对工具的调用请求信息。例如,如果代理需要调用一个获取天气信息的工具,这个AIMessage可能会包含工具的名称、参数等信息,表明代理正在请求调用该工具。
  • 工具节点:工具执行结果的ToolMessage:工具节点在执行完成后,会生成一个ToolMessage,其中包含了工具执行的结果。例如,如果工具是获取天气信息,那么这个ToolMessage就会包含天气的具体信息,如温度、天气状况等。
  • LLM节点:最终的AI响应:在代理完成所有工具的调用和处理后,会生成一个最终的AI响应。这个响应是代理根据工具的执行结果以及自身的逻辑生成的,是对用户原始问题的最终回答。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# @Time:2025/12/31 15:25
# @Author:jinglv
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime

from src.core.llms import model_client


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
)

# 调用agent,一定要通过参数config出入线程id
res = agent.stream(
# agent调用的时候第一个参数input传递的只,最终会保存到runtime.state(短期记忆)中
{"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"},
stream_mode="updates" # 默认是updates
)

for chunk in res:
for step, data in chunk.items():
print(f"step: {step}")
print(f"content: {data['messages'][-1].content_blocks}")

执行结果

1
2
3
4
5
6
step: model
content: [{'type': 'text', 'text': '我来帮您获取用户信息。'}, {'type': 'tool_call', 'id': 'call_00_V1wAgybiEpZKj5WDuybkIUyC', 'name': 'get_user_info', 'args': {}}]
step: tools
content: [{'type': 'text', 'text': '{"user_id": "9527", "nickname": "花花", "age": 18}'}]
step: model
content: [{'type': 'text', 'text': '根据获取到的用户信息:\n\n- **用户ID**: 9527\n- **昵称**: 花花 \n- **年龄**: 18岁\n\n这是当前用户的完整信息。'}]

流式传输 LLM Tokens( LLM tokens)

当语言模型(LLM)产生令牌(tokens)时,可以通过设置“stream_mode”为“messages”来实现对这些令牌的流式传输。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# @Time:2025/12/31 15:25
# @Author:jinglv
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime

from src.core.llms import model_client


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
)

# 调用agent,一定要通过参数config出入线程id
res = agent.stream(
# agent调用的时候第一个参数input传递的只,最终会保存到runtime.state(短期记忆)中
{"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"},
stream_mode="messages"
)

for token, metadata in res:
print(f"node: {metadata['langgraph_node']}")
print(f"content: {token.content_blocks}")
print("\n")

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
node: model
content: []


node: model
content: [{'type': 'text', 'text': '我来'}]


node: model
content: [{'type': 'text', 'text': '帮'}]

……



node: model
content: [{'type': 'text', 'text': '\n\n'}]


node: model
content: [{'type': 'text', 'text': '这是'}]


node: model
content: [{'type': 'text', 'text': '当前'}]


node: model
content: [{'type': 'text', 'text': '用户'}]


node: model
content: [{'type': 'text', 'text': '的基本'}]


node: model
content: [{'type': 'text', 'text': '信息'}]


node: model
content: [{'type': 'text', 'text': '。'}]


node: model
content: []

自定义工具的 Streaming( Custom updates)

当工具在运行过程中,为了实时传输更新信息,你可以使用配置中的writer参数。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# @Time:2025/12/31 15:25
# @Author:jinglv
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
from langgraph.config import get_stream_writer

from src.core.llms import model_client


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
writer = get_stream_writer()
writer(f"用户id:{runtime.state.get("user_id")}")
writer(f"用户昵称:{runtime.state.get("nickname")}")
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
)

# 调用agent,一定要通过参数config出入线程id
res = agent.stream(
# agent调用的时候第一个参数input传递的只,最终会保存到runtime.state(短期记忆)中
{"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"},
stream_mode="custom"
)

for chunk in res:
print(chunk)

执行结果

1
2
用户id:9527
用户昵称:花花

同时启用多个 Streaming 模式( Stream multiple modes)

可以通过将streamMode设置为一个数组来指定多种流式模式。数组中的元素可以是”updates”、”messages”、”custom”,分别代表不同的流式模式。例如,如果你想要同时获取状态更新、语言模型生成的token以及自定义的更新数据,就可以将streamMode设置为[“updates”, “messages”, “custom”]。

当使用了多种流式模式后,流式系统输出的数据将会是一个元组的形式,即[mode, chunk]。其中,mode表示当前这个元组对应的是哪种流式模式,比如是”updates”、”messages”还是”custom”;chunk则是该模式下实际被流式传输的数据内容。

代码示例:同时观察进度 + 自定义日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# @Time:2025/12/31 15:25
# @Author:jinglv
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
from langgraph.config import get_stream_writer

from src.core.llms import model_client


# 定义短期记忆(runtime.state)中其他额外的字段
class UserInfo(AgentState):
user_id: str
nickname: str


# 定义一个工具
@tool("get_user_info", description="用户获取当前用户信息的工具")
def get_user_info(runtime: ToolRuntime):
"""获取用户信息"""
writer = get_stream_writer()
writer(f"用户id:{runtime.state.get("user_id")}")
writer(f"用户昵称:{runtime.state.get("nickname")}")
return {"user_id": runtime.state.get("user_id"), "nickname": runtime.state.get("nickname"), "age": 18}


# 创建一个agent,开启短期记忆
agent = create_agent(
model_client,
# 配置智能体工具
tools=[get_user_info],
# 指定短期记忆中额外的字段
state_schema=UserInfo,
)

# 调用agent,一定要通过参数config出入线程id
res = agent.stream(
# agent调用的时候第一个参数input传递的只,最终会保存到runtime.state(短期记忆)中
{"messages": [{"role": "user", "content": "获取用户的信息"}], "user_id": "9527", "nickname": "花花"},
stream_mode=["updates", "custom"]
)

for stream_mode, chunk in res:
print(f"stream_mode: {stream_mode}")
print(f"content: {chunk}\n")

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
stream_mode: updates
content: {'model': {'messages': [AIMessage(content='我来帮您获取用户信息。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 34, 'prompt_tokens': 286, 'total_tokens': 320, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 256}, 'prompt_cache_hit_tokens': 256, 'prompt_cache_miss_tokens': 30}, 'model_provider': 'deepseek', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_eaab8d114b_prod0820_fp8_kvcache', 'id': '0ee91ccd-345b-4166-94aa-3fdb1c7f364d', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--3d10f54b-c504-43c0-b5e7-799ed1a33b0b-0', tool_calls=[{'name': 'get_user_info', 'args': {}, 'id': 'call_00_LjZ9Vs65zfubQYsPbTIOmR02', 'type': 'tool_call'}], usage_metadata={'input_tokens': 286, 'output_tokens': 34, 'total_tokens': 320, 'input_token_details': {'cache_read': 256}, 'output_token_details': {}})]}}

stream_mode: custom
content: 用户id:9527

stream_mode: custom
content: 用户昵称:花花

stream_mode: updates
content: {'tools': {'messages': [ToolMessage(content='{"user_id": "9527", "nickname": "花花", "age": 18}', name='get_user_info', id='b3088504-3aeb-4530-8a81-dcfb8e314a49', tool_call_id='call_00_LjZ9Vs65zfubQYsPbTIOmR02')]}}

stream_mode: updates
content: {'model': {'messages': [AIMessage(content='根据获取到的信息,当前用户的信息如下:\n\n- **用户ID**: 9527\n- **昵称**: 花花\n- **年龄**: 18岁\n\n这是系统为您提供的用户基本信息。如果您需要其他帮助,请随时告诉我!', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 51, 'prompt_tokens': 357, 'total_tokens': 408, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 320}, 'prompt_cache_hit_tokens': 320, 'prompt_cache_miss_tokens': 37}, 'model_provider': 'deepseek', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_eaab8d114b_prod0820_fp8_kvcache', 'id': '2970088f-977d-4f17-b204-99110debdc21', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--b291342f-b997-474d-ae16-9fb1f21157e2-0', usage_metadata={'input_tokens': 357, 'output_tokens': 51, 'total_tokens': 408, 'input_token_details': {'cache_read': 320}, 'output_token_details': {}})]}}