首页 / AI Agent开发 / Multi-Agent 系统通信协议设计与实现——基于 A2A 协议的 Agent 间消息路由与任务分发实战 2 次阅读
Multi-Agent 系统通信协议设计与实现——基于 A2A 协议的 Agent 间消息路由与任务分发实战
AI Agent 开发

Multi-Agent 系统通信协议设计与实现

基于 A2A 协议的 Agent 间消息路由与任务分发实战

2026 年 3 月 2 日 · 约 12 分钟阅读

当你构建一个 Multi-Agent 系统时,是否遇到过这些困境:

  • 不同框架(LangGraph、CrewAI、ADK)开发的 Agent 无法直接通信
  • 每次新增一个 Agent 都要修改大量路由代码
  • Agent 之间传递复杂任务时,状态管理混乱
  • 缺乏标准协议,跨组织协作几乎不可能

这些问题的根源在于缺少统一的 Agent 通信协议。传统 API 调用方式难以满足 Agent 间上下文感知、动态能力发现和自主协商的需求。

本教程将带你实战 Google 2025 年推出的 A2A(Agent-to-Agent)协议——一个正在成为行业标准的开放协议,支持超过 50 家技术合作伙伴(包括 Salesforce、MongoDB、LangChain)。你将学会:

  • A2A 协议的核心架构与通信流程
  • Agent Card 设计与能力发现机制
  • 基于 JSON-RPC 的任务提交与状态追踪
  • 使用 Python 实现完整的 A2A Client 和 Server
  • 多 Agent 协作系统的架构设计与最佳实践

一、A2A 协议核心概念

A2A 协议的设计目标是让不同厂商、不同框架构建的 AI Agent 能够像人类一样"自然对话",无需共享内部记忆或工具,仅通过标准化消息完成协作。

📇

Agent Card

Agent 的"数字名片",JSON 格式描述能力、端点和认证方式,位于 .well-known/agent.json

📨

A2A Client

发起通信的 Agent,负责任务提交、状态监控和结果接收

🖥️

A2A Server

提供服务的远程 Agent,自主执行任务并返回结果

🔄

Task Lifecycle

submitted → working → completed,支持流式状态更新

A2A 协议架构示意图:Client Agent 通过 Agent Card 发现 Remote Agent,建立通信链路

二、环境准备与依赖

本教程使用 Python 3.12+ 实现完整的 A2A 通信系统。你需要准备以下环境:

🐍

Python 3.12+

推荐使用 uv 或 venv 管理虚拟环境

🌐

FastAPI

构建 A2A Server 的 HTTP 端点

📡

HTTPX

异步 HTTP 客户端,支持 SSE 流式接收

🔑

Google Gemini API

Agent 智能决策的 LLM 后端

创建项目目录并安装依赖:

mkdir -p a2a-travel-planner && cd a2a-travel-planner
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 创建 requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.115.0
uvicorn[standard]==0.34.0
httpx==0.28.1
google-generativeai==0.8.4
pydantic==2.10.0
python-dotenv==1.0.1
EOF

pip install -r requirements.txt

创建项目结构:

mkdir -p host-agent/{src,static/.well-known}
mkdir -p remote-agents/{flight-agent,hotel-agent}/{src,static/.well-known}
touch .env

三、实战步骤

1

配置环境变量

.env 文件中配置 API 密钥:

# .env
GEMINI_API_KEY=your_google_gemini_api_key
SERPAPI_KEY=your_serpapi_key  # 用于实时搜索
HOST_PORT=8000
FLIGHT_AGENT_PORT=8001
HOTEL_AGENT_PORT=8002
⚠️ 注意:不要将 .env 提交到版本控制,将其添加到 .gitignore
2

定义 A2A 消息结构

创建 src/a2a_types.py,实现 A2A 协议的核心数据结构:

# src/a2a_types.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid

class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"

class AgentCard(BaseModel):
    """Agent 能力描述卡片"""
    name: str
    description: str
    url: str
    skills: List[Dict[str, Any]] = Field(default_factory=list)
    authentication: Optional[Dict[str, str]] = None
    version: str = "1.0.0"

class Message(BaseModel):
    """Agent 间通信消息"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    role: str  # "user", "assistant", "system"
    content: str
    parts: Optional[List[Dict[str, Any]]] = None

class Task(BaseModel):
    """A2A 任务对象"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    status: TaskState = TaskState.SUBMITTED
    message: Message
    context: Optional[Dict[str, Any]] = None
    artifacts: Optional[List[Dict[str, Any]]] = None
A2A 任务状态流转图:submitted → working → completed/failed
3

实现 Flight Agent(A2A Server)

Flight Agent 负责查询航班信息。首先创建 Agent Card:

// remote-agents/flight-agent/static/.well-known/agent.json
{
  "name": "FlightSearchAgent",
  "description": "实时航班查询与价格比较服务",
  "url": "http://localhost:8001/a2a",
  "skills": [
    {
      "id": "search_flights",
      "name": "航班搜索",
      "description": "根据出发地、目的地和日期查询航班",
      "parameters": {
        "type": "object",
        "properties": {
          "origin": {"type": "string", "description": "出发城市代码"},
          "destination": {"type": "string", "description": "目的地城市代码"},
          "date": {"type": "string", "format": "date"}
        },
        "required": ["origin", "destination", "date"]
      }
    }
  ],
  "authentication": {"type": "bearer"},
  "version": "1.0.0"
}

实现 Flight Agent Server(remote-agents/flight-agent/src/main.py):

# remote-agents/flight-agent/src/main.py
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import os
from dotenv import load_dotenv

load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

class TaskRequest(BaseModel):
    task_id: str
    message: dict
    context: dict = None

@app.get("/.well-known/agent.json")
async def get_agent_card():
    """返回 Agent Card 供其他 Agent 发现"""
    with open("static/.well-known/agent.json") as f:
        return json.load(f)

@app.post("/a2a/task")
async def create_task(request: TaskRequest):
    """接收 A2A 任务请求"""
    query = request.message.get("content", "")
    params = request.context or {}

    # 调用航班搜索 API(示例使用 SerpAPI)
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://serpapi.com/search",
            params={
                "engine": "google_flights",
                "origin": params.get("origin"),
                "destination": params.get("destination"),
                "date": params.get("date"),
                "api_key": os.getenv("SERPAPI_KEY")
            }
        )
        flights = response.json()

    # 构建返回结果
    return {
        "task_id": request.task_id,
        "status": "completed",
        "artifacts": [{
            "type": "flight_results",
            "data": flights.get("flights", [])[:5]  # 返回前 5 个航班
        }]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("FLIGHT_AGENT_PORT", 8001)))
4

实现 Hotel Agent(A2A Server)

Hotel Agent 结构与 Flight Agent 类似,创建 remote-agents/hotel-agent/src/main.py

# remote-agents/hotel-agent/src/main.py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import os
from dotenv import load_dotenv

load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

class TaskRequest(BaseModel):
    task_id: str
    message: dict
    context: dict = None

@app.get("/.well-known/agent.json")
async def get_agent_card():
    with open("static/.well-known/agent.json") as f:
        return json.load(f)

@app.post("/a2a/task")
async def create_task(request: TaskRequest):
    """接收酒店预订任务"""
    params = request.context or {}

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://serpapi.com/search",
            params={
                "engine": "google_hotels",
                "location": params.get("destination"),
                "check_in_date": params.get("check_in"),
                "check_out_date": params.get("check_out"),
                "api_key": os.getenv("SERPAPI_KEY")
            }
        )
        hotels = response.json()

    return {
        "task_id": request.task_id,
        "status": "completed",
        "artifacts": [{
            "type": "hotel_results",
            "data": hotels.get("hotels", [])[:5]
        }]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("HOTEL_AGENT_PORT", 8002)))
💡 提示:实际生产中应接入真实的航班/酒店 API(如 Amadeus、Booking.com API)
5

实现 Host Agent(A2A Client 协调者)

Host Agent 是用户直接交互的入口,负责解析用户请求、发现远程 Agent 能力、分发任务并聚合结果。

创建 Host Agent 的 Agent Card(host-agent/static/.well-known/agent.json):

{
  "name": "TravelPlannerHost",
  "description": "智能旅行规划助手,协调航班、酒店和景点查询",
  "url": "http://localhost:8000/a2a",
  "skills": [
    {
      "id": "plan_trip",
      "name": "旅行规划",
      "description": "根据目的地和预算生成完整旅行计划"
    }
  ],
  "version": "1.0.0"
}

实现 Host Agent 核心逻辑(host-agent/src/main.py):

# host-agent/src/main.py
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import json
import os
from dotenv import load_dotenv
import google.generativeai as genai

load_dotenv()
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

# 远程 Agent 配置
REMOTE_AGENTS = {
    "flight": "http://localhost:8001",
    "hotel": "http://localhost:8002"
}

class UserRequest(BaseModel):
    message: str
    context: dict = None

async def discover_agent_capabilities(agent_url: str) -> dict:
    """发现远程 Agent 的能力(Agent Card)"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{agent_url}/.well-known/agent.json")
        return response.json()

async def delegate_task(agent_url: str, task_id: str, message: str, context: dict) -> dict:
    """向远程 Agent 委派任务"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{agent_url}/a2a/task",
            json={
                "task_id": task_id,
                "message": {"role": "user", "content": message},
                "context": context
            },
            timeout=30.0
        )
        return response.json()

@app.get("/.well-known/agent.json")
async def get_agent_card():
    with open("static/.well-known/agent.json") as f:
        return json.load(f)

@app.post("/a2a/plan")
async def plan_trip(request: UserRequest):
    """处理用户旅行规划请求"""
    # 使用 LLM 解析用户意图
    model = genai.GenerativeModel("gemini-2.0-flash")
    intent_response = model.generate_content(f"""
    分析用户请求,提取旅行规划参数:
    用户消息:{request.message}

    返回 JSON 格式:
    {{
        "destination": "目的地城市",
        "origin": "出发城市",
        "dates": {{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}},
        "budget": 预算数字,
        "needs": ["flight", "hotel", "activities"]
    }}
    """)

    try:
        params = json.loads(intent_response.text)
    except:
        raise HTTPException(status_code=400, detail="无法解析用户意图")

    # 并发调用远程 Agent
    tasks = []
    if "flight" in params.get("needs", []):
        tasks.append(delegate_task(
            REMOTE_AGENTS["flight"],
            f"flight-{params['dates']['start']}",
            f"查询 {params['origin']} 到 {params['destination']} 的航班",
            {
                "origin": params["origin"],
                "destination": params["destination"],
                "date": params["dates"]["start"]
            }
        ))

    if "hotel" in params.get("needs", []):
        tasks.append(delegate_task(
            REMOTE_AGENTS["hotel"],
            f"hotel-{params['dates']['start']}",
            f"查询 {params['destination']} 的酒店",
            {
                "destination": params["destination"],
                "check_in": params["dates"]["start"],
                "check_out": params["dates"]["end"]
            }
        ))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 聚合结果并生成最终计划
    final_plan = model.generate_content(f"""
    基于以下信息生成旅行计划:
    用户需求:{request.message}
    航班结果:{results[0] if len(results) > 0 else "未查询"}
    酒店结果:{results[1] if len(results) > 1 else "未查询"}

    返回结构化的 Markdown 格式计划。
    """)

    return {"plan": final_plan.text, "raw_results": results}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("HOST_PORT", 8000)))
Host Agent 协调流程图:用户请求 → LLM 意图解析 → 并发调用 Flight/Hotel Agent → 结果聚合
6

启动并测试 Multi-Agent 系统

打开三个终端窗口,分别启动三个 Agent:

# 终端 1: Flight Agent
cd remote-agents/flight-agent
python src/main.py

# 终端 2: Hotel Agent
cd remote-agents/hotel-agent
python src/main.py

# 终端 3: Host Agent
cd host-agent
python src/main.py

测试请求(使用 curl 或 Postman):

curl -X POST http://localhost:8000/a2a/plan \
  -H "Content-Type: application/json" \
  -d '{
    "message": "我想从北京去上海旅行 3 天,预算 5000 元,需要机票和酒店"
  }'

预期响应:

{
  "plan": "# 上海 3 日旅行计划\n\n## 航班推荐\n- 航班号:CA123...\n\n## 酒店推荐\n- 酒店名:上海中心大酒店...\n\n## 行程安排\n...",
  "raw_results": [
    {"task_id": "flight-2026-03-15", "status": "completed", ...},
    {"task_id": "hotel-2026-03-15", "status": "completed", ...}
  ]
}
7

实现 SSE 流式状态更新(进阶)

A2A 协议支持通过 Server-Sent Events(SSE)实现实时任务进度推送。修改 Host Agent 添加流式端点:

# host-agent/src/main.py 添加
from fastapi.responses import StreamingResponse
import asyncio
import json

async def stream_task_progress(task_id: str, agent_url: str, context: dict):
    """SSE 流式推送任务进度"""
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            f"{agent_url}/a2a/task/stream",
            json={"task_id": task_id, "context": context}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    yield f"data: {line[6:]}\n\n"

@app.post("/a2a/plan/stream")
async def plan_trip_stream(request: UserRequest):
    """流式旅行规划(实时推送进度)"""
    async def generate():
        # 发送开始消息
        yield f"data: {json.dumps({'type': 'start', 'message': '开始规划行程'})}\n\n"

        # 意图解析阶段
        yield f"data: {json.dumps({'type': 'progress', 'step': '解析需求'})}\n\n"
        # ... 继续解析 ...

        # 航班查询阶段
        yield f"data: {json.dumps({'type': 'progress', 'step': '查询航班'})}\n\n"
        # ... 查询航班 ...

        # 完成
        yield f"data: {json.dumps({'type': 'complete'})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

前端可使用 EventSource 接收流式更新:

const eventSource = new EventSource('/a2a/plan/stream?message=我想去上海旅行');
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'progress') {
    console.log('当前进度:', data.step);
  }
};

四、常见问题与解决方案

Q1: A2A 与 MCP 协议有什么区别?应该选哪个?

A2A 用于 Agent 之间的通信(如 Host Agent 调用 Flight Agent),而MCP 用于 Agent 调用工具和数据源(如 Agent 访问数据库、API)。最佳实践是同时使用两者:A2A 负责多 Agent 协作,MCP 负责单个 Agent 访问外部资源。

Q2: 如何处理 Agent 认证和授权?

A2A 协议支持多种认证方式:1) OAuth 2.0 Bearer Token(推荐用于生产环境);2) API Key(适合内部系统);3) 双向 TLS(高安全场景)。在 Agent Card 的 authentication 字段声明所需方式,Client 在请求头中携带凭证。

Q3: 任务超时或失败如何处理?

实现重试机制和降级策略:1) 设置合理的超时时间(建议 30-60 秒);2) 对失败任务进行指数退避重试;3) 准备备用 Agent 或降级响应(如返回缓存数据)。

Q4: A2A 支持哪些数据格式?

A2A 使用 JSON-RPC 2.0 作为基础协议,支持任意 JSON 序列化数据。对于多模态内容(图片、音频),使用 artifacts 字段传递 Base64 编码或 URL 引用。

五、进阶技巧与最佳实践

  • Agent Card 版本管理:在 URL 中添加版本号(/v1/.well-known/agent.json),支持向后兼容
  • 任务预算控制:为每个远程 Agent 设置 Token 使用上限和调用频率限制,防止资源滥用
  • 动态注册发现:构建 Agent 注册中心,支持 Agent 动态上线/下线,避免硬编码 URL
  • 链路追踪:为每个任务生成唯一 trace_id,贯穿所有 Agent 调用,便于调试和监控
  • 结果缓存:对高频查询(如热门航线航班)实现 Redis 缓存,降低 API 成本
生产级 Multi-Agent 系统架构图:包含负载均衡、缓存层、监控和日志

总结

通过本教程,你完成了以下内容:

  • ✓ 理解 A2A 协议的核心概念:Agent Card、Client/Server 模型、任务生命周期
  • ✓ 实现 Flight Agent 和 Hotel Agent 作为 A2A Server
  • ✓ 实现 Host Agent 作为 A2A Client 协调者,使用 LLM 解析意图并分发任务
  • ✓ 掌握 SSE 流式状态更新的实现方法
  • ✓ 了解生产环境最佳实践:认证、缓存、链路追踪

A2A 协议正在成为 Multi-Agent 系统的行业标准。掌握它,你将能够构建可扩展、跨组织的智能协作系统。

A2A 协议 Multi-Agent Agent 开发 Google ADK
选择栏目
今日简报 播客电台 实战教程 AI挣钱计划 关于我
栏目
全球AI日报国内AI日报全球金融日报国内金融日报全球大新闻日报国内大新闻日报Claude Code 玩法日报OpenClaw 动态日报GitHub 热门项目日报AI工具实战AI应用开发编程实战工作流自动化AI原理图解AI Agent开发AI变现案例库AI工具创收AI内容变现AI接单提效变现前沿研究
我的收藏
播客版
0:00
--:--