danielhuber.dev@proton.me Sunday, February 22, 2026

Agent2Agent Protocol (A2A)

Google's open protocol enabling AI agents to discover, communicate, and collaborate across organizational boundaries using standardized task exchange.


February 18, 2026

The Agent2Agent (A2A) protocol defines how autonomous AI agents discover each other’s capabilities and exchange tasks. Unlike MCP, which connects agents to tools, A2A enables agent-to-agent collaboration — allowing specialized agents developed by different teams or organizations to work together without bespoke integration code. Google introduced A2A as an open standard precisely because the multi-agent future requires interoperability at the protocol level, not just at the application level.

What is A2A?

A2A rests on four pillars. Agent Cards are machine-readable JSON documents that describe what an agent can do, which skills it offers, and how to authenticate with it. The Task Protocol is a standardized HTTP interface for submitting work, polling for results, and handling multi-turn interactions where the remote agent needs additional input. Streaming support via Server-Sent Events lets clients receive real-time progress on long-running tasks without holding an open connection. Finally, authentication support for OAuth2 and API keys makes secure cross-organization communication practical.

A2A vs MCP

MCP connects agents to tools and data sources (agent to tool). A2A connects agents to other agents (agent to agent). They are complementary: an agent might use MCP to access tools locally and A2A to delegate tasks to specialized remote agents.

Architecture

A2A Protocol Flow
┌─────────────────────────────────────────────────────────────────┐
│                      ORCHESTRATOR AGENT                         │
│                                                                  │
│  1. Discover agents via Agent Cards                              │
│  2. Select appropriate agent for task                            │
│  3. Send task via A2A protocol                                   │
│  4. Handle responses (polling or streaming)                      │
└─────────────────────────────────────────────────────────────────┘
      │                    │                    │
      │ HTTPS              │ HTTPS              │ HTTPS
      │                    │                    │
      ▼                    ▼                    ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│  AGENT A      │   │  AGENT B      │   │  AGENT C      │
│  (Research)   │   │  (Analysis)   │   │  (Writing)    │
│               │   │               │   │               │
│ Skills:       │   │ Skills:       │   │ Skills:       │
│ • web-search  │   │ • data-viz    │   │ • summarize   │
│ • doc-fetch   │   │ • statistics  │   │ • translate   │
│               │   │ • ml-predict  │   │ • format      │
└───────────────┘   └───────────────┘   └───────────────┘
      │                    │                    │
 Uses MCP             Uses MCP             Uses MCP
      │                    │                    │
      ▼                    ▼                    ▼
  [Tools]              [Tools]              [Tools]

Agent Cards

Agent Cards are JSON documents hosted at a well-known URL (/.well-known/agent.json) for automatic discovery. Each card declares the agent’s name, description, base URL, and the skills it supports — with full JSON Schema definitions for inputs and outputs.

FieldPurposeRequired
nameUnique identifier for the agentYes
descriptionHuman-readable description of capabilitiesYes
urlBase URL for the agent’s A2A endpointsYes
skillsList of specific capabilities with schemasYes
capabilitiesProtocol features supported (streaming, etc.)No
authenticationHow to authenticate requestsNo
Skill Schemas

Define detailed inputSchema and outputSchema for each skill using JSON Schema. This enables client agents to validate inputs and understand outputs before sending a task.

Task Protocol

Tasks are the primary unit of work in A2A. A task moves through a well-defined state machine after being submitted, giving clients a predictable model for polling and error handling.

Task State Machine
                    ┌─────────────┐
                  │  SUBMITTED  │
                  └──────┬──────┘
                         │
                         ▼
                  ┌─────────────┐
           ┌──────│   WORKING   │──────┐
           │      └──────┬──────┘      │
           │             │             │
           ▼             ▼             ▼
    ┌─────────────┐  ┌─────────┐  ┌──────────┐
    │INPUT-REQUIRED│ │COMPLETED│  │  FAILED  │
    └──────┬──────┘  └─────────┘  └──────────┘
           │
           │ (user provides input)
           │
           └──────────► WORKING

The following Python client shows how to submit a task, poll for completion, and handle the input-required state where the remote agent needs clarification before it can continue.

import httpx
import asyncio
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

@dataclass
class A2ATask:
    id: str
    status: TaskStatus
    message: dict | None = None
    artifacts: list | None = None
    history: list | None = None

class A2AClient:
    def __init__(self, agent_url: str, auth_token: str):
        self.agent_url = agent_url.rstrip("/")
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {auth_token}"}
        )

    async def send_task(
        self,
        message: str,
        session_id: str | None = None,
        metadata: dict | None = None
    ) -> A2ATask:
        """Send a new task to the agent."""
        task_data = {
            "id": str(uuid4()),
            "sessionId": session_id or str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
        if metadata:
            task_data["metadata"] = metadata

        response = await self.client.post(
            f"{self.agent_url}/tasks/send",
            json=task_data
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def get_task_status(self, task_id: str) -> A2ATask:
        """Get current status of a task."""
        response = await self.client.get(
            f"{self.agent_url}/tasks/{task_id}"
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def send_input(
        self,
        task_id: str,
        message: str
    ) -> A2ATask:
        """Send additional input for a task that requires it."""
        response = await self.client.post(
            f"{self.agent_url}/tasks/{task_id}/send",
            json={
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message}]
                }
            }
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def wait_for_completion(
        self,
        task_id: str,
        poll_interval: float = 1.0,
        timeout: float = 300.0
    ) -> A2ATask:
        """Poll until task completes or times out."""
        elapsed = 0.0

        while elapsed < timeout:
            task = await self.get_task_status(task_id)

            if task.status in [
                TaskStatus.COMPLETED,
                TaskStatus.FAILED,
                TaskStatus.CANCELED
            ]:
                return task

            if task.status == TaskStatus.INPUT_REQUIRED:
                raise InterruptedError(
                    f"Task requires input: {task.message}"
                )

            await asyncio.sleep(poll_interval)
            elapsed += poll_interval

        raise TimeoutError(f"Task {task_id} did not complete")

    def _parse_task(self, data: dict) -> A2ATask:
        return A2ATask(
            id=data["id"],
            status=TaskStatus(data["status"]),
            message=data.get("message"),
            artifacts=data.get("artifacts"),
            history=data.get("history")
        )

# Usage
async def main():
    client = A2AClient(
        agent_url="https://api.example.com/agents/research",
        auth_token="your-token"
    )

    # Send a task
    task = await client.send_task(
        "Research the latest developments in AI safety"
    )
    print(f"Task submitted: {task.id}")

    # Wait for completion
    result = await client.wait_for_completion(task.id)

    if result.status == TaskStatus.COMPLETED:
        for artifact in result.artifacts or []:
            print(f"Artifact: {artifact}")

asyncio.run(main())

Task Endpoints

EndpointMethodPurpose
/tasks/sendPOSTSubmit a new task
/tasks/{id}GETGet task status and results
/tasks/{id}/sendPOSTSend additional input to a task
/tasks/{id}/cancelPOSTCancel a running task
/tasks/sendSubscribePOSTSubmit task with SSE streaming

Streaming Responses

For long-running tasks, A2A supports Server-Sent Events (SSE) via the /tasks/sendSubscribe endpoint. The client opens a streaming connection and receives typed events as the agent makes progress.

import httpx
from typing import AsyncIterator
from dataclasses import dataclass

@dataclass
class StreamEvent:
    type: str
    data: dict

async def stream_task(
    agent_url: str,
    message: str,
    auth_token: str,
    session_id: str | None = None
) -> AsyncIterator[StreamEvent]:
    """Stream task results using Server-Sent Events."""

    task_data = {
        "id": str(uuid4()),
        "sessionId": session_id or str(uuid4()),
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": message}]
        }
    }

    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            f"{agent_url}/tasks/sendSubscribe",
            json=task_data,
            headers={
                "Authorization": f"Bearer {auth_token}",
                "Accept": "text/event-stream"
            }
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    yield StreamEvent(
                        type=data.get("type", "unknown"),
                        data=data
                    )

                    if data.get("type") in ["done", "error"]:
                        return

# Usage with streaming
async def research_with_streaming():
    full_response = ""

    async for event in stream_task(
        agent_url="https://api.example.com/agents/research",
        message="Research quantum computing breakthroughs in 2025",
        auth_token="your-token"
    ):
        if event.type == "status":
            print(f"Status: {event.data['status']}")

        elif event.type == "artifact":
            if event.data.get("streaming"):
                chunk = event.data.get("content", "")
                print(chunk, end="", flush=True)
                full_response += chunk
            else:
                print(f"\nArtifact: {event.data}")

        elif event.type == "done":
            print(f"\nCompleted with status: {event.data['status']}")

        elif event.type == "error":
            raise Exception(event.data.get("message"))

asyncio.run(research_with_streaming())
Event TypePurposeData
statusTask status changed{ status: "working" }
artifactResult content (can stream){ streaming: true, content: "..." }
messageAgent message or question{ role: "agent", content: "..." }
doneTask completed{ status: "completed" }
errorError occurred{ message: "..." }

Use Cases

A2A unlocks several categories of multi-agent collaboration that are difficult to achieve with point-to-point integrations. An enterprise agent marketplace lets organizations publish specialized agents — covering HR, finance, legal, or compliance — that other teams discover and invoke through the standard protocol without any custom API work. Multi-agent workflows become straightforward: an orchestrator breaks a complex task into phases and delegates each to the best-suited remote agent, with results flowing back through the same protocol. Cross-organization collaboration is equally tractable; one company’s agent can request specialized analytics from a partner’s agent with proper authentication and audit trails. Third-party developers can build and monetize niche agents — translation, legal review, code analysis — that any A2A-capable client can discover and use.

A2A vs MCP

AspectA2AMCP
PurposeAgent-to-agent communicationAgent-to-tool communication
TransportHTTPS (remote)stdio (local process)
DiscoveryAgent Cards at well-known URLConfiguration file
AuthOAuth2, API keysProcess-level trust
StreamingSSE (Server-Sent Events)JSON-RPC notifications
Use CaseCross-org agent collaborationLocal tool integration
Using Both Protocols

A well-designed agent system might use both: MCP for local tools such as file access and database queries, and A2A to delegate specialized tasks to remote agents.

Security Considerations

Audit Logging

Log all task requests with client identity, sanitized inputs, and outcomes for security and debugging purposes.

Tags: a2aagent-communicationgoogle