Jean's Blog

一个专注软件测试开发技术的个人博客

0%

MCP通信方式

三种MCP通讯方式

MCP(Model Context Protocol)协议目前支持三种主要通信方式,分别是:

stdio(标准输入输出)

  • 工作原理:

    通过本地进程的标准输入(stdin)和标准输出(stdout)进行通信。客户端以子进程的形式启动MCP服务器,双方通过管道交换JSON-RPC格式的消息,消息以换行符分隔。

  • 适用场景:

    • 本地进程间通信(如命令行工具、文件系统操作)。
    • 简单的批处理任务或工具调用。
  • 优点:

    • 实现简单,低延迟。
    • 无需网络配置,适合本地开发。
  • 限制:

    • 仅限本地使用,不支持分布式部署。
    • 服务端不能输出控制台日志(会污染协议流)。
  • 示例配置

    1
    2
    3
    4
    5
    6
    7
    {
    "math": {
    "command": "python",
    "args": ["/absolute/path/to/math_server.py"],
    "transport": "stdio",
    }
    }

SSE(Server-Sent Events)

  • 工作原理:

    基于HTTP长连接实现服务器到客户端的单向消息推送。客户端通过GET /sse建立长连接,服务器通过SSE流发送JSON-RPC消息;客户端通过POST /message发送请求或响应。

  • 适用场景:

    • 远程服务调用(如云服务、多客户端监控)。
    • 需要实时数据推送的场景(如对话式AI的流式输出)。
  • 优点:

    • 支持实时单向推送,适合流式交互。
  • 限制:

    • 已逐步被弃用(2025年3月后被Streamable HTTP取代)。
    • 连接中断后无法恢复,需重新建立。
    • 服务器需维持长连接,资源消耗较高。
  • 示例配置

    1
    2
    3
    4
    5
    6
    {
    "weather": {
    "url": "http://localhost:8000/sse",
    "transport": "sse",
    }
    }

Streamable HTTP(流式HTTP)

  • 工作原理:

    2025年3月引入的新传输方式,替代了SSE。通过统一的/message端点实现双向通信,支持以下特性:

    • 客户端通过HTTP POST发送请求(如工具调用)。
    • 服务器可将响应升级为SSE流式传输(当需要时)。
    • 支持无状态模式(Stateless Server),无需维持长连接。
  • 适用场景:

    • 高并发远程服务调用。
    • 需要灵活流式响应的场景(如AI助手的动态输出)。
  • 优点:

    • 解决SSE的缺陷:
      • 支持连接恢复(无需重新开始)。
      • 无需服务器维持长连接,降低资源压力。
      • 统一端点(/message),简化接口设计。
    • 兼容基础设施(如中间件、负载均衡)。
  • 推荐使用:

    当前MCP官方推荐的传输方式,尤其适合生产环境和云服务。

  • 示例配置

    1
    2
    3
    4
    5
    6
    {
    "weather": {
    "url": "http://localhost:8000/mcp",
    "transport": "streamable_http",
    }
    }

实现基于stdio的mcp服务

stdio模式mcp服务架构:

image-20250916171800738

第一步:创建mcp server(包含工具能力)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# @Time:2025/9/16 17:19
# @Author:jinglv
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b


if __name__ == "__main__":
mcp.run(transport="stdio")

以上代码包含两部分内容:

  1. 使用@mcp.tool()装饰器注册的 mcp 工具方法;
  2. 使用 mcp.run(transport="stdio")启动 stdio mcp 服务。

第二步:启动mcp server

找到 mcp server 所在文件夹,使用 python 命令启动服务(相当于启动了对 IO 流中 read 和 write 事件的监听):

1
(.venv) jinglv@localhost ai-agent-dev % python demo/mcp/stdio/mcp_stdio_server.py

启动后,一直处于监听状态

第三步:开发mcp client(包含智能体)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# @Time:2025/9/16 17:25
# @Author:jinglv
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import StdioServerParameters, ClientSession
from mcp.client.stdio import stdio_client

# 加载 .env 文件中的变量
load_dotenv()
from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
model=os.getenv("LLM_MODEL"),
base_url=os.getenv("LLM_BASE_URL"),
api_key=os.getenv("LLM_API_KEY"),
streaming=True,
)


async def create_mcp_stdio_client():
"""创建MCP客户端"""
# 定义stdio server参数
server_params = StdioServerParameters(
command="python",
args=["/Users/jinglv/PycharmProjects/ai-agent-dev/demo/mcp/stdio/mcp_stdio_server.py"],
)

# 读取 stdio mcp tools
async with stdio_client(server_params) as (read, write):
# 设置会话,并初始化
async with ClientSession(read, write) as session:
await session.initialize()
tools = await load_mcp_tools(session)
print(tools)

# 定义智能体,加载 mcp tools
agent = initialize_agent(
tools=tools,
llm=qv_llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
)
res = await agent.ainvoke("100+100=?")
return res


if __name__ == '__main__':
res = asyncio.run(create_mcp_stdio_client())
print(f'执行结果:{res}')

注意:stdio方式是可以不启动mcp server的,在客户端定义stdio server参数会加载进去的

实现基于sse的mcp服务

注意:sse已被官方废弃,优先使用streamable-http,两者从代码层面来看,差异不大

sse/streamable-http模式mcp服务架构:

image-20250916184135754

第一步:创建mcp server端

与stdio的区别,是将MCP启动修改为sse:transport=”stdio”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# @Time:2025/9/16 18:43
# @Author:jinglv
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b


if __name__ == "__main__":
mcp.run(transport="sse")

第二步:启动mcp server

注意:这个是基于网络协议的,需要启动服务

1
(.venv) jinglv@localhost ai-agent-dev % python demo/mcp/sse/mcp_sse_server.py 

启动服务会暴露一个端口

1
2
3
4
5
INFO:     Started server process [36603]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

第三步:开发mcp client(包含智能体)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# @Time:2025/9/16 18:45
# @Author:jinglv
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.client import MultiServerMCPClient

# 加载 .env 文件中的变量
load_dotenv()
from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
model=os.getenv("LLM_MODEL"),
base_url=os.getenv("LLM_BASE_URL"),
api_key=os.getenv("LLM_API_KEY"),
streaming=True,
)


async def create_mcp_sse_client():
# 创建MCP客户端
client = MultiServerMCPClient({
"mcp": {
"url": "http://127.0.0.1:8000/sse",
"transport": "sse",
}
})
tools = await client.get_tools()
print(tools)

agent = initialize_agent(
tools=tools,
llm=qv_llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
)
res = await agent.ainvoke("100+100=?")
return res


if __name__ == '__main__':
res = asyncio.run(create_mcp_sse_client())
print(f'执行结果:{res}')

实现基于streamable_http的mcp服务

注意:sse已被官方废弃,优先使用streamable-http,两者从代码层面来看,差异不大,和sse的区别也只是服务启动方式和创建客户端方式有差异其他基本一致

mcp server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# @Time:2025/9/16 18:54
# @Author:jinglv
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b


if __name__ == "__main__":
mcp.run(transport="streamable-http")

mcp client端(包含智能体)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# @Time:2025/9/16 18:54
# @Author:jinglv
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.client import MultiServerMCPClient

# 加载 .env 文件中的变量
load_dotenv()
from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
model=os.getenv("LLM_MODEL"),
base_url=os.getenv("LLM_BASE_URL"),
api_key=os.getenv("LLM_API_KEY"),
streaming=True,
)


async def create_mcp_streamable_http_client():
# 创建MCP客户端
client = MultiServerMCPClient({
"mcp": {
"url": "http://127.0.0.1:8000/mcp",
"transport": "streamable_http",
}
})
tools = await client.get_tools()
print(tools)

agent = initialize_agent(
tools=tools,
llm=qv_llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
)
res = await agent.ainvoke("100+100=?")
return res


if __name__ == '__main__':
res = asyncio.run(create_mcp_streamable_http_client())
print(f'执行结果:{res}')

扩展

工具转换为MCP服务

以上示例使用FastMCP定义工具,也可以使用langchain_mcp_adapters.tools.to_fastmcp` 用于将 LangChain 工具转换为 FastMCP 的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from langchain_core.tools import tool
from langchain_mcp_adapters.tools import to_fastmcp
from mcp.server.fastmcp import FastMCP


@tool
def add(a: int, b: int) -> int:
"""将两个数字相加"""
return a + b

# 将工具转换为mcp服务
fastmcp_tool = to_fastmcp(add)
# 创建mcp服务,使用tools参数注入转换后的工具
mcp = FastMCP("Math", tools=[fastmcp_tool])
mcp.run(transport="stdio")

MCP在工作流中的使用

以上的示例是介绍了MCP在Agent中的使用,下面的示例是介绍MCP在工作流中的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# @Time:2025/10/9 10:04
# @Author:jinglv
import os

import dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode

# 加载.env文件中的环境变量
dotenv.load_dotenv()


class MCPGraphAgent:
def __init__(self, mcp_config: dict):
"""
初始化MCP Graph Agent

Args:
mcp_config: MCP客户端配置
"""
# 初始化语言模型
self.model = ChatOpenAI(
model=os.getenv("LLM_MODEL"),
base_url=os.getenv("LLM_BASE_URL"),
api_key=os.getenv("LLM_API_KEY"),
streaming=True,
)

# 配置并启动多个 MCP 工具服务客户端
self.client = MultiServerMCPClient(mcp_config)
self.tools = None
self.model_with_tools = None
self.tool_node = None
self.graph = None

async def initialize(self):
"""
初始化agent,获取工具并构建图结构
"""
# 异步获取工具列表(通过 MCP 协议从服务端动态获取工具定义)
self.tools = await self.client.get_tools()
print("工具列表:", self.tools)

# 将工具绑定到模型,使其具备调用工具的能力
self.model_with_tools = self.model.bind_tools(self.tools)

# 创建 ToolNode,用于根据模型生成的 tool_calls 实际调用工具
self.tool_node = ToolNode(self.tools)

# 构建状态图(StateGraph)
builder = StateGraph(MessagesState)

# 添加节点:模型调用节点
builder.add_node("call_model", self._call_model)

# 添加节点:工具调用节点(通过 MCP 执行工具)
builder.add_node("tools", self.tool_node)

# 添加边:从起始节点 START 跳转到模型调用节点
builder.add_edge(START, "call_model")

# 添加条件边:根据模型输出决定下一步跳转到工具调用或结束
builder.add_conditional_edges(
"call_model",
self._should_continue,
)

# 添加边:工具调用完成后继续调用模型(形成循环,直到无需再调用工具)
builder.add_edge("tools", "call_model")

# 编译图结构
self.graph = builder.compile()

def _should_continue(self, state: MessagesState):
"""
条件判断函数:判断模型响应是否包含 tool_calls(即是否需要调用工具)
"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools" # 如果包含工具调用请求,跳转到 tools 节点执行
return END # 否则流程结束

async def _call_model(self, state: MessagesState):
"""
模型调用函数:向模型发送历史消息并获取响应
"""
messages = state["messages"]
response = await self.model_with_tools.ainvoke(messages)
return {"messages": [response]}

async def query(self, content: str) -> dict:
"""
执行查询

Args:
content: 查询内容

Returns:
查询结果
"""
if not self.graph:
raise RuntimeError("Agent未初始化,请先调用initialize()方法")

response = await self.graph.ainvoke(
{"messages": [{"role": "user", "content": content}]}
)
return response


# 使用示例
async def main():
# MCP配置
mcp_config = {
"math": {
"command": "python",
"args": ["/Users/jinglv/PycharmProjects/ai-agent-dev/demo/mcp/stdio/mcp_stdio_server.py"],
"transport": "stdio",
}
}

# 创建并初始化agent
agent = MCPGraphAgent(mcp_config)
await agent.initialize()

# 测试图流程:提问数学问题,模型应识别并调用 math 工具
math_response = await agent.query("计算一下 (3 + 5) x 12的结果?")
print("math_response:", math_response)


if __name__ == '__main__':
import asyncio

asyncio.run(main())