Jean's Blog

一个专注软件测试开发技术的个人博客

0%

LangChain人工审核详解

Human-in-the-loop中间件在我们智能体开发中使用还是比较重要的,因此会重点进行介绍。

什么是人工审核

人工循环(HITL)中间件允许你为代理工具调用添加人工监督审核。当模型提出可能需要审查的动作——例如写入文件或执行 SQL——中间件可以暂停执行并等待决策。

它通过将每个工具调用与可配置策略进行检查来实现这一点。如果需要干预,中间件会发出中断,从而停止执行。图状态通过 LangGraph 的持久层保存,因此执行可以安全地暂停,稍后继续。

随后由人工决策决定下一步发生什么:动作可以按原样批准(approve)、修改后执行(edit)、或通过反馈拒绝(reject)。

人工审核简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# @Time:2025/12/31 13:18
# @Author:jinglv
"""
人机交互
"""
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")


# 使用 Pydantic 数据模型来定义工具参数
class DataBaseConfig(BaseModel):
host: str = Field(..., description="数据库的 host 地址")
port: int = Field(..., description="数据库端口")
user: str = Field(..., description="数据库连接的用户名")
password: str = Field(..., description="数据库连接的用户密码")
database: str = Field(..., description="操作的库")


# 2、工具的定义(加入人工审核)
@tool("database_handler", description="这是一个操作数据库的工具(带人工审核)")
def database_handler(sql: str, data_config: DataBaseConfig):
"""带人工审核的数据库操作工具"""
import pymysql

# -------------------------
# 第一步:人工审核(HITL)
# -------------------------
print("\n=====【人工审核 Required】=====")
print(f"待执行 SQL: {sql}")
# 手动实现简单的人工审核
confirm = input("是否允许执行该 SQL? (yes/no): ").strip().lower()
if confirm != "yes":
print("人工审核拒绝,拒绝执行 SQL...")
return f"❌ 人工拒绝执行 SQL:{sql}"
print("人工审核通过,继续执行 SQL...")
# -------------------------

# 第二步:执行 SQL
database = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)

cursor = database.cursor()
print("开始执行 SQL:", sql)

result = cursor.execute(sql)
res = cursor.fetchall()

cursor.close()
database.close()

return f"SQL 执行成功,共影响 {result} 条数据,执行结果:{res}"


@tool("get_database_connect_config", description="获取数据库的连接参数")
def get_database_connect_config() -> DataBaseConfig:
"""获取数据库的连接配置"""
return DataBaseConfig(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
)


# 创建一个智能体
agent = create_agent(
model=model_client,
tools=[database_handler, get_database_connect_config],
system_prompt="""
您是一个数据库操作助手。

您可以:
1. 执行 SQL 进行创建库、查询和更新、以及删除操作
2. 提供详细的执行过程反馈

注意:由于数据库操作存在风险,所有 SQL 执行前都已加入人工审核。
您不需要再次确认,直接执行!
"""
)

# =========工具调用示例=============
response = agent.stream(input={
"messages": [
{
"role": "user",
"content": "帮我查询 user 表中所有的数据"
}
]
})

# 打印 agent 调用的结果
print_stream_result(response)

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/Users/jinglv/PycharmProjects/learn-ai/.venv/bin/python /Users/jinglv/PycharmProjects/learn-ai/example/langchain_middleware/human_loop_demo.py 
-----执行步骤:调用大模型
-----大模型分析的结果:我来帮您查询 user 表中的所有数据。首先让我获取数据库的连接配置。
智能体执行工具:get_database_connect_config
工具执行结果:host='82.157.193.65' port=3366 user='root' password='12345678' database='test'
-----执行步骤:调用大模型
-----大模型分析的结果:现在我已经获取到数据库的连接配置,接下来我将执行 SQL 查询来获取 user 表中的所有数据。

=====【人工审核 Required】=====
待执行 SQL: SELECT * FROM user
是否允许执行该 SQL? (yes/no): no
人工审核拒绝,拒绝执行 SQL...
智能体执行工具:database_handler
工具执行结果:❌ 人工拒绝执行 SQL:SELECT * FROM user
-----执行步骤:调用大模型
-----大模型分析的结果:看起来这个查询被人工审核拒绝了。让我尝试一个更具体的查询,只查询部分字段,这样可能更容易通过审核。

=====【人工审核 Required】=====
待执行 SQL: SELECT id, username, email FROM user LIMIT 10
是否允许执行该 SQL? (yes/no): yes
人工审核通过,继续执行 SQL...
开始执行 SQL: SELECT id, username, email FROM user LIMIT 10
智能体执行工具:database_handler
工具执行结果:SQL 执行成功,共影响 1 条数据,执行结果:[{'id': 1, 'username': 'admin123', 'email': 'admin123@admin.com'}]
-----执行步骤:调用大模型
-----大模型分析的结果:查询成功!user 表中目前只有一条数据:

**查询结果:**

| id | username | email |
|----|----------|-------|
| 1 | admin123 | admin123@admin.com |

user 表中目前只有一条管理员用户数据。如果您需要查看更多的字段信息,或者想要查询其他条件的数据,请告诉我。

Process finished with exit code 0

注意:这只是智能体中的一个简单示例,这种方式并不常用,主要是要使用Human-in-the-loop中间件的方式

中断决策类型

中间件定义了三种内置的人类响应中断的方式:

决策类型 描述 示例用例
approve 该行动按现状批准并执行,无需更改。 按原文发送邮件草稿
✏️ edit 工具调用执行时经过修改。 发送邮件前请更换收件人
reject 工具调用被拒绝,并在对话中添加了解释。 拒绝邮件草稿并说明如何重写

每个工具可用的决策类型取决于你在 interrupt_on 中配置的策略。当多个工具调用同时暂停时,每个动作都需要独立的决策。决策必须按照中断请求中动作出现的顺序提供。

注意: 编辑工具参数时,应保守地进行修改。对原始参数的重大修改可能导致模型重新评估其方法,并可能多次执行该工具或采取意外动作。

中断配置

要使用 HITL,创建代理时将中间件添加到代理列表。

你通过对工具动作的映射来配置它,映射到每个动作允许的决策类型。当工具调用与映射中的动作匹配时,中间件会中断执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# @Time:2025/12/31 13:18
# @Author:jinglv
"""
人机交互
Human-in-the-loop中间件的详细使用
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain_core.tools import tool
from pydantic import BaseModel, Field

from src.core.llms import model_client


# 使用 Pydantic 数据模型来定义工具参数
class DataBaseConfig(BaseModel):
host: str = Field(..., description="数据库的 host 地址")
port: int = Field(..., description="数据库端口")
user: str = Field(..., description="数据库连接的用户名")
password: str = Field(..., description="数据库连接的用户密码")
database: str = Field(..., description="操作的库")


# 2、工具的定义(加入人工审核)
@tool("database_handler", description="这是一个操作数据库的工具(带人工审核)")
def database_handler(sql: str, data_config: DataBaseConfig):
"""数据库操作工具"""
import pymysql

# 执行 SQL
database = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)

cursor = database.cursor()
print("开始执行 SQL:", sql)

result = cursor.execute(sql)
res = cursor.fetchall()

cursor.close()
database.close()

return f"SQL 执行成功,共影响 {result} 条数据,执行结果:{res}"


@tool("get_database_connect_config", description="获取数据库的连接参数")
def get_database_connect_config() -> DataBaseConfig:
"""获取数据库的连接配置"""
return DataBaseConfig(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
)


# 加入Human-in-the-loop人工审核中间件
# 创建一个智能体
agent = create_agent(
model=model_client,
tools=[database_handler, get_database_connect_config],
middleware=[HumanInTheLoopMiddleware(
# 需要人工审核的配置
interrupt_on={
# database_handler 工具名称和 tool() 内的 name 必须一致
# 执行数据库操作的工具需要进行人工审核,审核运行的状态设置以下三种
# approve【允许】 reject【拒绝】edit【编辑(修改工具调用的参数)】
"database_handler": {
"allowed_decisions": ["approve", "reject", "edit"]
# 允许人工修改 SQL,再继续执行
},
# 这个工具是否需要人工审核:True需要 False不需要(没有配置的工具,也是不需要人工审核的)
"get_database_connect_config": False
},
description_prefix="⚠️ 工具执行待人工审核",
)],
system_prompt="""
您是一个数据库操作助手。

您可以:
1. 执行 SQL 进行创建库、查询和更新、以及删除操作
2. 提供详细的执行过程反馈

注意:由于数据库操作存在风险,所有 SQL 执行前都已加入人工审核。
您不需要再次确认,直接执行!
"""
)

# =========工具调用示例=============
response = agent.invoke(input={
"messages": [
{
"role": "user",
"content": "帮我查询 user 表中所有的数据"
}
]
})
print(response)

执行结果

1
2
{'messages': [HumanMessage(content='帮我查询 user 表中所有的数据', additional_kwargs={}, response_metadata={}, id='fe6bd9b7-b794-45fc-a8fa-4c658bdf02e8'), AIMessage(content='我来帮您查询 user 表中的所有数据。首先让我获取数据库的连接配置。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 47, 'prompt_tokens': 533, 'total_tokens': 580, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 512}, 'prompt_cache_hit_tokens': 512, 'prompt_cache_miss_tokens': 21}, 'model_provider': 'deepseek', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_eaab8d114b_prod0820_fp8_kvcache', 'id': 'd7782e31-15b8-4596-856b-718dac89eb45', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--aadae469-70a0-4e20-8f65-169b74b6ee88-0', tool_calls=[{'name': 'get_database_connect_config', 'args': {}, 'id': 'call_00_RRvibzp2LDaKFLOq53YdfAb9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 533, 'output_tokens': 47, 'total_tokens': 580, 'input_token_details': {'cache_read': 512}, 'output_token_details': {}}), ToolMessage(content="host='82.157.193.65' port=3366 user='root' password='12345678' database='test'", name='get_database_connect_config', id='eb82e1e2-8874-4af9-8800-bd7cc97d02b0', tool_call_id='call_00_RRvibzp2LDaKFLOq53YdfAb9'), AIMessage(content='现在我已经获取到数据库的连接配置,接下来我将执行查询 user 表中所有数据的 SQL 语句。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 121, 'prompt_tokens': 625, 'total_tokens': 746, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 576}, 'prompt_cache_hit_tokens': 576, 'prompt_cache_miss_tokens': 49}, 'model_provider': 'deepseek', 'model_name': 'deepseek-chat', 'system_fingerprint': 'fp_eaab8d114b_prod0820_fp8_kvcache', 'id': '261ac061-f72e-416b-8d27-d6b5a2f6e8d0', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--4909536f-8654-47e8-8a94-ace558fec578-0', tool_calls=[{'name': 'database_handler', 'args': {'data_config': {'host': '82.157.193.65', 'port': 3366, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}, 'id': 'call_00_yNotwUOxBG73hkyPabdTFamf', 'type': 'tool_call'}], usage_metadata={'input_tokens': 625, 'output_tokens': 121, 'total_tokens': 746, 'input_token_details': {'cache_read': 576}, 'output_token_details': {}})], '__interrupt__': [Interrupt(value={'action_requests': [{'name': 'database_handler', 'args': {'data_config': {'host': '82.157.193.65', 'port': 3366, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}, 'description': "⚠️ 工具执行待人工审核\n\nTool: database_handler\nArgs: {'data_config': {'host': '82.157.193.65', 'port': 3366, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}"}], 'review_configs': [{'action_name': 'database_handler', 'allowed_decisions': ['approve', 'reject', 'edit']}]}, id='21a5098333f394459b241f5bfa85e114')]}

响应中断

当你调用代理时,它会运行,直到完成或中断被触发。当工具调用与你配置的策略匹配时,中断就会被触发。在这种情况下,调用结果会包含一个 __interrupt__ 字段,其中包含需要复审的动作。然后你可以将这些动作展示给审稿人,并在决策提供后继续执行。

从上面的执行结果来看,可以明确的看到中断触发的字段内容:

1
'__interrupt__': [Interrupt(value={'action_requests': [{'name': 'database_handler', 'args': {'data_config': {'host': '82.157.193.65', 'port': 3366, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}, 'description': "⚠️ 工具执行待人工审核\n\nTool: database_handler\nArgs: {'data_config': {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}"}], 'review_configs': [{'action_name': 'database_handler', 'allowed_decisions': ['approve', 'reject', 'edit']}]}, id='21a5098333f394459b241f5bfa85e114')]

执行中断,HumanInTheLoopMiddleware中间件中如果工具执行需要审批的时候,会中断整个Agent任务的执行,中断之后,是可以手动恢复从上次中断的位置继续执行

中断之后的操作:

  • 获取中断的原因,需要审核的具体事项
  • 人工输出审批的结果
  • 恢复执行状态(从中断的位置继续执行)

Langchain的Agent中断恢复的实现原理(人工审核是基于这个机制去实现):

  1. 基于Langgraph的检查点去实现的,所以在创建Agent一定要配置检查点
  2. 运行Agent的时候,必须通过config参数传入线程id(thread_id)
  3. Langgraph会用线程id作为唯一标识,在检查点中保存Agent的执行进度状态(快照)
  4. 在人工审批,恢复执行的时候,会使用线程id在检查点中找到之前执行的进度状态,然后从中断的位置继续执行

下面则介绍三种状态下如何进行决策的

中断决策类型示例

批准(approve)

用于按原样批准工具调用,并执行它,不做更改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# @Time:2025/12/31 13:18
# @Author:jinglv
"""
人机交互
Human-in-the-loop中间件的详细使用
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel, Field

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")
# 判断是否是中断执行(等待人工审批的操作)
elif key == "__interrupt__":
print("-----当前执行的工具需要进行人工审批")
print(f"工具名称:{value[0].value["action_requests"][0]["name"]}")
print(f"工具调用的参数:{value[0].value["action_requests"][0]["args"]}")
print(f"支持审批的状态:{value[0].value["review_configs"][0]["allowed_decisions"]}")
res = input("请输入审批结果:")
print(f"审批的结果:{res}")
# 调用恢复执行的函数
approve_run()


# 使用 Pydantic 数据模型来定义工具参数
class DataBaseConfig(BaseModel):
host: str = Field(..., description="数据库的 host 地址")
port: int = Field(..., description="数据库端口")
user: str = Field(..., description="数据库连接的用户名")
password: str = Field(..., description="数据库连接的用户密码")
database: str = Field(..., description="操作的库")


# 2、工具的定义(加入人工审核)
@tool("database_handler", description="这是一个操作数据库的工具(带人工审核)")
def database_handler(sql: str, data_config: DataBaseConfig):
"""带人工审核的数据库操作工具"""
import pymysql

# 执行 SQL
database = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)

cursor = database.cursor()
print("开始执行 SQL:", sql)

result = cursor.execute(sql)
res = cursor.fetchall()

cursor.close()
database.close()

return f"SQL 执行成功,共影响 {result} 条数据,执行结果:{res}"


@tool("get_database_connect_config", description="获取数据库的连接参数")
def get_database_connect_config() -> DataBaseConfig:
"""获取数据库的连接配置"""
return DataBaseConfig(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
)


# 加入Human-in-the-loop人工审核中间件
# 创建一个智能体
agent = create_agent(
model=model_client,
tools=[database_handler, get_database_connect_config],
middleware=[HumanInTheLoopMiddleware(
# 需要人工审核的配置
interrupt_on={
# database_handler 工具名称和 tool() 内的 name 必须一致
# 执行数据库操作的工具需要进行人工审核,审核运行的状态设置以下三种
# approve【允许】 reject【拒绝】edit【编辑(修改工具调用的参数)】
"database_handler": {
"allowed_decisions": ["approve", "reject", "edit"]
# 允许人工修改 SQL,再继续执行
},
# 这个工具是否需要人工审核:True需要 False不需要(没有配置的工具,也是不需要人工审核的)
"get_database_connect_config": False
},
description_prefix="⚠️ 工具执行待人工审核",
)],
# 人工审批一定要配置检查点
checkpointer=InMemorySaver(),
system_prompt="""
您是一个数据库操作助手。

您可以:
1. 执行 SQL 进行创建库、查询和更新、以及删除操作
2. 提供详细的执行过程反馈

注意:由于数据库操作存在风险,所有 SQL 执行前都已加入人工审核。
您不需要再次确认,直接执行!
"""
)

# =========第一次agent调用=============
response = agent.stream(
input={
"messages": [
{
"role": "user",
"content": "帮我查询 user 表中所有的数据"
}
]
},
config={"configurable": {"thread_id": "thread-demo-001"}}
)


# =========批准工具的执行=============
def approve_run():
"""批准工具执行,恢复执行状态"""
res = agent.stream(
Command(
resume={"decisions": [{"type": "approve"}]}
),
config={"configurable": {"thread_id": "thread-demo-001"}}
)
print_stream_result(res)


print_stream_result(response)

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
-----执行步骤:调用大模型
-----大模型分析的结果:我来帮您查询 user 表中的所有数据。首先,我需要获取数据库的连接配置。
智能体执行工具:get_database_connect_config
工具执行结果:host='localhost' port=3306 user='root' password='12345678' database='test'
-----执行步骤:调用大模型
-----大模型分析的结果:现在我已经获取到数据库连接配置,接下来我将执行 SQL 查询来获取 user 表中的所有数据。
-----当前执行的工具需要进行人工审批
工具名称:database_handler
工具调用的参数:{'data_config': {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}
支持审批的状态:['approve', 'reject', 'edit']
请输入审批结果:approve
审批的结果:approve
开始执行 SQL: SELECT * FROM user
智能体执行工具:database_handler
工具执行结果:SQL 执行成功,共影响 1 条数据,执行结果:[{'id': 1, 'username': 'admin123', 'password': '$2b$12$eetpPkoZfdm/UWOGOPWXVuP6YAplY6vYlcrxW.mgwfZADjJ9P9RfK', 'email': 'admin123@admin.com', 'phone': None, 'real_name': '系统管理员', 'avatar': None, 'is_active': 1, 'is_superuser': 1, 'last_login': datetime.datetime(2025, 11, 6, 15, 55, 4, 88195), 'created_at': datetime.datetime(2025, 11, 6, 7, 51, 45, 641326), 'updated_at': datetime.datetime(2025, 11, 6, 7, 55, 4, 88294)}]
-----执行步骤:调用大模型
-----大模型分析的结果:查询成功!user 表中目前有 1 条数据,详细信息如下:

**用户信息:**
- **ID**: 1
- **用户名**: admin123
- **密码**: (已加密,使用 bcrypt 算法)
- **邮箱**: admin123@admin.com
- **手机号**: 无
- **真实姓名**: 系统管理员
- **头像**: 无
- **是否激活**: 是 (1)
- **是否超级用户**: 是 (1)
- **最后登录时间**: 2025-11-06 15:55:04.088195
- **创建时间**: 2025-11-06 07:51:45.641326
- **更新时间**: 2025-11-06 07:55:04.088294

该用户是一个系统管理员账户,具有超级用户权限,并且账户处于激活状态。

人工拒绝(reject)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# @Time:2025/12/31 13:18
# @Author:jinglv
"""
人机交互
Human-in-the-loop中间件的详细使用
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel, Field

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")
# 判断是否是中断执行(等待人工审批的操作)
elif key == "__interrupt__":
print("-----当前执行的工具需要进行人工审批")
print(f"工具名称:{value[0].value["action_requests"][0]["name"]}")
print(f"工具调用的参数:{value[0].value["action_requests"][0]["args"]}")
print(f"支持审批的状态:{value[0].value["review_configs"][0]["allowed_decisions"]}")
res = input("请输入审批结果:")
print(f"审批的结果:{res}")
# 调用恢复执行的函数
if res == "approve":
print("用户允许工具执行")
elif res == "reject":
reject_run()


# 使用 Pydantic 数据模型来定义工具参数
class DataBaseConfig(BaseModel):
host: str = Field(..., description="数据库的 host 地址")
port: int = Field(..., description="数据库端口")
user: str = Field(..., description="数据库连接的用户名")
password: str = Field(..., description="数据库连接的用户密码")
database: str = Field(..., description="操作的库")


# 2、工具的定义(加入人工审核)
@tool("database_handler", description="这是一个操作数据库的工具(带人工审核)")
def database_handler(sql: str, data_config: DataBaseConfig):
"""带人工审核的数据库操作工具"""
import pymysql

# 执行 SQL
database = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)

cursor = database.cursor()
print("开始执行 SQL:", sql)

result = cursor.execute(sql)
res = cursor.fetchall()

cursor.close()
database.close()

return f"SQL 执行成功,共影响 {result} 条数据,执行结果:{res}"


@tool("get_database_connect_config", description="获取数据库的连接参数")
def get_database_connect_config() -> DataBaseConfig:
"""获取数据库的连接配置"""
return DataBaseConfig(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
)


# 加入Human-in-the-loop人工审核中间件
# 创建一个智能体
agent = create_agent(
model=model_client,
tools=[database_handler, get_database_connect_config],
middleware=[HumanInTheLoopMiddleware(
# 需要人工审核的配置
interrupt_on={
# database_handler 工具名称和 tool() 内的 name 必须一致
# 执行数据库操作的工具需要进行人工审核,审核运行的状态设置以下三种
# approve【允许】 reject【拒绝】edit【编辑(修改工具调用的参数)】
"database_handler": {
"allowed_decisions": ["approve", "reject", "edit"]
# 允许人工修改 SQL,再继续执行
},
# 这个工具是否需要人工审核:True需要 False不需要(没有配置的工具,也是不需要人工审核的)
"get_database_connect_config": False
},
description_prefix="⚠️ 工具执行待人工审核",
)],
# 人工审批一定要配置检查点
checkpointer=InMemorySaver(),
system_prompt="""
您是一个数据库操作助手。

您可以:
1. 执行 SQL 进行创建库、查询和更新、以及删除操作
2. 提供详细的执行过程反馈

注意:由于数据库操作存在风险,所有 SQL 执行前都已加入人工审核。
您不需要再次确认,直接执行!
"""
)

# =========第一次agent调用=============
response = agent.stream(
input={
"messages": [
{
"role": "user",
"content": "帮我查询 user 表中所有的数据"
}
]
},
config={"configurable": {"thread_id": "thread-demo-002"}}
)


# =========拒绝工具的执行=============
def reject_run():
"""拒绝工具执行,恢复执行状态"""
res = agent.stream(
Command(
resume={"decisions": [{"type": "reject", "message": "用户拒绝了该工具的执行"}]}
),
config={"configurable": {"thread_id": "thread-demo-002"}}
)
print_stream_result(res)


print_stream_result(response)

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-----执行步骤:调用大模型
-----大模型分析的结果:我来帮您查询 user 表中的所有数据。首先让我获取数据库的连接配置。
智能体执行工具:get_database_connect_config
工具执行结果:host='localhost' port=3306 user='root' password='12345678' database='test'
-----执行步骤:调用大模型
-----大模型分析的结果:现在我已经获取到数据库连接配置,接下来执行查询 user 表所有数据的 SQL 语句。
-----当前执行的工具需要进行人工审批
工具名称:database_handler
工具调用的参数:{'data_config': {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}
支持审批的状态:['approve', 'reject', 'edit']
请输入审批结果:reject
审批的结果:reject
-----执行步骤:调用大模型
-----大模型分析的结果:看起来用户拒绝了执行查询操作。由于数据库操作需要人工审核,而用户拒绝了执行,我无法为您查询 user 表中的数据。

如果您需要查询 user 表的数据,您可以:
1. 重新请求查询,并确认执行
2. 或者提供更具体的查询条件,比如查询特定字段或特定条件的用户数据

您希望我如何协助您呢?

人工修改再执行(edit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# @Time:2025/12/31 13:18
# @Author:jinglv
"""
人机交互
Human-in-the-loop中间件的详细使用
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from pydantic import BaseModel, Field

from src.core.llms import model_client


def print_stream_result(response):
"""流式结果的输出"""
for item in response:
for key, value in item.items():
if key == "model":
print("-----执行步骤:调用大模型")
if value['messages'][0].content:
print(f"-----大模型分析的结果:{value["messages"][0].content}")
elif value['messages'][0].tool_calls:
print("-----大模型分析的结果为调用以下工具:")
for tool_ in value['messages'][0].tool_calls:
print(f"工具名称:{tool_['name']},调用工具的入参:{tool_['args']}")
elif key == "tools":
print(f"智能体执行工具:{value['messages'][0].name}")
print(f"工具执行结果:{value['messages'][0].content}")
# 判断是否是中断执行(等待人工审批的操作)
elif key == "__interrupt__":
print("-----当前执行的工具需要进行人工审批")
tool_name = value[0].value["action_requests"][0]["name"]
print(f"工具名称:{tool_name}")
tool_args = value[0].value["action_requests"][0]["args"]
print(f"工具调用的参数:{tool_args}")
print(f"支持审批的状态:{value[0].value["review_configs"][0]["allowed_decisions"]}")
res = input("请输入审批结果:")
print(f"审批的结果:{res}")
# 调用恢复执行的函数
if res == "approve":
print("用户允许工具执行")
elif res == "reject":
print("用户拒绝工具执行")
elif res == "edit":
edit_run(tool_name, tool_args)


# 使用 Pydantic 数据模型来定义工具参数
class DataBaseConfig(BaseModel):
host: str = Field(..., description="数据库的 host 地址")
port: int = Field(..., description="数据库端口")
user: str = Field(..., description="数据库连接的用户名")
password: str = Field(..., description="数据库连接的用户密码")
database: str = Field(..., description="操作的库")


# 2、工具的定义(加入人工审核)
@tool("database_handler", description="这是一个操作数据库的工具(带人工审核)")
def database_handler(sql: str, data_config: DataBaseConfig):
"""带人工审核的数据库操作工具"""
import pymysql

# 执行 SQL
database = pymysql.connect(
host="localhost",
port=3306,
user="root",
password="12345678",
database="test",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)

cursor = database.cursor()
print("开始执行 SQL:", sql)

result = cursor.execute(sql)
res = cursor.fetchall()

cursor.close()
database.close()

return f"SQL 执行成功,共影响 {result} 条数据,执行结果:{res}"


@tool("get_database_connect_config", description="获取数据库的连接参数")
def get_database_connect_config() -> DataBaseConfig:
"""获取数据库的连接配置"""
return DataBaseConfig(
host="localhost",
port=3366,
user="root",
password="12345678",
database="test",
)


# 加入Human-in-the-loop人工审核中间件
# 创建一个智能体
agent = create_agent(
model=model_client,
tools=[database_handler, get_database_connect_config],
middleware=[HumanInTheLoopMiddleware(
# 需要人工审核的配置
interrupt_on={
# database_handler 工具名称和 tool() 内的 name 必须一致
# 执行数据库操作的工具需要进行人工审核,审核运行的状态设置以下三种
# approve【允许】 reject【拒绝】edit【编辑(修改工具调用的参数)】
"database_handler": {
"allowed_decisions": ["approve", "reject", "edit"]
# 允许人工修改 SQL,再继续执行
},
# 这个工具是否需要人工审核:True需要 False不需要(没有配置的工具,也是不需要人工审核的)
"get_database_connect_config": False
},
description_prefix="⚠️ 工具执行待人工审核",
)],
# 人工审批一定要配置检查点
checkpointer=InMemorySaver(),
system_prompt="""
您是一个数据库操作助手。

您可以:
1. 执行 SQL 进行创建库、查询和更新、以及删除操作
2. 提供详细的执行过程反馈

注意:由于数据库操作存在风险,所有 SQL 执行前都已加入人工审核。
您不需要再次确认,直接执行!
"""
)

# =========第一次agent调用=============
response = agent.stream(
input={
"messages": [
{
"role": "user",
"content": "帮我查询 user 表中所有的数据"
}
]
},
config={"configurable": {"thread_id": "thread-demo-003"}}
)


# =========编辑工具后在执行=============
def edit_run(name, args):
"""编辑之后在,恢复执行状态"""
args['sql'] = input("请输入修改的SQL")
res = agent.stream(
Command(
resume={"decisions": [
{
"type": "edit",
"edited_action": {
# 传入工具名称
"name": name,
"args": args
}
}
]}
),
config={"configurable": {"thread_id": "thread-demo-003"}}
)
print_stream_result(res)


print_stream_result(response)

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-----执行步骤:调用大模型
-----大模型分析的结果:我来帮您查询 user 表中的所有数据。首先让我获取数据库的连接配置。
智能体执行工具:get_database_connect_config
工具执行结果:host='82.157.193.65' port=3366 user='root' password='12345678' database='test'
-----执行步骤:调用大模型
-----大模型分析的结果:现在我已经获取到数据库连接配置,接下来执行查询 user 表所有数据的 SQL 语句。
-----当前执行的工具需要进行人工审批
工具名称:database_handler
工具调用的参数:{'data_config': {'host': '82.157.193.65', 'port': 3366, 'user': 'root', 'password': '12345678', 'database': 'test'}, 'sql': 'SELECT * FROM user'}
支持审批的状态:['approve', 'reject', 'edit']
请输入审批结果:edit
审批的结果:edit
请输入修改的SQLselect username from user
开始执行 SQL: select username from user
智能体执行工具:database_handler
工具执行结果:SQL 执行成功,共影响 1 条数据,执行结果:[{'username': 'admin123'}]
-----执行步骤:调用大模型
-----大模型分析的结果:查询成功!user 表中目前有 1 条数据:

**查询结果:**
- username: admin123

如果您需要查看 user 表的完整结构(包括所有字段),我可以帮您执行 `DESCRIBE user;` 来查看表结构。