当你构建一个 Multi-Agent 系统时,是否遇到过这些困境:
- 不同框架(LangGraph、CrewAI、ADK)开发的 Agent 无法直接通信
- 每次新增一个 Agent 都要修改大量路由代码
- Agent 之间传递复杂任务时,状态管理混乱
- 缺乏标准协议,跨组织协作几乎不可能
这些问题的根源在于缺少统一的 Agent 通信协议。传统 API 调用方式难以满足 Agent 间上下文感知、动态能力发现和自主协商的需求。
本教程将带你实战 Google 2025 年推出的 A2A(Agent-to-Agent)协议——一个正在成为行业标准的开放协议,支持超过 50 家技术合作伙伴(包括 Salesforce、MongoDB、LangChain)。你将学会:
- A2A 协议的核心架构与通信流程
- Agent Card 设计与能力发现机制
- 基于 JSON-RPC 的任务提交与状态追踪
- 使用 Python 实现完整的 A2A Client 和 Server
- 多 Agent 协作系统的架构设计与最佳实践
一、A2A 协议核心概念
A2A 协议的设计目标是让不同厂商、不同框架构建的 AI Agent 能够像人类一样"自然对话",无需共享内部记忆或工具,仅通过标准化消息完成协作。
Agent Card
Agent 的"数字名片",JSON 格式描述能力、端点和认证方式,位于 .well-known/agent.json
A2A Client
发起通信的 Agent,负责任务提交、状态监控和结果接收
A2A Server
提供服务的远程 Agent,自主执行任务并返回结果
Task Lifecycle
submitted → working → completed,支持流式状态更新
二、环境准备与依赖
本教程使用 Python 3.12+ 实现完整的 A2A 通信系统。你需要准备以下环境:
Python 3.12+
推荐使用 uv 或 venv 管理虚拟环境
FastAPI
构建 A2A Server 的 HTTP 端点
HTTPX
异步 HTTP 客户端,支持 SSE 流式接收
Google Gemini API
Agent 智能决策的 LLM 后端
创建项目目录并安装依赖:
mkdir -p a2a-travel-planner && cd a2a-travel-planner
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# 创建 requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.115.0
uvicorn[standard]==0.34.0
httpx==0.28.1
google-generativeai==0.8.4
pydantic==2.10.0
python-dotenv==1.0.1
EOF
pip install -r requirements.txt
创建项目结构:
mkdir -p host-agent/{src,static/.well-known}
mkdir -p remote-agents/{flight-agent,hotel-agent}/{src,static/.well-known}
touch .env
三、实战步骤
配置环境变量
在 .env 文件中配置 API 密钥:
# .env
GEMINI_API_KEY=your_google_gemini_api_key
SERPAPI_KEY=your_serpapi_key # 用于实时搜索
HOST_PORT=8000
FLIGHT_AGENT_PORT=8001
HOTEL_AGENT_PORT=8002
.env 提交到版本控制,将其添加到 .gitignore
定义 A2A 消息结构
创建 src/a2a_types.py,实现 A2A 协议的核心数据结构:
# src/a2a_types.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
class TaskState(str, Enum):
SUBMITTED = "submitted"
WORKING = "working"
COMPLETED = "completed"
FAILED = "failed"
class AgentCard(BaseModel):
"""Agent 能力描述卡片"""
name: str
description: str
url: str
skills: List[Dict[str, Any]] = Field(default_factory=list)
authentication: Optional[Dict[str, str]] = None
version: str = "1.0.0"
class Message(BaseModel):
"""Agent 间通信消息"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
role: str # "user", "assistant", "system"
content: str
parts: Optional[List[Dict[str, Any]]] = None
class Task(BaseModel):
"""A2A 任务对象"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
status: TaskState = TaskState.SUBMITTED
message: Message
context: Optional[Dict[str, Any]] = None
artifacts: Optional[List[Dict[str, Any]]] = None
实现 Flight Agent(A2A Server)
Flight Agent 负责查询航班信息。首先创建 Agent Card:
// remote-agents/flight-agent/static/.well-known/agent.json
{
"name": "FlightSearchAgent",
"description": "实时航班查询与价格比较服务",
"url": "http://localhost:8001/a2a",
"skills": [
{
"id": "search_flights",
"name": "航班搜索",
"description": "根据出发地、目的地和日期查询航班",
"parameters": {
"type": "object",
"properties": {
"origin": {"type": "string", "description": "出发城市代码"},
"destination": {"type": "string", "description": "目的地城市代码"},
"date": {"type": "string", "format": "date"}
},
"required": ["origin", "destination", "date"]
}
}
],
"authentication": {"type": "bearer"},
"version": "1.0.0"
}
实现 Flight Agent Server(remote-agents/flight-agent/src/main.py):
# remote-agents/flight-agent/src/main.py
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
class TaskRequest(BaseModel):
task_id: str
message: dict
context: dict = None
@app.get("/.well-known/agent.json")
async def get_agent_card():
"""返回 Agent Card 供其他 Agent 发现"""
with open("static/.well-known/agent.json") as f:
return json.load(f)
@app.post("/a2a/task")
async def create_task(request: TaskRequest):
"""接收 A2A 任务请求"""
query = request.message.get("content", "")
params = request.context or {}
# 调用航班搜索 API(示例使用 SerpAPI)
async with httpx.AsyncClient() as client:
response = await client.get(
"https://serpapi.com/search",
params={
"engine": "google_flights",
"origin": params.get("origin"),
"destination": params.get("destination"),
"date": params.get("date"),
"api_key": os.getenv("SERPAPI_KEY")
}
)
flights = response.json()
# 构建返回结果
return {
"task_id": request.task_id,
"status": "completed",
"artifacts": [{
"type": "flight_results",
"data": flights.get("flights", [])[:5] # 返回前 5 个航班
}]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("FLIGHT_AGENT_PORT", 8001)))
实现 Hotel Agent(A2A Server)
Hotel Agent 结构与 Flight Agent 类似,创建 remote-agents/hotel-agent/src/main.py:
# remote-agents/hotel-agent/src/main.py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
class TaskRequest(BaseModel):
task_id: str
message: dict
context: dict = None
@app.get("/.well-known/agent.json")
async def get_agent_card():
with open("static/.well-known/agent.json") as f:
return json.load(f)
@app.post("/a2a/task")
async def create_task(request: TaskRequest):
"""接收酒店预订任务"""
params = request.context or {}
async with httpx.AsyncClient() as client:
response = await client.get(
"https://serpapi.com/search",
params={
"engine": "google_hotels",
"location": params.get("destination"),
"check_in_date": params.get("check_in"),
"check_out_date": params.get("check_out"),
"api_key": os.getenv("SERPAPI_KEY")
}
)
hotels = response.json()
return {
"task_id": request.task_id,
"status": "completed",
"artifacts": [{
"type": "hotel_results",
"data": hotels.get("hotels", [])[:5]
}]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("HOTEL_AGENT_PORT", 8002)))
实现 Host Agent(A2A Client 协调者)
Host Agent 是用户直接交互的入口,负责解析用户请求、发现远程 Agent 能力、分发任务并聚合结果。
创建 Host Agent 的 Agent Card(host-agent/static/.well-known/agent.json):
{
"name": "TravelPlannerHost",
"description": "智能旅行规划助手,协调航班、酒店和景点查询",
"url": "http://localhost:8000/a2a",
"skills": [
{
"id": "plan_trip",
"name": "旅行规划",
"description": "根据目的地和预算生成完整旅行计划"
}
],
"version": "1.0.0"
}
实现 Host Agent 核心逻辑(host-agent/src/main.py):
# host-agent/src/main.py
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import httpx
import json
import os
from dotenv import load_dotenv
import google.generativeai as genai
load_dotenv()
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
# 远程 Agent 配置
REMOTE_AGENTS = {
"flight": "http://localhost:8001",
"hotel": "http://localhost:8002"
}
class UserRequest(BaseModel):
message: str
context: dict = None
async def discover_agent_capabilities(agent_url: str) -> dict:
"""发现远程 Agent 的能力(Agent Card)"""
async with httpx.AsyncClient() as client:
response = await client.get(f"{agent_url}/.well-known/agent.json")
return response.json()
async def delegate_task(agent_url: str, task_id: str, message: str, context: dict) -> dict:
"""向远程 Agent 委派任务"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{agent_url}/a2a/task",
json={
"task_id": task_id,
"message": {"role": "user", "content": message},
"context": context
},
timeout=30.0
)
return response.json()
@app.get("/.well-known/agent.json")
async def get_agent_card():
with open("static/.well-known/agent.json") as f:
return json.load(f)
@app.post("/a2a/plan")
async def plan_trip(request: UserRequest):
"""处理用户旅行规划请求"""
# 使用 LLM 解析用户意图
model = genai.GenerativeModel("gemini-2.0-flash")
intent_response = model.generate_content(f"""
分析用户请求,提取旅行规划参数:
用户消息:{request.message}
返回 JSON 格式:
{{
"destination": "目的地城市",
"origin": "出发城市",
"dates": {{"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}},
"budget": 预算数字,
"needs": ["flight", "hotel", "activities"]
}}
""")
try:
params = json.loads(intent_response.text)
except:
raise HTTPException(status_code=400, detail="无法解析用户意图")
# 并发调用远程 Agent
tasks = []
if "flight" in params.get("needs", []):
tasks.append(delegate_task(
REMOTE_AGENTS["flight"],
f"flight-{params['dates']['start']}",
f"查询 {params['origin']} 到 {params['destination']} 的航班",
{
"origin": params["origin"],
"destination": params["destination"],
"date": params["dates"]["start"]
}
))
if "hotel" in params.get("needs", []):
tasks.append(delegate_task(
REMOTE_AGENTS["hotel"],
f"hotel-{params['dates']['start']}",
f"查询 {params['destination']} 的酒店",
{
"destination": params["destination"],
"check_in": params["dates"]["start"],
"check_out": params["dates"]["end"]
}
))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 聚合结果并生成最终计划
final_plan = model.generate_content(f"""
基于以下信息生成旅行计划:
用户需求:{request.message}
航班结果:{results[0] if len(results) > 0 else "未查询"}
酒店结果:{results[1] if len(results) > 1 else "未查询"}
返回结构化的 Markdown 格式计划。
""")
return {"plan": final_plan.text, "raw_results": results}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("HOST_PORT", 8000)))
启动并测试 Multi-Agent 系统
打开三个终端窗口,分别启动三个 Agent:
# 终端 1: Flight Agent
cd remote-agents/flight-agent
python src/main.py
# 终端 2: Hotel Agent
cd remote-agents/hotel-agent
python src/main.py
# 终端 3: Host Agent
cd host-agent
python src/main.py
测试请求(使用 curl 或 Postman):
curl -X POST http://localhost:8000/a2a/plan \
-H "Content-Type: application/json" \
-d '{
"message": "我想从北京去上海旅行 3 天,预算 5000 元,需要机票和酒店"
}'
预期响应:
{
"plan": "# 上海 3 日旅行计划\n\n## 航班推荐\n- 航班号:CA123...\n\n## 酒店推荐\n- 酒店名:上海中心大酒店...\n\n## 行程安排\n...",
"raw_results": [
{"task_id": "flight-2026-03-15", "status": "completed", ...},
{"task_id": "hotel-2026-03-15", "status": "completed", ...}
]
}
实现 SSE 流式状态更新(进阶)
A2A 协议支持通过 Server-Sent Events(SSE)实现实时任务进度推送。修改 Host Agent 添加流式端点:
# host-agent/src/main.py 添加
from fastapi.responses import StreamingResponse
import asyncio
import json
async def stream_task_progress(task_id: str, agent_url: str, context: dict):
"""SSE 流式推送任务进度"""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{agent_url}/a2a/task/stream",
json={"task_id": task_id, "context": context}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
yield f"data: {line[6:]}\n\n"
@app.post("/a2a/plan/stream")
async def plan_trip_stream(request: UserRequest):
"""流式旅行规划(实时推送进度)"""
async def generate():
# 发送开始消息
yield f"data: {json.dumps({'type': 'start', 'message': '开始规划行程'})}\n\n"
# 意图解析阶段
yield f"data: {json.dumps({'type': 'progress', 'step': '解析需求'})}\n\n"
# ... 继续解析 ...
# 航班查询阶段
yield f"data: {json.dumps({'type': 'progress', 'step': '查询航班'})}\n\n"
# ... 查询航班 ...
# 完成
yield f"data: {json.dumps({'type': 'complete'})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
前端可使用 EventSource 接收流式更新:
const eventSource = new EventSource('/a2a/plan/stream?message=我想去上海旅行');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'progress') {
console.log('当前进度:', data.step);
}
};
四、常见问题与解决方案
Q1: A2A 与 MCP 协议有什么区别?应该选哪个?
A2A 用于 Agent 之间的通信(如 Host Agent 调用 Flight Agent),而MCP 用于 Agent 调用工具和数据源(如 Agent 访问数据库、API)。最佳实践是同时使用两者:A2A 负责多 Agent 协作,MCP 负责单个 Agent 访问外部资源。
Q2: 如何处理 Agent 认证和授权?
A2A 协议支持多种认证方式:1) OAuth 2.0 Bearer Token(推荐用于生产环境);2) API Key(适合内部系统);3) 双向 TLS(高安全场景)。在 Agent Card 的 authentication 字段声明所需方式,Client 在请求头中携带凭证。
Q3: 任务超时或失败如何处理?
实现重试机制和降级策略:1) 设置合理的超时时间(建议 30-60 秒);2) 对失败任务进行指数退避重试;3) 准备备用 Agent 或降级响应(如返回缓存数据)。
Q4: A2A 支持哪些数据格式?
A2A 使用 JSON-RPC 2.0 作为基础协议,支持任意 JSON 序列化数据。对于多模态内容(图片、音频),使用 artifacts 字段传递 Base64 编码或 URL 引用。
五、进阶技巧与最佳实践
- Agent Card 版本管理:在 URL 中添加版本号(
/v1/.well-known/agent.json),支持向后兼容 - 任务预算控制:为每个远程 Agent 设置 Token 使用上限和调用频率限制,防止资源滥用
- 动态注册发现:构建 Agent 注册中心,支持 Agent 动态上线/下线,避免硬编码 URL
- 链路追踪:为每个任务生成唯一
trace_id,贯穿所有 Agent 调用,便于调试和监控 - 结果缓存:对高频查询(如热门航线航班)实现 Redis 缓存,降低 API 成本
总结
通过本教程,你完成了以下内容:
- ✓ 理解 A2A 协议的核心概念:Agent Card、Client/Server 模型、任务生命周期
- ✓ 实现 Flight Agent 和 Hotel Agent 作为 A2A Server
- ✓ 实现 Host Agent 作为 A2A Client 协调者,使用 LLM 解析意图并分发任务
- ✓ 掌握 SSE 流式状态更新的实现方法
- ✓ 了解生产环境最佳实践:认证、缓存、链路追踪
A2A 协议正在成为 Multi-Agent 系统的行业标准。掌握它,你将能够构建可扩展、跨组织的智能协作系统。