Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangGraph之时光旅行

时光旅行(Time Travel)功能通过状态机管理实现数据流回溯与修改,支持重放和分叉操作。重放可回顾智能体执行流程,分叉则允许在特定节点更改数据并探索替代路径,提升调试与优化效率。

  • 重放(Replay)
    • 应用场景:如多步骤智能体任务回溯和分享
  • 分叉(Fork)
    • 应用场景:如在特定节点修改流程路径

基础示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# 设置工具
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import MessagesState, START
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver


@tool
def play_song_on_qq(song: str):
"""在qq音乐上播放歌曲"""
# 调用QQ音乐 API...
return f"成功在QQ音乐上播放了{song}!"


@tool
def play_song_on_163(song: str):
"""在网易云上播放歌曲"""
# 调用网易云 API...
return f"成功在网易云上播放了{song}!"


tools = [play_song_on_qq, play_song_on_163]
tool_node = ToolNode(tools)

# 设置模型
from langchain_openai import ChatOpenAI
import os

deepseek = ChatOpenAI(
model="deepseek-chat",
temperature=0,
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url=os.environ.get("DEEPSEEK_API_BASE"),
)

model = deepseek.bind_tools(tools, parallel_tool_calls=False)


# 定义节点和条件边


# 定义确定是否继续的函数
def should_continue(state):
messages = state["messages"]
last_message = messages[-1]
# 如果没有函数调用,则结束
if not last_message.tool_calls:
return "end"
# 否则如果有,我们继续
else:
return "continue"


# 定义调用模型的函数
def call_model(state):
messages = state["messages"]
response = model.invoke(messages)
# 我们返回一个列表,因为这将被添加到现有列表中
return {"messages": [response]}


# 定义一个新图
workflow = StateGraph(MessagesState)

# 定义我们将循环的两个节点
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)

# 将入口点设置为`agent`
# 这意味着这个节点是第一个被调用的
workflow.add_edge(START, "agent")

# 现在添加一个条件边
workflow.add_conditional_edges(
# 首先,我们定义起始节点。我们使用`agent`。
# 这意味着这些是在调用`agent`节点后采取的边。
"agent",
# 接下来,我们传入将确定下一个调用哪个节点的函数。
should_continue,
# 最后我们传入一个映射。
# 键是字符串,值是其他节点。
# END是一个特殊节点,标记图应该结束。
# 将会发生的是我们调用`should_continue`,然后该函数的输出
# 将与此映射中的键匹配。
# 根据匹配的键,然后调用相应的节点。
{
# 如果是`tools`,则调用工具节点。
"continue": "action",
# 否则我们结束。
"end": END,
},
)

# 现在我们从`tools`到`agent`添加一个普通边。
# 这意味着在调用`tools`之后,下一步调用`agent`节点。
workflow.add_edge("action", "agent")

# 设置内存
memory = MemorySaver()

# 最后,我们编译它!
# 这将它编译成一个LangChain Runnable,
# 意味着你可以像使用任何其他runnable一样使用它

# 我们添加`interrupt_before=["action"]`
# 这将在调用`action`节点之前添加一个断点
app = workflow.compile(checkpointer=memory)

# 进行简单调用,要求播放周杰伦的歌曲
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "1"}}
input_message = HumanMessage(content="你能播放一首周杰伦播放量最高的歌曲吗?")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
event["messages"][-1].pretty_print()

# 查看记录并重放,get_state获取当前的状态,查看信息
app.get_state(config).values["messages"]

# 使用get_state_history将历史记录打印出来
all_states = []
for state in app.get_state_history(config):
print(state)
all_states.append(state)
print("--")

重放

根据以上代码执行,我们可以返回任何一个状态节点,并从那个时候重新开始操作

1
2
3
4
5
6
7
8
9
10
11
to_replay = all_states[2] # 从第二状态开始,重新执行

to_replay.values

# 查看下一个节点执行什么
to_replay.next

# 如果想从这个状态节点重播,只需这样
for event in app.stream(None, to_replay.config):
for v in event.values():
print(v)

分叉

根据重放中的代码,从某个节点开始,对执行的数据进行分叉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 修改最后一个消息的工具调用
# 我们将其从`play_song_on_qq`更改为`play_song_on_163`
last_message = to_replay.values["messages"][-1]
last_message.tool_calls[0]["name"] = "play_song_on_163"

branch_config = app.update_state(
to_replay.config,
{"messages": [last_message]},
)

# 此时整个图的流就进行了分叉处理
for event in app.stream(None, branch_config):
for v in event.values():
print(v)