Three MCP communication methods
The MCP (Model Context Protocol) currently supports three main transports, all selected the same way on the server side (see the sketch after the list):
- stdio (standard input/output)
- SSE (Server-Sent Events)
- Streamable HTTP
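A minimal sketch of the selection: the transport is just an argument to mcp.run(). The server name "Demo" is illustrative; the transport strings are the ones used in the examples below.

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Demo")

# Pick exactly one transport when starting the server:
mcp.run(transport="stdio")              # stdio over stdin/stdout pipes
# mcp.run(transport="sse")              # SSE over HTTP (deprecated)
# mcp.run(transport="streamable-http")  # Streamable HTTP (recommended)
```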
Implementing an MCP service based on stdio
Architecture of an MCP service in stdio mode:
Step 1: Create the MCP server (with tool capabilities)

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="stdio")
```
The code above contains two parts:

- MCP tool functions registered with the @mcp.tool() decorator (as sketched below);
- the stdio MCP service started with mcp.run(transport="stdio").
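For context, a hedged sketch of what the decorator gives you: FastMCP builds the tool's argument schema from the function's type hints and uses the docstring as the tool description shown to the model. The `subtract` tool below is a hypothetical addition to the Step 1 server, not part of the original example:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def subtract(a: int, b: int) -> int:
    """Subtract b from a"""  # the docstring becomes the tool description
    # The type hints (a: int, b: int) define the tool's argument schema.
    return a - b
```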
Step 2: Start the MCP server
Go to the folder containing the MCP server and start the service with the python command (this effectively starts listening for read and write events on the IO streams):
```bash
(.venv) jinglv@localhost ai-agent-dev % python demo/mcp/stdio/mcp_stdio_server.py
```
Once started, the process stays in a listening state.
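Before wiring the server into an agent, you can exercise it directly with the mcp SDK. A minimal sketch, assuming the relative script path matches the layout above (list_tools and call_tool are the SDK's discovery and invocation calls):

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    params = StdioServerParameters(
        command="python",
        args=["demo/mcp/stdio/mcp_stdio_server.py"],  # path assumed from above
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()        # tool discovery
            print([t.name for t in tools.tools])      # expected: ['add', 'multiply']
            result = await session.call_tool("add", {"a": 100, "b": 100})
            print(result.content)                     # tool invocation


asyncio.run(main())
```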
Step 3: Develop the MCP client (with an agent)

```python
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import StdioServerParameters, ClientSession
from mcp.client.stdio import stdio_client

load_dotenv()

from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
    model=os.getenv("LLM_MODEL"),
    base_url=os.getenv("LLM_BASE_URL"),
    api_key=os.getenv("LLM_API_KEY"),
    streaming=True,
)


async def create_mcp_stdio_client():
    """Create the MCP client"""
    server_params = StdioServerParameters(
        command="python",
        args=["/Users/jinglv/PycharmProjects/ai-agent-dev/demo/mcp/stdio/mcp_stdio_server.py"],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the session, then load the MCP tools as LangChain tools
            await session.initialize()
            tools = await load_mcp_tools(session)
            print(tools)
            agent = initialize_agent(
                tools=tools,
                llm=qv_llm,
                agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
                verbose=True,
            )
            res = await agent.ainvoke("100+100=?")
            return res


if __name__ == '__main__':
    res = asyncio.run(create_mcp_stdio_client())
    print(f'Result: {res}')
```
Note: with the stdio transport you do not actually need to start the MCP server yourself; the stdio server parameters defined in the client cause the client to spawn and load the server process on its own.
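Since the client owns the server process, it can also control that process's environment. A hedged sketch: the env field exists on StdioServerParameters in the mcp SDK, but the variable shown is purely hypothetical:

```python
from mcp import StdioServerParameters

server_params = StdioServerParameters(
    command="python",
    args=["demo/mcp/stdio/mcp_stdio_server.py"],
    env={"LOG_LEVEL": "debug"},  # hypothetical variable, passed to the spawned server
)
```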
Implementing an MCP service based on SSE
Note: SSE has been officially deprecated; prefer streamable-http. At the code level the two differ very little.
Architecture of an MCP service in SSE/streamable-http mode:
Step 1: Create the MCP server
The only difference from stdio is the transport passed at startup: transport="sse"
```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="sse")
```
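By default the server listens on 127.0.0.1:8000 (see the startup log below). A hedged sketch, assuming FastMCP's host and port settings, which current SDK versions forward to uvicorn:

```python
from mcp.server.fastmcp import FastMCP

# Assumption: host/port are FastMCP settings forwarded to uvicorn.
mcp = FastMCP("Match Tools", host="0.0.0.0", port=9000)
mcp.run(transport="sse")  # the SSE endpoint would then be http://<host>:9000/sse
```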
Step 2: Start the MCP server
Note: this transport is network-based, so the service must be started explicitly.
```bash
(.venv) jinglv@localhost ai-agent-dev % python demo/mcp/sse/mcp_sse_server.py
```
Starting the service exposes a port:
```
INFO:     Started server process [36603]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
```
Step 3: Develop the MCP client (with an agent)

```python
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.client import MultiServerMCPClient

load_dotenv()

from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
    model=os.getenv("LLM_MODEL"),
    base_url=os.getenv("LLM_BASE_URL"),
    api_key=os.getenv("LLM_API_KEY"),
    streaming=True,
)


async def create_mcp_sse_client():
    client = MultiServerMCPClient({
        "mcp": {
            "url": "http://127.0.0.1:8000/sse",
            "transport": "sse",
        }
    })
    tools = await client.get_tools()
    print(tools)
    agent = initialize_agent(
        tools=tools,
        llm=qv_llm,
        agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
    res = await agent.ainvoke("100+100=?")
    return res


if __name__ == '__main__':
    res = asyncio.run(create_mcp_sse_client())
    print(f'Result: {res}')
```
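As its name suggests, MultiServerMCPClient can aggregate several servers at once, even across transports; get_tools() then returns the merged tool list. A hedged sketch in which the server names and the second (stdio) entry are illustrative assumptions:

```python
from langchain_mcp_adapters.client import MultiServerMCPClient

client = MultiServerMCPClient({
    "math": {
        "url": "http://127.0.0.1:8000/sse",
        "transport": "sse",
    },
    "files": {  # hypothetical second server, spawned over stdio
        "command": "python",
        "args": ["file_tools_server.py"],
        "transport": "stdio",
    },
})
# tools = await client.get_tools()  # merged tools from both servers
```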
Implementing an MCP service based on streamable_http
Note: as above, SSE is officially deprecated and streamable-http is preferred. The two differ very little at the code level; compared with SSE, only the way the server is started and the way the client is created change, everything else is essentially the same. Watch the spelling: the server runs with transport="streamable-http" (hyphen), while the client config uses "streamable_http" (underscore).
MCP server side

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Match Tools")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="streamable-http")
```
MCP client side (with an agent)

```python
import asyncio
import os

from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain_mcp_adapters.client import MultiServerMCPClient

load_dotenv()

from langchain_openai import ChatOpenAI

qv_llm = ChatOpenAI(
    model=os.getenv("LLM_MODEL"),
    base_url=os.getenv("LLM_BASE_URL"),
    api_key=os.getenv("LLM_API_KEY"),
    streaming=True,
)


async def create_mcp_streamable_http_client():
    client = MultiServerMCPClient({
        "mcp": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable_http",
        }
    })
    tools = await client.get_tools()
    print(tools)
    agent = initialize_agent(
        tools=tools,
        llm=qv_llm,
        agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
    res = await agent.ainvoke("100+100=?")
    return res


if __name__ == '__main__':
    res = asyncio.run(create_mcp_streamable_http_client())
    print(f'Result: {res}')
```
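The same server can also be exercised without LangChain. A minimal sketch, assuming the mcp SDK's streamable HTTP client (streamablehttp_client yields a read stream, a write stream, and a session-id getter, which is ignored here):

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main():
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("multiply", {"a": 7, "b": 6})
            print(result.content)


asyncio.run(main())
```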
Extension: converting tools into an MCP service
The examples above define tools with FastMCP. You can also use `langchain_mcp_adapters.tools.to_fastmcp` to convert an existing LangChain tool into a FastMCP tool and serve it, which is handy when you already have a library of LangChain tools:
```python
from langchain_core.tools import tool
from langchain_mcp_adapters.tools import to_fastmcp
from mcp.server.fastmcp import FastMCP


@tool
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


fastmcp_tool = to_fastmcp(add)

mcp = FastMCP("Math", tools=[fastmcp_tool])
mcp.run(transport="stdio")
```
Using MCP in a workflow
The examples above showed MCP used inside an agent; the example below shows MCP used inside a (LangGraph) workflow.
```python
import os

import dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode

dotenv.load_dotenv()


class MCPGraphAgent:
    def __init__(self, mcp_config: dict):
        """
        Initialize the MCP Graph Agent

        Args:
            mcp_config: MCP client configuration
        """
        self.model = ChatOpenAI(
            model=os.getenv("LLM_MODEL"),
            base_url=os.getenv("LLM_BASE_URL"),
            api_key=os.getenv("LLM_API_KEY"),
            streaming=True,
        )
        self.client = MultiServerMCPClient(mcp_config)
        self.tools = None
        self.model_with_tools = None
        self.tool_node = None
        self.graph = None

    async def initialize(self):
        """
        Initialize the agent: fetch the tools and build the graph
        """
        self.tools = await self.client.get_tools()
        print("Tool list:", self.tools)
        self.model_with_tools = self.model.bind_tools(self.tools)
        self.tool_node = ToolNode(self.tools)

        builder = StateGraph(MessagesState)
        builder.add_node("call_model", self._call_model)
        builder.add_node("tools", self.tool_node)
        builder.add_edge(START, "call_model")
        builder.add_conditional_edges(
            "call_model",
            self._should_continue,
        )
        builder.add_edge("tools", "call_model")
        self.graph = builder.compile()

    def _should_continue(self, state: MessagesState):
        """
        Conditional edge: decide whether the model response contains tool_calls
        (i.e. whether a tool needs to be invoked)
        """
        messages = state["messages"]
        last_message = messages[-1]
        if last_message.tool_calls:
            return "tools"
        return END

    async def _call_model(self, state: MessagesState):
        """
        Model call: send the message history to the model and return its response
        """
        messages = state["messages"]
        response = await self.model_with_tools.ainvoke(messages)
        return {"messages": [response]}

    async def query(self, content: str) -> dict:
        """
        Run a query

        Args:
            content: the query text

        Returns:
            the query result
        """
        if not self.graph:
            raise RuntimeError("Agent is not initialized; call initialize() first")
        response = await self.graph.ainvoke(
            {"messages": [{"role": "user", "content": content}]}
        )
        return response


async def main():
    mcp_config = {
        "math": {
            "command": "python",
            "args": ["/Users/jinglv/PycharmProjects/ai-agent-dev/demo/mcp/stdio/mcp_stdio_server.py"],
            "transport": "stdio",
        }
    }
    agent = MCPGraphAgent(mcp_config)
    await agent.initialize()
    math_response = await agent.query("What is (3 + 5) x 12?")
    print("math_response:", math_response)


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```
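As a follow-up: the hand-built call_model/tools loop above is essentially a ReAct agent, and LangGraph ships a prebuilt version of it. A hedged alternative sketch using langgraph.prebuilt.create_react_agent with the same stdio server:

```python
import asyncio
import os

import dotenv
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

dotenv.load_dotenv()


async def main():
    client = MultiServerMCPClient({
        "math": {
            "command": "python",
            "args": ["/Users/jinglv/PycharmProjects/ai-agent-dev/demo/mcp/stdio/mcp_stdio_server.py"],
            "transport": "stdio",
        }
    })
    tools = await client.get_tools()
    model = ChatOpenAI(
        model=os.getenv("LLM_MODEL"),
        base_url=os.getenv("LLM_BASE_URL"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    agent = create_react_agent(model, tools)  # prebuilt model -> tools -> model loop
    res = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "What is (3 + 5) x 12?"}]}
    )
    print(res["messages"][-1].content)


asyncio.run(main())
```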