Agent2Agent Protocol (A2A)
Google's open protocol enabling AI agents to discover, communicate, and collaborate across organizational boundaries using standardized task exchange.
The Agent2Agent (A2A) protocol defines how autonomous AI agents discover each other’s capabilities and exchange tasks. Unlike MCP, which connects agents to tools, A2A enables agent-to-agent collaboration — allowing specialized agents developed by different teams or organizations to work together without bespoke integration code. Google introduced A2A as an open standard precisely because the multi-agent future requires interoperability at the protocol level, not just at the application level.
What is A2A?
A2A rests on four pillars. Agent Cards are machine-readable JSON documents that describe what an agent can do, which skills it offers, and how to authenticate with it. The Task Protocol is a standardized HTTP interface for submitting work, polling for results, and handling multi-turn interactions where the remote agent needs additional input. Streaming support via Server-Sent Events lets clients receive real-time progress on long-running tasks over a single open connection instead of polling repeatedly. Finally, authentication support for OAuth2 and API keys makes secure cross-organization communication practical.
MCP connects agents to tools and data sources (agent to tool). A2A connects agents to other agents (agent to agent). They are complementary: an agent might use MCP to access tools locally and A2A to delegate tasks to specialized remote agents.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│                       ORCHESTRATOR AGENT                        │
│                                                                 │
│  1. Discover agents via Agent Cards                             │
│  2. Select appropriate agent for task                           │
│  3. Send task via A2A protocol                                  │
│  4. Handle responses (polling or streaming)                     │
└─────────────────────────────────────────────────────────────────┘
        │                        │                        │
        │ HTTPS                  │ HTTPS                  │ HTTPS
        │                        │                        │
        ▼                        ▼                        ▼
┌───────────────┐        ┌───────────────┐        ┌───────────────┐
│   AGENT A     │        │   AGENT B     │        │   AGENT C     │
│  (Research)   │        │  (Analysis)   │        │  (Writing)    │
│               │        │               │        │               │
│ Skills:       │        │ Skills:       │        │ Skills:       │
│ • web-search  │        │ • data-viz    │        │ • summarize   │
│ • doc-fetch   │        │ • statistics  │        │ • translate   │
│               │        │ • ml-predict  │        │ • format      │
└───────────────┘        └───────────────┘        └───────────────┘
        │                        │                        │
    Uses MCP                 Uses MCP                 Uses MCP
        │                        │                        │
        ▼                        ▼                        ▼
     [Tools]                  [Tools]                  [Tools]
Agent Cards
Agent Cards are JSON documents hosted at a well-known URL (/.well-known/agent.json) for automatic discovery. Each card declares the agent’s name, description, base URL, and the skills it supports — with full JSON Schema definitions for inputs and outputs.
| Field | Purpose | Required |
|---|---|---|
| name | Unique identifier for the agent | Yes |
| description | Human-readable description of capabilities | Yes |
| url | Base URL for the agent’s A2A endpoints | Yes |
| skills | List of specific capabilities with schemas | Yes |
| capabilities | Protocol features supported (streaming, etc.) | No |
| authentication | How to authenticate requests | No |
Define detailed inputSchema and outputSchema for each skill using JSON Schema. This enables client agents to validate inputs and understand outputs before sending a task.
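As a rough illustration of the fields in the table, the sketch below builds an Agent Card as a Python dictionary and checks that the required fields are present. The agent name, URL, skill `id` key, and the shape of the `capabilities` and `authentication` values are assumptions made for the example, not values taken from the specification.

```python
import json

# Required top-level fields, as listed in the table above.
REQUIRED_FIELDS = ("name", "description", "url", "skills")

# Hypothetical card: every value here is illustrative only.
EXAMPLE_CARD = {
    "name": "research-agent",
    "description": "Finds and fetches documents on a topic.",
    "url": "https://api.example.com/agents/research",
    "capabilities": {"streaming": True},
    "authentication": {"schemes": ["bearer"]},
    "skills": [
        {
            "id": "web-search",
            "description": "Search the web and return ranked results.",
            "inputSchema": {
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
            "outputSchema": {
                "type": "object",
                "properties": {"results": {"type": "array"}},
            },
        }
    ],
}


def missing_fields(card: dict) -> list[str]:
    """Return the required top-level fields that are absent or empty."""
    return [field for field in REQUIRED_FIELDS if not card.get(field)]


def skill_ids(card: dict) -> list[str]:
    """List the identifiers of the skills a card advertises."""
    return [skill["id"] for skill in card.get("skills", [])]


# The serialized form is what would be served at the well-known URL.
print(json.dumps(EXAMPLE_CARD, indent=2))
```

A client could run a check like `missing_fields` right after fetching a card and refuse to use agents whose cards are incomplete.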
Task Protocol
Tasks are the primary unit of work in A2A. A task moves through a well-defined state machine after being submitted, giving clients a predictable model for polling and error handling.
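One way to make this lifecycle concrete is a small transition table: a submitted task starts working; a working task either completes, fails, or pauses for input; and a task waiting for input returns to working once the input arrives. The sketch below encodes those rules. The names `TaskState`, `ALLOWED`, `can_transition`, and `is_terminal` are hypothetical helpers for illustration, and cancellation is left out for brevity.

```python
from enum import Enum


class TaskState(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"


# Each state maps to the set of states it may move to next.
ALLOWED = {
    TaskState.SUBMITTED: {TaskState.WORKING},
    TaskState.WORKING: {
        TaskState.INPUT_REQUIRED,
        TaskState.COMPLETED,
        TaskState.FAILED,
    },
    TaskState.INPUT_REQUIRED: {TaskState.WORKING},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
}


def can_transition(current: TaskState, nxt: TaskState) -> bool:
    """True if moving from `current` to `nxt` is a legal step."""
    return nxt in ALLOWED[current]


def is_terminal(state: TaskState) -> bool:
    """Terminal states have no outgoing transitions, so polling can stop."""
    return not ALLOWED[state]
```

A polling client only needs `is_terminal` to decide when to stop, while a server could use `can_transition` to reject illegal status updates.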
                 ┌─────────────┐
                 │  SUBMITTED  │
                 └──────┬──────┘
                        │
                        ▼
                 ┌─────────────┐
       ┌─────────│   WORKING   │─────────┐
       │         └──────┬──────┘         │
       │                │                │
       ▼                ▼                ▼
┌──────────────┐   ┌─────────┐     ┌──────────┐
│INPUT-REQUIRED│   │COMPLETED│     │  FAILED  │
└──────┬───────┘   └─────────┘     └──────────┘
       │
       │ (user provides input)
       │
       └──────────► WORKING
The following Python client shows how to submit a task, poll for completion, and handle the input-required state where the remote agent needs clarification before it can continue.
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from uuid import uuid4

import httpx


class TaskStatus(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


@dataclass
class A2ATask:
    id: str
    status: TaskStatus
    message: dict | None = None
    artifacts: list | None = None
    history: list | None = None


class A2AClient:
    def __init__(self, agent_url: str, auth_token: str):
        self.agent_url = agent_url.rstrip("/")
        # One shared client reuses connections across requests.
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {auth_token}"}
        )

    async def aclose(self) -> None:
        """Release the underlying HTTP connection pool."""
        await self.client.aclose()

    async def send_task(
        self,
        message: str,
        session_id: str | None = None,
        metadata: dict | None = None,
    ) -> A2ATask:
        """Send a new task to the agent."""
        task_data = {
            "id": str(uuid4()),
            "sessionId": session_id or str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}],
            },
        }
        if metadata:
            task_data["metadata"] = metadata
        response = await self.client.post(
            f"{self.agent_url}/tasks/send",
            json=task_data,
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def get_task_status(self, task_id: str) -> A2ATask:
        """Get current status of a task."""
        response = await self.client.get(
            f"{self.agent_url}/tasks/{task_id}"
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def send_input(
        self,
        task_id: str,
        message: str,
    ) -> A2ATask:
        """Send additional input for a task that requires it."""
        response = await self.client.post(
            f"{self.agent_url}/tasks/{task_id}/send",
            json={
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message}],
                }
            },
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def wait_for_completion(
        self,
        task_id: str,
        poll_interval: float = 1.0,
        timeout: float = 300.0,
    ) -> A2ATask:
        """Poll until task completes or times out."""
        # A monotonic deadline also counts the time spent in each request,
        # not only the sleeps between polls.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            task = await self.get_task_status(task_id)
            if task.status in (
                TaskStatus.COMPLETED,
                TaskStatus.FAILED,
                TaskStatus.CANCELED,
            ):
                return task
            if task.status == TaskStatus.INPUT_REQUIRED:
                # The caller should answer with send_input() and resume waiting.
                raise InterruptedError(
                    f"Task requires input: {task.message}"
                )
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"Task {task_id} did not complete")

    def _parse_task(self, data: dict) -> A2ATask:
        return A2ATask(
            id=data["id"],
            status=TaskStatus(data["status"]),
            message=data.get("message"),
            artifacts=data.get("artifacts"),
            history=data.get("history"),
        )


# Usage
async def main():
    client = A2AClient(
        agent_url="https://api.example.com/agents/research",
        auth_token="your-token",
    )
    try:
        # Send a task
        task = await client.send_task(
            "Research the latest developments in AI safety"
        )
        print(f"Task submitted: {task.id}")
        # Wait for completion
        result = await client.wait_for_completion(task.id)
        if result.status == TaskStatus.COMPLETED:
            for artifact in result.artifacts or []:
                print(f"Artifact: {artifact}")
    finally:
        await client.aclose()


asyncio.run(main())
Task Endpoints
| Endpoint | Method | Purpose |
|---|---|---|
| /tasks/send | POST | Submit a new task |
| /tasks/{id} | GET | Get task status and results |
| /tasks/{id}/send | POST | Send additional input to a task |
| /tasks/{id}/cancel | POST | Cancel a running task |
| /tasks/sendSubscribe | POST | Submit task with SSE streaming |
Streaming Responses
For long-running tasks, A2A supports Server-Sent Events (SSE) via the /tasks/sendSubscribe endpoint. The client opens a streaming connection and receives typed events as the agent makes progress.
import asyncio
import json
from dataclasses import dataclass
from typing import AsyncIterator
from uuid import uuid4

import httpx


@dataclass
class StreamEvent:
    type: str
    data: dict


async def stream_task(
    agent_url: str,
    message: str,
    auth_token: str,
    session_id: str | None = None,
) -> AsyncIterator[StreamEvent]:
    """Stream task results using Server-Sent Events."""
    task_data = {
        "id": str(uuid4()),
        "sessionId": session_id or str(uuid4()),
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": message}],
        },
    }
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            f"{agent_url}/tasks/sendSubscribe",
            json=task_data,
            headers={
                "Authorization": f"Bearer {auth_token}",
                "Accept": "text/event-stream",
            },
        ) as response:
            # Fail fast on 4xx/5xx instead of parsing an error body as events.
            response.raise_for_status()
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    yield StreamEvent(
                        type=data.get("type", "unknown"),
                        data=data,
                    )
                    # "done" and "error" end the stream.
                    if data.get("type") in ("done", "error"):
                        return


# Usage with streaming
async def research_with_streaming():
    full_response = ""
    async for event in stream_task(
        agent_url="https://api.example.com/agents/research",
        message="Research quantum computing breakthroughs in 2025",
        auth_token="your-token",
    ):
        if event.type == "status":
            print(f"Status: {event.data['status']}")
        elif event.type == "artifact":
            if event.data.get("streaming"):
                # Streamed artifacts arrive as incremental text chunks.
                chunk = event.data.get("content", "")
                print(chunk, end="", flush=True)
                full_response += chunk
            else:
                print(f"\nArtifact: {event.data}")
        elif event.type == "done":
            print(f"\nCompleted with status: {event.data['status']}")
        elif event.type == "error":
            raise RuntimeError(event.data.get("message"))


asyncio.run(research_with_streaming())
| Event Type | Purpose | Data |
|---|---|---|
| status | Task status changed | { status: "working" } |
| artifact | Result content (can stream) | { streaming: true, content: "..." } |
| message | Agent message or question | { role: "agent", content: "..." } |
| done | Task completed | { status: "completed" } |
| error | Error occurred | { message: "..." } |
Use Cases
A2A unlocks several categories of multi-agent collaboration that are difficult to achieve with point-to-point integrations. An enterprise agent marketplace lets organizations publish specialized agents — covering HR, finance, legal, or compliance — that other teams discover and invoke through the standard protocol without any custom API work. Multi-agent workflows become straightforward: an orchestrator breaks a complex task into phases and delegates each to the best-suited remote agent, with results flowing back through the same protocol. Cross-organization collaboration is equally tractable; one company’s agent can request specialized analytics from a partner’s agent with proper authentication and audit trails. Third-party developers can build and monetize niche agents — translation, legal review, code analysis — that any A2A-capable client can discover and use.
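The multi-agent workflow case depends on the orchestrator matching each phase of a job to an agent that advertises the needed skill. A minimal sketch of that matching step follows, assuming Agent Cards have already been fetched and that each skill carries an `id` key. The card contents, URLs, and the helper names `find_agents` and `plan_workflow` are hypothetical.

```python
# Hypothetical, already-fetched Agent Cards reduced to the fields used here.
CARDS = [
    {
        "name": "research",
        "url": "https://research.example.com",
        "skills": [{"id": "web-search"}, {"id": "doc-fetch"}],
    },
    {
        "name": "analysis",
        "url": "https://analysis.example.com",
        "skills": [{"id": "statistics"}, {"id": "data-viz"}],
    },
    {
        "name": "writing",
        "url": "https://writing.example.com",
        "skills": [{"id": "summarize"}, {"id": "translate"}],
    },
]


def find_agents(cards: list[dict], skill_id: str) -> list[dict]:
    """Return every card that advertises the given skill."""
    return [
        card
        for card in cards
        if any(skill.get("id") == skill_id for skill in card.get("skills", []))
    ]


def plan_workflow(cards: list[dict], phases: list[str]) -> list[tuple[str, str]]:
    """Map each phase (a skill id) to the URL of the first matching agent."""
    plan = []
    for skill_id in phases:
        matches = find_agents(cards, skill_id)
        if not matches:
            raise LookupError(f"no agent advertises skill {skill_id!r}")
        plan.append((skill_id, matches[0]["url"]))
    return plan
```

Picking the first match is the simplest possible policy; a real orchestrator would likely also weigh cost, latency, or trust when several agents offer the same skill.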
A2A vs MCP
| Aspect | A2A | MCP |
|---|---|---|
| Purpose | Agent-to-agent communication | Agent-to-tool communication |
| Transport | HTTPS (remote) | stdio (local process) or HTTP |
| Discovery | Agent Cards at well-known URL | Configuration file |
| Auth | OAuth2, API keys | Process-level trust (stdio); OAuth 2.1 (HTTP) |
| Streaming | SSE (Server-Sent Events) | JSON-RPC notifications |
| Use Case | Cross-org agent collaboration | Local tool integration |
A well-designed agent system might use both: MCP for local tools such as file access and database queries, and A2A to delegate specialized tasks to remote agents.
Security Considerations
Always require authentication for A2A endpoints. Use OAuth2 for cross-organization scenarios or API keys for internal use.
Validate all task inputs against the skill’s inputSchema. Never trust data from remote agents without validation.
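As an illustration of validating inputs before acting on them, the sketch below checks a payload against a very small subset of JSON Schema: required keys and primitive `type` values on top-level properties. It is deliberately incomplete and uses only the standard library; a production service would more plausibly rely on a full JSON Schema validator. `validate_input` and `TYPE_MAP` are hypothetical names.

```python
# JSON Schema primitive type names mapped to Python types (subset only).
TYPE_MAP = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "array": list,
    "object": dict,
}


def validate_input(payload: object, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload passed."""
    if not isinstance(payload, dict):
        return ["payload must be an object"]
    errors = []
    for key in schema.get("required", []):
        if key not in payload:
            errors.append(f"missing required field: {key}")
    for key, rule in schema.get("properties", {}).items():
        if key not in payload:
            continue
        type_name = rule.get("type")
        expected = TYPE_MAP.get(type_name)
        if expected is None:
            continue  # unknown or absent type: nothing to check in this sketch
        value = payload[key]
        # bool is a subclass of int in Python, so reject it for numeric types.
        is_bool_as_number = isinstance(value, bool) and type_name in (
            "number",
            "integer",
        )
        if not isinstance(value, expected) or is_bool_as_number:
            errors.append(f"field {key} must be of type {type_name}")
    return errors


SCHEMA = {
    "type": "object",
    "properties": {"query": {"type": "string"}, "limit": {"type": "integer"}},
    "required": ["query"],
}
```

Returning a list of errors rather than raising on the first one lets the agent report every problem to the caller in a single failed task.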
Implement rate limiting to prevent abuse. Track usage per client agent for billing and quota management.
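One common way to implement per-client limits is a token bucket, sketched below with a usage counter that could feed billing or quotas. The token bucket is a choice made for this example, not something the protocol prescribes, and the class names, default capacity, and refill rate are assumptions. The clock is injectable so the behaviour can be tested deterministically.

```python
import time
from collections import defaultdict
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity`, refilled at `refill_per_sec`."""

    def __init__(
        self,
        capacity: int,
        refill_per_sec: float,
        now: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self._now = now
        self.tokens = float(capacity)
        self.updated = now()

    def allow(self) -> bool:
        current = self._now()
        # Add the tokens earned since the last call, capped at capacity.
        earned = (current - self.updated) * self.refill_per_sec
        self.tokens = min(self.capacity, self.tokens + earned)
        self.updated = current
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class ClientLimiter:
    """Keeps one bucket and one accepted-request counter per client agent."""

    def __init__(
        self,
        capacity: int = 5,
        refill_per_sec: float = 1.0,
        now: Callable[[], float] = time.monotonic,
    ):
        self._capacity = capacity
        self._refill = refill_per_sec
        self._now = now
        self._buckets: dict[str, TokenBucket] = {}
        self.usage: defaultdict[str, int] = defaultdict(int)

    def check(self, client_id: str) -> bool:
        if client_id not in self._buckets:
            self._buckets[client_id] = TokenBucket(
                self._capacity, self._refill, self._now
            )
        allowed = self._buckets[client_id].allow()
        if allowed:
            self.usage[client_id] += 1  # only accepted requests are counted
        return allowed
```

A server would call `check` with the authenticated client identity before accepting a task and return an HTTP 429 response when it comes back false.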
Log all task requests with client identity, sanitized inputs, and outcomes for security and debugging purposes.
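A minimal sketch of such an audit record follows. It redacts values stored under keys that look sensitive before the inputs are written anywhere. The key list, the record fields, and the names `sanitize`, `audit_record`, and `log_task` are assumptions chosen for the example; real deployments would tune the redaction rules to their own data.

```python
import json
import logging

# Hypothetical list of keys whose values should never reach the logs.
SENSITIVE_KEYS = {"authorization", "token", "password", "api_key", "secret"}

logger = logging.getLogger("a2a.audit")


def sanitize(value: object) -> object:
    """Recursively replace values stored under sensitive keys."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]"
            if str(key).lower() in SENSITIVE_KEYS
            else sanitize(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [sanitize(item) for item in value]
    return value


def audit_record(client_id: str, task_id: str, inputs: dict, outcome: str) -> str:
    """Build one JSON log line describing a task request and its result."""
    return json.dumps(
        {
            "client": client_id,
            "task": task_id,
            "inputs": sanitize(inputs),
            "outcome": outcome,
        },
        sort_keys=True,
    )


def log_task(client_id: str, task_id: str, inputs: dict, outcome: str) -> None:
    """Emit the audit record through the standard logging module."""
    logger.info(audit_record(client_id, task_id, inputs, outcome))
```

Emitting one JSON object per line keeps the records easy to search and to ship to whatever log store the organization already uses.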