如果你曾经为多个AI代理之间"各说各话、工具重复造轮子"而苦恼,那么MCP(Model Context Protocol)正是为解决这个问题而生的。本教程将带你从零开始,系统性地设计一个MCP工具库,并在此基础上构建能够协同工作的多Agent系统。

全程以Python代码为主,每个步骤都给出可运行的完整示例,适合已有Python基础、希望进入Agent开发领域的中级开发者。

1. 核心概念:MCP是什么

MCP系统架构概览

MCP(Model Context Protocol)是Anthropic提出的一套开放标准协议,定义了AI模型与外部工具、数据源之间交换上下文信息的标准方式。你可以把它理解为AI世界的"USB接口"——无论哪个AI框架、哪种工具,只要实现了这套协议,就能即插即用。

MCP三层架构

  • MCP Host(宿主应用):Claude Desktop、Cursor、自定义AI应用等。负责发起请求、展示结果。
  • MCP Client(通信桥梁):内嵌于Host中,维护与MCP Server的连接,负责协议握手和消息路由。
  • MCP Server(工具提供方):暴露具体功能接口(工具、资源、提示词),可本地运行也可远程部署。

在多Agent场景下,MCP的价值尤为突出:工具定义一次,所有Agent共享。无需每个Agent单独集成文件系统、数据库、浏览器等工具,只需接入同一个MCP Server即可。

MCP协议已被Claude、OpenAI、Google等主流AI提供商支持。目前主流框架(LangChain、CrewAI、AutoGen)均提供了MCP适配层。

2. 环境搭建与依赖安装

环境搭建流程图
1

创建虚拟环境

推荐使用Python 3.10+,并创建独立虚拟环境避免依赖冲突。

bash
python3 -m venv mcp-env
source mcp-env/bin/activate  # Linux/Mac
# Windows: mcp-env\Scripts\activate

# 验证版本
python --version  # Python 3.10+
2

安装核心依赖

使用pip安装FastAPI(构建MCP Server)、httpx(异步HTTP客户端)、Pydantic(数据验证)等核心库。

bash
pip install fastapi uvicorn httpx pydantic

# 可选:用于多Agent通信
pip install anthropic  # Claude API
pip install a2a-sdk    # Agent-to-Agent协议SDK(如果可用)
3

初始化项目结构

bash
mkdir mcp-multi-agent && cd mcp-multi-agent

# 创建目录结构
mkdir -p server/{tools,resources}
mkdir -p agents
mkdir -p config

touch server/main.py
touch server/registry.py
touch agents/coordinator.py
touch agents/worker.py
touch config/tools.yaml

项目结构如下:

mcp-multi-agent/
├── server/
│   ├── main.py         # MCP Server 主程序
│   ├── registry.py     # 共享工具注册表
│   └── tools/
│       ├── file_tool.py
│       └── search_tool.py
├── agents/
│   ├── coordinator.py  # 协调Agent
│   └── worker.py       # 工作Agent
└── config/
    └── tools.yaml      # 工具配置
如果你使用uv(现代Python包管理器),可以用uv init mcp-multi-agent快速初始化,速度比pip快10倍以上。

3. 搭建MCP Server

MCP Server架构流程图

MCP Server是整个系统的核心,它暴露工具列表和工具调用接口,所有Agent都通过这里获取和执行工具。

3.1 定义核心数据模型

python · server/models.py
from pydantic import BaseModel
from typing import Any, Optional

class Tool(BaseModel):
    name: str
    description: str
    parameters: dict  # JSON Schema格式
    tags: list[str] = []

class ToolCallRequest(BaseModel):
    tool_name: str
    arguments: dict[str, Any]
    caller_id: Optional[str] = None  # 调用方Agent ID

class ToolCallResponse(BaseModel):
    success: bool
    result: Any
    error: Optional[str] = None

3.2 实现MCP Server主体

python · server/main.py
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from .registry import ToolRegistry
from .models import ToolCallRequest, ToolCallResponse

# 全局工具注册表
registry = ToolRegistry()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 服务启动时注册所有工具
    from .tools.file_tool import register_file_tools
    from .tools.search_tool import register_search_tools
    register_file_tools(registry)
    register_search_tools(registry)
    print(f"✅ MCP Server 启动,已注册 {len(registry.tools)} 个工具")
    yield
    print("🔴 MCP Server 关闭")

app = FastAPI(title="MCP Server", version="1.0.0", lifespan=lifespan)

# ── 工具发现接口 ──
@app.get("/tools")
async def list_tools():
    """返回所有可用工具的元数据"""
    return {
        "tools": [
            {
                "name": t.name,
                "description": t.description,
                "parameters": t.parameters,
                "tags": t.tags,
            }
            for t in registry.tools.values()
        ]
    }

# ── 工具调用接口 ──
@app.post("/tools/call", response_model=ToolCallResponse)
async def call_tool(req: ToolCallRequest):
    """调用指定工具并返回结果"""
    tool_fn = registry.get_handler(req.tool_name)
    if not tool_fn:
        raise HTTPException(status_code=404, detail=f"工具 '{req.tool_name}' 不存在")

    try:
        result = await tool_fn(**req.arguments)
        return ToolCallResponse(success=True, result=result)
    except Exception as e:
        return ToolCallResponse(success=False, result=None, error=str(e))

# ── 健康检查 ──
@app.get("/health")
async def health_check():
    return {"status": "ok", "tool_count": len(registry.tools)}

3.3 启动服务器

bash
# 开发模式(支持热重载)
uvicorn server.main:app --host 0.0.0.0 --port 8000 --reload

# 验证
curl http://localhost:8000/health
# 输出: {"status":"ok","tool_count":2}

curl http://localhost:8000/tools
# 输出: {"tools":[{"name":"read_file",...},{"name":"search_web",...}]}

4. 共享工具注册表设计

工具注册表对比设计

共享工具注册表是多Agent系统中避免代码重复的关键。与其让每个Agent各自定义工具,不如将工具统一注册到MCP Server,Agent按需查询和调用。

4.1 实现工具注册表

python · server/registry.py
from typing import Callable, Awaitable, Any
from .models import Tool
import logging

logger = logging.getLogger(__name__)

class ToolRegistry:
    """集中管理所有MCP工具的注册表"""

    def __init__(self):
        self.tools: dict[str, Tool] = {}
        self._handlers: dict[str, Callable[..., Awaitable[Any]]] = {}

    def register(
        self,
        name: str,
        description: str,
        parameters: dict,
        tags: list[str] = [],
    ):
        """装饰器:注册工具元数据和处理函数"""
        def decorator(fn: Callable):
            self.tools[name] = Tool(
                name=name,
                description=description,
                parameters=parameters,
                tags=tags,
            )
            self._handlers[name] = fn
            logger.info(f"📌 已注册工具: {name}")
            return fn
        return decorator

    def get_handler(self, name: str) -> Callable | None:
        return self._handlers.get(name)

4.2 实现具体工具

python · server/tools/file_tool.py
import aiofiles
import os
from ..registry import ToolRegistry

def register_file_tools(registry: ToolRegistry):

    @registry.register(
        name="read_file",
        description="读取本地文件内容,支持文本文件",
        parameters={
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "文件路径"},
                "encoding": {"type": "string", "default": "utf-8"},
            },
            "required": ["path"],
        },
        tags=["filesystem", "read"],
    )
    async def read_file(path: str, encoding: str = "utf-8") -> dict:
        # 安全检查:禁止路径遍历
        abs_path = os.path.realpath(path)
        allowed_base = os.path.realpath("./workspace")
        if not abs_path.startswith(allowed_base):
            raise PermissionError(f"拒绝访问路径:{path}")

        async with aiofiles.open(abs_path, encoding=encoding) as f:
            content = await f.read()
        return {"path": path, "content": content, "size": len(content)}

    @registry.register(
        name="write_file",
        description="向本地文件写入内容",
        parameters={
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"},
            },
            "required": ["path", "content"],
        },
        tags=["filesystem", "write"],
    )
    async def write_file(path: str, content: str) -> dict:
        async with aiofiles.open(path, "w", encoding="utf-8") as f:
            await f.write(content)
        return {"path": path, "bytes_written": len(content.encode())}
生产环境中务必对文件路径做严格校验,防止路径遍历攻击(如../../etc/passwd)。上方代码已展示基本的realpath检查方式。

5. 多Agent系统构建

多Agent协作架构图

有了MCP Server之后,我们来构建多Agent系统。系统由两种角色组成:协调Agent(Coordinator)负责任务拆解和分发,工作Agent(Worker)负责具体执行,均通过MCP Server调用工具。

5.1 实现MCP Client基类

python · agents/base_agent.py
import httpx
from typing import Any

class MCPClient:
    """封装与MCP Server的所有通信"""

    def __init__(self, server_url: str = "http://localhost:8000"):
        self.server_url = server_url
        self._client = httpx.AsyncClient(timeout=30.0)
        self._tools_cache: list[dict] | None = None

    async def get_tools(self) -> list[dict]:
        """获取(并缓存)可用工具列表"""
        if self._tools_cache is None:
            resp = await self._client.get(f"{self.server_url}/tools")
            resp.raise_for_status()
            self._tools_cache = resp.json()["tools"]
        return self._tools_cache

    async def call_tool(self, tool_name: str, arguments: dict, caller_id: str = "") -> Any:
        """调用指定工具"""
        payload = {
            "tool_name": tool_name,
            "arguments": arguments,
            "caller_id": caller_id,
        }
        resp = await self._client.post(
            f"{self.server_url}/tools/call",
            json=payload,
        )
        resp.raise_for_status()
        data = resp.json()
        if not data["success"]:
            raise RuntimeError(f"工具 {tool_name} 调用失败: {data['error']}")
        return data["result"]

    async def close(self):
        await self._client.aclose()

5.2 实现工作Agent

python · agents/worker.py
import anthropic
import json
from .base_agent import MCPClient

class WorkerAgent:
    """工作Agent:接收任务,调用MCP工具执行,返回结果"""

    def __init__(self, agent_id: str, specialization: str):
        self.agent_id = agent_id
        self.specialization = specialization  # 例如:"file_ops" 或 "search"
        self.mcp = MCPClient()
        self.llm = anthropic.AsyncAnthropic()

    async def execute(self, task: str) -> str:
        """执行任务:让LLM决定调用哪些MCP工具"""

        # 获取工具列表并转为Claude tool格式
        mcp_tools = await self.mcp.get_tools()
        claude_tools = [
            {
                "name": t["name"],
                "description": t["description"],
                "input_schema": t["parameters"],
            }
            for t in mcp_tools
            # 仅使用与本Agent专长相关的工具
            if self.specialization in t.get("tags", [])
        ]

        messages = [{"role": "user", "content": task}]

        while True:
            response = await self.llm.messages.create(
                model="claude-opus-4-6",
                max_tokens=4096,
                tools=claude_tools,
                messages=messages,
            )

            # 无工具调用 → 任务完成
            if response.stop_reason == "end_turn":
                return response.content[0].text

            # 处理工具调用
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await self.mcp.call_tool(
                        block.name,
                        block.input,
                        caller_id=self.agent_id,
                    )
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result, ensure_ascii=False),
                    })

            # 将工具结果反馈给LLM继续推理
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})

6. Agent间通信(A2A协议)

A2A协议通信流程图

MCP解决了Agent与工具之间的通信问题,而A2A(Agent-to-Agent)协议解决的是Agent与Agent之间的通信问题。Google于2025年提出这一协议,定义了Agent如何发现彼此、委派任务、传递结果。

6.1 实现协调Agent

python · agents/coordinator.py
import asyncio
import anthropic
from .worker import WorkerAgent

class CoordinatorAgent:
    """协调Agent:负责任务分解、分配和结果汇总"""

    def __init__(self):
        self.llm = anthropic.AsyncAnthropic()
        # 注册工作Agent池
        self.workers: dict[str, WorkerAgent] = {
            "file_worker": WorkerAgent("file_worker", "filesystem"),
            "search_worker": WorkerAgent("search_worker", "search"),
        }

    async def run(self, goal: str) -> str:
        """接收高层目标,分解为子任务,并行执行,汇总结果"""

        # Step 1: 让LLM分解任务
        decompose_resp = await self.llm.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=(
                "你是一个任务分解专家。将用户目标分解为子任务,"
                "每个子任务指定负责的Agent(file_worker 或 search_worker)。"
                "以JSON格式返回: [{\"agent\":\"...\",\"task\":\"...\"}]"
            ),
            messages=[{"role": "user", "content": f"目标:{goal}"}],
        )

        import json
        subtasks = json.loads(decompose_resp.content[0].text)

        # Step 2: 并行执行子任务
        async def run_subtask(item: dict) -> tuple[str, str]:
            worker = self.workers.get(item["agent"])
            if not worker:
                return item["agent"], f"未知Agent: {item['agent']}"
            result = await worker.execute(item["task"])
            return item["agent"], result

        results = await asyncio.gather(*[run_subtask(t) for t in subtasks])

        # Step 3: 汇总结果
        summary_input = "\n\n".join([
            f"【{agent}的结果】\n{result}"
            for agent, result in results
        ])

        final_resp = await self.llm.messages.create(
            model="claude-opus-4-6",
            max_tokens=2048,
            system="综合所有子Agent的执行结果,生成简洁的最终报告。",
            messages=[{"role": "user", "content": summary_input}],
        )

        return final_resp.content[0].text

6.2 运行完整系统

python · run.py
import asyncio
from agents.coordinator import CoordinatorAgent

async def main():
    coordinator = CoordinatorAgent()

    # 发布一个高层目标
    goal = "读取 ./workspace/report.txt 文件,搜索相关背景信息,生成一份综合摘要"

    print("🚀 多Agent系统启动...")
    print(f"📋 目标: {goal}\n")

    result = await coordinator.run(goal)

    print("✅ 执行完成!")
    print("=" * 50)
    print(result)

if __name__ == "__main__":
    # 确保MCP Server已在8000端口运行
    asyncio.run(main())
在运行run.py之前,确保已在另一个终端启动MCP Server:uvicorn server.main:app --port 8000

7. 生产环境最佳实践

生产实践要点信息图

7.1 权限控制与安全

每个工具调用都应验证调用方的身份和权限,避免恶意Agent滥用高危工具:

python
# 在ToolCallRequest中添加token字段
class ToolCallRequest(BaseModel):
    tool_name: str
    arguments: dict
    caller_id: str
    token: str  # JWT令牌

# 在/tools/call端点添加权限校验中间件
from fastapi import Depends, Header, HTTPException

async def verify_token(x_agent_token: str = Header(...)):
    try:
        payload = jwt.decode(x_agent_token, SECRET_KEY, algorithms=["HS256"])
        return payload["agent_id"]
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="无效的Agent令牌")

7.2 工具调用缓存

对于幂等的查询类工具(如search_web),添加Redis缓存可大幅降低API成本:

python
import hashlib, json
import redis.asyncio as redis

cache = redis.from_url("redis://localhost:6379")

async def cached_tool_call(tool_name: str, arguments: dict, ttl: int = 300):
    # 生成缓存key
    cache_key = f"mcp:{tool_name}:{hashlib.md5(json.dumps(arguments, sort_keys=True).encode()).hexdigest()}"

    # 命中缓存
    cached = await cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # 未命中则执行并缓存
    result = await execute_tool(tool_name, arguments)
    await cache.setex(cache_key, ttl, json.dumps(result))
    return result

7.3 监控与链路追踪

使用OpenTelemetry为每次工具调用添加追踪标识,便于调试多Agent的调用链:

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

tracer = trace.get_tracer("mcp-server")

@app.post("/tools/call")
async def call_tool(req: ToolCallRequest):
    with tracer.start_as_current_span(
        f"tool.{req.tool_name}",
        attributes={
            "agent.id": req.caller_id,
            "tool.name": req.tool_name,
        }
    ):
        # ... 执行工具
        pass

8. 常见问题与调试

问题 原因 解决方案
Agent连接MCP Server失败(Connection refused) Server未启动或端口被占用 检查uvicorn进程是否在运行,用lsof -i:8000确认端口
工具调用返回422 Unprocessable Entity 请求参数不符合JSON Schema定义 检查parametersrequired字段,确保arguments包含所有必填项
多Agent并发时出现数据竞争 多个Worker同时修改共享资源 在ToolRegistry中对写操作加asyncio.Lock()互斥锁
LLM无限循环调用工具 任务描述不明确,LLM无法判断何时结束 在Worker中设置最大工具调用轮次(如10次),超出后强制返回当前结果
工具缓存返回过期数据 TTL设置过长或数据源已更新 对时效性强的工具(如新闻搜索)设置短TTL(60秒),或提供缓存清除接口
调试多Agent系统时,建议先关闭并发(把asyncio.gather改为顺序执行),待逻辑正确后再开启并行,以降低排查难度。

总结

本教程带你完整实现了一套MCP工具库 + 多Agent协作系统:从MCP Server搭建、共享工具注册表设计,到工作Agent与协调Agent的实现,再到A2A协议通信和生产环境加固。核心思想是工具中心化 + Agent专注编排,让系统具备良好的可扩展性和可维护性。下一步,你可以尝试接入更多工具(数据库、浏览器自动化、代码执行沙箱),或引入CrewAI/LangGraph等框架进行更复杂的工作流编排。