Agent2Agent Protocol (A2A)

An open protocol by Google for enabling AI agents to discover, communicate, and collaborate with each other across organizational boundaries.

What is A2A?

The Agent2Agent (A2A) protocol defines how autonomous AI agents discover each other's capabilities and exchange tasks. Unlike MCP (which connects agents to tools), A2A enables agent-to-agent collaboration.

Agent Cards

Machine-readable descriptions of what an agent can do, its skills, and how to authenticate

Task Protocol

Standardized way to send tasks, receive results, and handle multi-turn interactions

Streaming

Server-Sent Events for real-time task updates and progressive results

Authentication

OAuth2, API keys, or custom auth for secure cross-organization communication

A2A vs MCP

MCP connects agents to tools and data sources (agent → tool). A2A connects agents to other agents (agent → agent). They're complementary: an agent might use MCP to access tools locally and A2A to delegate tasks to specialized remote agents.

Architecture

A2A Protocol Flow
┌─────────────────────────────────────────────────────────────────┐
│                      ORCHESTRATOR AGENT                         │
│                                                                  │
│  1. Discover agents via Agent Cards                              │
│  2. Select appropriate agent for task                            │
│  3. Send task via A2A protocol                                   │
│  4. Handle responses (polling or streaming)                      │
└─────────────────────────────────────────────────────────────────┘
        │                    │                    │
        │ HTTPS              │ HTTPS              │ HTTPS
        │                    │                    │
        ▼                    ▼                    ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│  AGENT A      │   │  AGENT B      │   │  AGENT C      │
│  (Research)   │   │  (Analysis)   │   │  (Writing)    │
│               │   │               │   │               │
│ Skills:       │   │ Skills:       │   │ Skills:       │
│ • web-search  │   │ • data-viz    │   │ • summarize   │
│ • doc-fetch   │   │ • statistics  │   │ • translate   │
│               │   │ • ml-predict  │   │ • format      │
└───────────────┘   └───────────────┘   └───────────────┘
        │                    │                    │
   Uses MCP             Uses MCP             Uses MCP
        │                    │                    │
        ▼                    ▼                    ▼
    [Tools]              [Tools]              [Tools]

Agent Cards

Agent Cards are JSON documents that describe an agent's capabilities. They're hosted at a well-known URL (/.well-known/agent.json) for discovery:

Agent Card Structure
{
  "name": "research-assistant",
  "description": "An AI agent that helps with research tasks by searching the web, analyzing documents, and synthesizing information.",
  "url": "https://api.example.com/agents/research",
  "version": "1.2.0",

  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },

  "authentication": {
    "type": "oauth2",
    "authorizationUrl": "https://auth.example.com/oauth/authorize",
    "tokenUrl": "https://auth.example.com/oauth/token",
    "scopes": ["research:read", "research:write"]
  },

  "skills": [
    {
      "id": "web-search",
      "name": "Web Search",
      "description": "Search the internet for current information",
      "inputSchema": {
        "type": "object",
        "properties": {
          "query": { "type": "string", "description": "Search query" },
          "maxResults": { "type": "integer", "default": 10 }
        },
        "required": ["query"]
      },
      "outputSchema": {
        "type": "array",
        "items": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "url": { "type": "string" },
            "snippet": { "type": "string" }
          }
        }
      }
    },
    {
      "id": "document-analysis",
      "name": "Document Analysis",
      "description": "Analyze and extract insights from documents",
      "inputSchema": {
        "type": "object",
        "properties": {
          "documentUrl": { "type": "string" },
          "analysisType": {
            "type": "string",
            "enum": ["summary", "key-points", "sentiment", "entities"]
          }
        },
        "required": ["documentUrl", "analysisType"]
      }
    }
  ],

  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text", "structured"],

  "provider": {
    "name": "Example AI Corp",
    "url": "https://example.com",
    "contactEmail": "support@example.com"
  }
}
name: research-assistant
description: An AI agent that helps with research tasks
url: https://api.example.com/agents/research
version: 1.2.0

capabilities:
  streaming: true
  pushNotifications: true
  stateTransitionHistory: true

authentication:
  type: oauth2
  authorizationUrl: https://auth.example.com/oauth/authorize
  tokenUrl: https://auth.example.com/oauth/token
  scopes:
    - research:read
    - research:write

skills:
  - id: web-search
    name: Web Search
    description: Search the internet for current information
    inputSchema:
      type: object
      properties:
        query:
          type: string
          description: Search query
        maxResults:
          type: integer
          default: 10
      required: [query]

  - id: document-analysis
    name: Document Analysis
    description: Analyze and extract insights from documents
    inputSchema:
      type: object
      properties:
        documentUrl:
          type: string
        analysisType:
          type: string
          enum: [summary, key-points, sentiment, entities]
      required: [documentUrl, analysisType]

provider:
  name: Example AI Corp
  url: https://example.com

Key Components

Field Purpose Required
name Unique identifier for the agent Yes
description Human-readable description of capabilities Yes
url Base URL for the agent's A2A endpoints Yes
skills List of specific capabilities with schemas Yes
capabilities Protocol features supported (streaming, etc.) No
authentication How to authenticate requests No

Skill Schemas

Define detailed inputSchema and outputSchema for each skill using JSON Schema. This enables client agents to validate inputs and understand outputs.

Task Protocol

Tasks are the primary unit of work in A2A. A task goes through several states:

Task State Machine
                    ┌─────────────┐
                    │  SUBMITTED  │
                    └──────┬──────┘
                           │
                           ▼
                    ┌─────────────┐
             ┌──────│   WORKING   │──────┐
             │      └──────┬──────┘      │
             │             │             │
             ▼             ▼             ▼
      ┌─────────────┐  ┌─────────┐  ┌──────────┐
      │INPUT-REQUIRED│ │COMPLETED│  │  FAILED  │
      └──────┬──────┘  └─────────┘  └──────────┘
             │
             │ (user provides input)
             │
             └──────────► WORKING
Task Lifecycle
# A2A Task Lifecycle

# 1. CLIENT SENDS TASK
task = {
    id: generateUUID(),
    sessionId: "session-123",  # For multi-turn conversations
    message: {
        role: "user",
        parts: [
            { type: "text", text: "Research the latest AI safety papers" }
        ]
    },
    metadata: {
        priority: "high",
        deadline: "2025-01-27T00:00:00Z"
    }
}

response = POST(agentUrl + "/tasks/send", task)

# 2. SERVER ACKNOWLEDGES AND STARTS PROCESSING
# Response includes task status
assert response.status == "submitted" or response.status == "working"

# 3. CLIENT POLLS FOR UPDATES (or uses streaming)
while task.status not in ["completed", "failed", "canceled"]:
    statusResponse = GET(agentUrl + "/tasks/" + task.id)
    task = statusResponse.task

    if task.status == "input-required":
        # Agent needs more information
        userInput = getUserInput(task.message)
        POST(agentUrl + "/tasks/" + task.id + "/send", userInput)

# 4. GET FINAL RESULT
if task.status == "completed":
    artifacts = task.artifacts
    for artifact in artifacts:
        print(artifact.type, artifact.content)
import httpx
import asyncio
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

@dataclass
class A2ATask:
    id: str
    status: TaskStatus
    message: dict | None = None
    artifacts: list | None = None
    history: list | None = None

class A2AClient:
    def __init__(self, agent_url: str, auth_token: str):
        self.agent_url = agent_url.rstrip("/")
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {auth_token}"}
        )

    async def send_task(
        self,
        message: str,
        session_id: str | None = None,
        metadata: dict | None = None
    ) -> A2ATask:
        """Send a new task to the agent."""
        task_data = {
            "id": str(uuid4()),
            "sessionId": session_id or str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
        if metadata:
            task_data["metadata"] = metadata

        response = await self.client.post(
            f"{self.agent_url}/tasks/send",
            json=task_data
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def get_task_status(self, task_id: str) -> A2ATask:
        """Get current status of a task."""
        response = await self.client.get(
            f"{self.agent_url}/tasks/{task_id}"
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def send_input(
        self,
        task_id: str,
        message: str
    ) -> A2ATask:
        """Send additional input for a task that requires it."""
        response = await self.client.post(
            f"{self.agent_url}/tasks/{task_id}/send",
            json={
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message}]
                }
            }
        )
        response.raise_for_status()
        return self._parse_task(response.json())

    async def wait_for_completion(
        self,
        task_id: str,
        poll_interval: float = 1.0,
        timeout: float = 300.0
    ) -> A2ATask:
        """Poll until task completes or times out."""
        elapsed = 0.0

        while elapsed < timeout:
            task = await self.get_task_status(task_id)

            if task.status in [
                TaskStatus.COMPLETED,
                TaskStatus.FAILED,
                TaskStatus.CANCELED
            ]:
                return task

            if task.status == TaskStatus.INPUT_REQUIRED:
                raise InterruptedError(
                    f"Task requires input: {task.message}"
                )

            await asyncio.sleep(poll_interval)
            elapsed += poll_interval

        raise TimeoutError(f"Task {task_id} did not complete")

    def _parse_task(self, data: dict) -> A2ATask:
        return A2ATask(
            id=data["id"],
            status=TaskStatus(data["status"]),
            message=data.get("message"),
            artifacts=data.get("artifacts"),
            history=data.get("history")
        )

# Usage
async def main():
    client = A2AClient(
        agent_url="https://api.example.com/agents/research",
        auth_token="your-token"
    )

    # Send a task
    task = await client.send_task(
        "Research the latest developments in AI safety"
    )
    print(f"Task submitted: {task.id}")

    # Wait for completion
    result = await client.wait_for_completion(task.id)

    if result.status == TaskStatus.COMPLETED:
        for artifact in result.artifacts or []:
            print(f"Artifact: {artifact}")

asyncio.run(main())
using System.Net.Http.Json;
using System.Text.Json;

public enum TaskStatus
{
    Submitted,
    Working,
    InputRequired,
    Completed,
    Failed,
    Canceled
}

public record A2ATask(
    string Id,
    TaskStatus Status,
    JsonElement? Message = null,
    JsonElement[]? Artifacts = null,
    JsonElement[]? History = null
);

public class A2AClient : IDisposable
{
    private readonly HttpClient _client;
    private readonly string _agentUrl;

    public A2AClient(string agentUrl, string authToken)
    {
        _agentUrl = agentUrl.TrimEnd('/');
        _client = new HttpClient();
        _client.DefaultRequestHeaders.Add("Authorization", $"Bearer {authToken}");
    }

    public async Task<A2ATask> SendTaskAsync(
        string message,
        string? sessionId = null,
        Dictionary<string, object>? metadata = null,
        CancellationToken ct = default)
    {
        var taskData = new
        {
            id = Guid.NewGuid().ToString(),
            sessionId = sessionId ?? Guid.NewGuid().ToString(),
            message = new
            {
                role = "user",
                parts = new[] { new { type = "text", text = message } }
            },
            metadata
        };

        var response = await _client.PostAsJsonAsync(
            $"{_agentUrl}/tasks/send",
            taskData,
            ct
        );
        response.EnsureSuccessStatusCode();

        return await response.Content.ReadFromJsonAsync<A2ATask>(ct)
            ?? throw new InvalidOperationException("Invalid response");
    }

    public async Task<A2ATask> GetTaskStatusAsync(
        string taskId,
        CancellationToken ct = default)
    {
        var response = await _client.GetAsync(
            $"{_agentUrl}/tasks/{taskId}",
            ct
        );
        response.EnsureSuccessStatusCode();

        return await response.Content.ReadFromJsonAsync<A2ATask>(ct)
            ?? throw new InvalidOperationException("Invalid response");
    }

    public async Task<A2ATask> WaitForCompletionAsync(
        string taskId,
        TimeSpan? pollInterval = null,
        TimeSpan? timeout = null,
        CancellationToken ct = default)
    {
        var interval = pollInterval ?? TimeSpan.FromSeconds(1);
        var maxWait = timeout ?? TimeSpan.FromMinutes(5);
        var elapsed = TimeSpan.Zero;

        while (elapsed < maxWait)
        {
            var task = await GetTaskStatusAsync(taskId, ct);

            if (task.Status is TaskStatus.Completed
                or TaskStatus.Failed
                or TaskStatus.Canceled)
            {
                return task;
            }

            if (task.Status == TaskStatus.InputRequired)
            {
                throw new InvalidOperationException(
                    $"Task requires input: {task.Message}"
                );
            }

            await Task.Delay(interval, ct);
            elapsed += interval;
        }

        throw new TimeoutException($"Task {taskId} did not complete");
    }

    public void Dispose() => _client.Dispose();
}

// Usage
await using var client = new A2AClient(
    "https://api.example.com/agents/research",
    "your-token"
);

var task = await client.SendTaskAsync(
    "Research the latest developments in AI safety"
);
Console.WriteLine($"Task submitted: {task.Id}");

var result = await client.WaitForCompletionAsync(task.Id);
if (result.Status == TaskStatus.Completed)
{
    foreach (var artifact in result.Artifacts ?? Array.Empty<JsonElement>())
    {
        Console.WriteLine($"Artifact: {artifact}");
    }
}

Task Endpoints

Endpoint Method Purpose
/tasks/send POST Submit a new task
/tasks/{id} GET Get task status and results
/tasks/{id}/send POST Send additional input to a task
/tasks/{id}/cancel POST Cancel a running task
/tasks/sendSubscribe POST Submit task with SSE streaming

Streaming Responses

For long-running tasks, A2A supports Server-Sent Events (SSE) to stream results in real-time:

Streaming Implementation
# A2A Streaming with Server-Sent Events (SSE)

function streamTask(agentUrl, task):
    # Open SSE connection
    eventSource = new EventSource(
        agentUrl + "/tasks/sendSubscribe",
        method: POST,
        body: task,
        headers: { "Content-Type": "application/json" }
    )

    eventSource.onMessage = (event) =>
        data = JSON.parse(event.data)

        switch data.type:
            case "status":
                print("Status:", data.status)

            case "artifact":
                # Partial artifact (streaming content)
                if data.append:
                    currentArtifact += data.content
                else:
                    artifacts.append(data)

            case "message":
                # Agent message (e.g., thinking, asking for input)
                print("Agent:", data.content)

            case "done":
                # Task completed
                eventSource.close()
                return { status: data.status, artifacts: artifacts }

            case "error":
                eventSource.close()
                throw Error(data.message)

    eventSource.onError = (error) =>
        eventSource.close()
        throw error
import httpx
from typing import AsyncIterator
from dataclasses import dataclass

@dataclass
class StreamEvent:
    type: str
    data: dict

async def stream_task(
    agent_url: str,
    message: str,
    auth_token: str,
    session_id: str | None = None
) -> AsyncIterator[StreamEvent]:
    """Stream task results using Server-Sent Events."""

    task_data = {
        "id": str(uuid4()),
        "sessionId": session_id or str(uuid4()),
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": message}]
        }
    }

    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            f"{agent_url}/tasks/sendSubscribe",
            json=task_data,
            headers={
                "Authorization": f"Bearer {auth_token}",
                "Accept": "text/event-stream"
            }
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    yield StreamEvent(
                        type=data.get("type", "unknown"),
                        data=data
                    )

                    if data.get("type") in ["done", "error"]:
                        return

# Usage with streaming
async def research_with_streaming():
    full_response = ""

    async for event in stream_task(
        agent_url="https://api.example.com/agents/research",
        message="Research quantum computing breakthroughs in 2025",
        auth_token="your-token"
    ):
        if event.type == "status":
            print(f"Status: {event.data['status']}")

        elif event.type == "artifact":
            if event.data.get("streaming"):
                # Streaming text content
                chunk = event.data.get("content", "")
                print(chunk, end="", flush=True)
                full_response += chunk
            else:
                # Complete artifact
                print(f"\nArtifact: {event.data}")

        elif event.type == "message":
            print(f"Agent: {event.data.get('content')}")

        elif event.type == "done":
            print(f"\nCompleted with status: {event.data['status']}")

        elif event.type == "error":
            print(f"Error: {event.data.get('message')}")
            raise Exception(event.data.get("message"))

asyncio.run(research_with_streaming())
using System.Net.Http.Json;
using System.Runtime.CompilerServices;

public record StreamEvent(string Type, JsonElement Data);

public class A2AStreamingClient : IDisposable
{
    private readonly HttpClient _client;
    private readonly string _agentUrl;

    public A2AStreamingClient(string agentUrl, string authToken)
    {
        _agentUrl = agentUrl.TrimEnd('/');
        _client = new HttpClient { Timeout = TimeSpan.FromMinutes(10) };
        _client.DefaultRequestHeaders.Add("Authorization", $"Bearer {authToken}");
        _client.DefaultRequestHeaders.Add("Accept", "text/event-stream");
    }

    public async IAsyncEnumerable<StreamEvent> StreamTaskAsync(
        string message,
        string? sessionId = null,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var taskData = new
        {
            id = Guid.NewGuid().ToString(),
            sessionId = sessionId ?? Guid.NewGuid().ToString(),
            message = new
            {
                role = "user",
                parts = new[] { new { type = "text", text = message } }
            }
        };

        using var request = new HttpRequestMessage(
            HttpMethod.Post,
            $"{_agentUrl}/tasks/sendSubscribe"
        );
        request.Content = JsonContent.Create(taskData);

        using var response = await _client.SendAsync(
            request,
            HttpCompletionOption.ResponseHeadersRead,
            ct
        );
        response.EnsureSuccessStatusCode();

        await using var stream = await response.Content.ReadAsStreamAsync(ct);
        using var reader = new StreamReader(stream);

        while (!reader.EndOfStream && !ct.IsCancellationRequested)
        {
            var line = await reader.ReadLineAsync(ct);

            if (string.IsNullOrEmpty(line) || !line.StartsWith("data: "))
                continue;

            var jsonData = line.Substring(6);
            var data = JsonDocument.Parse(jsonData).RootElement;
            var eventType = data.GetProperty("type").GetString() ?? "unknown";

            yield return new StreamEvent(eventType, data);

            if (eventType is "done" or "error")
                yield break;
        }
    }

    public void Dispose() => _client.Dispose();
}

// Usage
await using var client = new A2AStreamingClient(
    "https://api.example.com/agents/research",
    "your-token"
);

var fullResponse = new StringBuilder();

await foreach (var evt in client.StreamTaskAsync(
    "Research quantum computing breakthroughs in 2025"
))
{
    switch (evt.Type)
    {
        case "status":
            Console.WriteLine($"Status: {evt.Data.GetProperty("status")}");
            break;

        case "artifact":
            if (evt.Data.TryGetProperty("streaming", out var streaming)
                && streaming.GetBoolean())
            {
                var chunk = evt.Data.GetProperty("content").GetString();
                Console.Write(chunk);
                fullResponse.Append(chunk);
            }
            break;

        case "done":
            Console.WriteLine($"\nCompleted: {evt.Data.GetProperty("status")}");
            break;

        case "error":
            throw new Exception(evt.Data.GetProperty("message").GetString());
    }
}

Event Types

Event Type Purpose Data
status Task status changed { status: "working" }
artifact Result content (can stream) { streaming: true, content: "..." }
message Agent message/question { role: "agent", content: "..." }
done Task completed { status: "completed" }
error Error occurred { message: "..." }

Building an A2A Server

Here's how to implement an A2A-compliant agent server:

A2A Server Implementation
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from enum import Enum
import asyncio
import json

app = FastAPI()

# In-memory task storage (use Redis/DB in production)
tasks: dict[str, dict] = {}

class TaskStatus(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskMessage(BaseModel):
    role: str
    parts: list[dict]

class TaskRequest(BaseModel):
    id: str
    sessionId: str
    message: TaskMessage
    metadata: dict | None = None

# Agent Card endpoint
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return {
        "name": "research-assistant",
        "description": "AI research assistant",
        "url": "https://api.example.com/agents/research",
        "version": "1.0.0",
        "capabilities": {
            "streaming": True,
            "pushNotifications": False
        },
        "skills": [
            {
                "id": "web-search",
                "name": "Web Search",
                "description": "Search the web for information"
            }
        ]
    }

# Send task endpoint
@app.post("/tasks/send")
async def send_task(
    task: TaskRequest,
    background_tasks: BackgroundTasks
):
    # Store task
    tasks[task.id] = {
        "id": task.id,
        "sessionId": task.sessionId,
        "status": TaskStatus.SUBMITTED,
        "message": task.message.dict(),
        "artifacts": []
    }

    # Process in background
    background_tasks.add_task(process_task, task.id)

    return tasks[task.id]

# Get task status
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    if task_id not in tasks:
        raise HTTPException(404, "Task not found")
    return tasks[task_id]

# Streaming endpoint
@app.post("/tasks/sendSubscribe")
async def send_task_streaming(task: TaskRequest):
    # Store task
    tasks[task.id] = {
        "id": task.id,
        "status": TaskStatus.SUBMITTED,
        "message": task.message.dict()
    }

    async def event_stream():
        # Send status update
        yield f"data: {json.dumps({'type': 'status', 'status': 'working'})}\n\n"

        # Simulate streaming response
        response_text = await generate_response(task.message.parts[0]["text"])

        # Stream response in chunks
        for chunk in response_text.split(" "):
            yield f"data: {json.dumps({'type': 'artifact', 'streaming': True, 'content': chunk + ' '})}\n\n"
            await asyncio.sleep(0.05)

        # Send completion
        yield f"data: {json.dumps({'type': 'done', 'status': 'completed'})}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream"
    )

async def process_task(task_id: str):
    """Background task processing."""
    tasks[task_id]["status"] = TaskStatus.WORKING

    try:
        # Get the user message
        message = tasks[task_id]["message"]["parts"][0]["text"]

        # Generate response (your agent logic here)
        response = await generate_response(message)

        # Store result
        tasks[task_id]["artifacts"] = [
            {"type": "text", "content": response}
        ]
        tasks[task_id]["status"] = TaskStatus.COMPLETED

    except Exception as e:
        tasks[task_id]["status"] = TaskStatus.FAILED
        tasks[task_id]["error"] = str(e)

async def generate_response(message: str) -> str:
    """Your agent logic here."""
    # This is where you'd call your LLM, tools, etc.
    await asyncio.sleep(1)  # Simulate processing
    return f"Research results for: {message}"
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Collections.Concurrent;

[ApiController]
[Route("")]
public class A2AAgentController : ControllerBase
{
    private static readonly ConcurrentDictionary<string, AgentTask> Tasks = new();

    // Agent Card endpoint
    [HttpGet(".well-known/agent.json")]
    public IActionResult GetAgentCard()
    {
        return Ok(new
        {
            name = "research-assistant",
            description = "AI research assistant",
            url = "https://api.example.com/agents/research",
            version = "1.0.0",
            capabilities = new { streaming = true },
            skills = new[]
            {
                new {
                    id = "web-search",
                    name = "Web Search",
                    description = "Search the web for information"
                }
            }
        });
    }

    // Send task endpoint
    [HttpPost("tasks/send")]
    public async Task<IActionResult> SendTask(
        [FromBody] TaskRequest request,
        [FromServices] IBackgroundTaskQueue taskQueue)
    {
        var task = new AgentTask
        {
            Id = request.Id,
            SessionId = request.SessionId,
            Status = TaskStatus.Submitted,
            Message = request.Message
        };

        Tasks[task.Id] = task;

        // Queue background processing
        await taskQueue.QueueAsync(async ct =>
        {
            await ProcessTaskAsync(task.Id, ct);
        });

        return Ok(task);
    }

    // Get task status
    [HttpGet("tasks/{taskId}")]
    public IActionResult GetTask(string taskId)
    {
        if (!Tasks.TryGetValue(taskId, out var task))
            return NotFound();

        return Ok(task);
    }

    // Streaming endpoint
    [HttpPost("tasks/sendSubscribe")]
    public async Task SendTaskStreaming(
        [FromBody] TaskRequest request,
        CancellationToken ct)
    {
        Response.ContentType = "text/event-stream";
        Response.Headers.CacheControl = "no-cache";

        var writer = Response.BodyWriter;

        // Send status
        await WriteEventAsync(writer, new { type = "status", status = "working" });

        // Generate response (your agent logic)
        var message = request.Message.Parts[0].Text;
        var response = await GenerateResponseAsync(message, ct);

        // Stream response
        foreach (var word in response.Split(' '))
        {
            await WriteEventAsync(writer, new
            {
                type = "artifact",
                streaming = true,
                content = word + " "
            });
            await Task.Delay(50, ct);
        }

        // Send completion
        await WriteEventAsync(writer, new { type = "done", status = "completed" });
    }

    private static async Task WriteEventAsync(
        System.IO.Pipelines.PipeWriter writer,
        object data)
    {
        var json = JsonSerializer.Serialize(data);
        var bytes = System.Text.Encoding.UTF8.GetBytes($"data: {json}\n\n");
        await writer.WriteAsync(bytes);
        await writer.FlushAsync();
    }

    private static async Task ProcessTaskAsync(string taskId, CancellationToken ct)
    {
        if (!Tasks.TryGetValue(taskId, out var task))
            return;

        task.Status = TaskStatus.Working;

        try
        {
            var message = task.Message.Parts[0].Text;
            var response = await GenerateResponseAsync(message, ct);

            task.Artifacts = new[]
            {
                new { type = "text", content = response }
            };
            task.Status = TaskStatus.Completed;
        }
        catch (Exception ex)
        {
            task.Status = TaskStatus.Failed;
            task.Error = ex.Message;
        }
    }

    private static async Task<string> GenerateResponseAsync(
        string message,
        CancellationToken ct)
    {
        await Task.Delay(1000, ct);
        return $"Research results for: {message}";
    }
}

Use Cases

Enterprise Agent Marketplace

Organizations publish specialized agents (HR, finance, legal) that other teams can discover and use via A2A.

Multi-Agent Workflows

An orchestrator agent breaks down complex tasks and delegates to specialized agents: research → analysis → writing → review.

Cross-Organization Collaboration

Company A's agent requests data analysis from Company B's specialized analytics agent, with proper authentication and billing.

Agent Ecosystems

Third-party developers build and monetize specialized agents (translation, legal review, code analysis) that any A2A client can use.

A2A vs MCP Comparison

Aspect A2A MCP
Purpose Agent-to-agent communication Agent-to-tool communication
Transport HTTPS (remote) stdio (local process)
Discovery Agent Cards at well-known URL Configuration file
Auth OAuth2, API keys Process-level trust
Streaming SSE (Server-Sent Events) JSON-RPC notifications
Use Case Cross-org agent collaboration Local tool integration

Using Both Protocols

A well-designed agent system might use both: MCP for local tools (file access, database queries) and A2A to delegate specialized tasks to remote agents.

Security Considerations

Authentication Required

Always require authentication for A2A endpoints. Use OAuth2 for cross-organization scenarios or API keys for internal use.

Input Validation

Validate all task inputs against the skill's inputSchema. Never trust data from remote agents without validation.

Rate Limiting

Implement rate limiting to prevent abuse. Track usage per client agent for billing and quota management.

Audit Logging

Log all task requests with client identity, inputs (sanitized), and outcomes for security and debugging.

Related Topics