Agent2Agent Protocol (A2A)
An open protocol by Google for enabling AI agents to discover, communicate, and collaborate with each other across organizational boundaries.
What is A2A?
The Agent2Agent (A2A) protocol defines how autonomous AI agents discover each other's capabilities and exchange tasks. Unlike MCP (which connects agents to tools), A2A enables agent-to-agent collaboration.
Agent Cards
Machine-readable descriptions of what an agent can do, its skills, and how to authenticate
Task Protocol
Standardized way to send tasks, receive results, and handle multi-turn interactions
Streaming
Server-Sent Events for real-time task updates and progressive results
Authentication
OAuth2, API keys, or custom auth for secure cross-organization communication
A2A vs MCP
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ ORCHESTRATOR AGENT │
│ │
│ 1. Discover agents via Agent Cards │
│ 2. Select appropriate agent for task │
│ 3. Send task via A2A protocol │
│ 4. Handle responses (polling or streaming) │
└─────────────────────────────────────────────────────────────────┘
│ │ │
│ HTTPS │ HTTPS │ HTTPS
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ AGENT A │ │ AGENT B │ │ AGENT C │
│ (Research) │ │ (Analysis) │ │ (Writing) │
│ │ │ │ │ │
│ Skills: │ │ Skills: │ │ Skills: │
│ • web-search │ │ • data-viz │ │ • summarize │
│ • doc-fetch │ │ • statistics │ │ • translate │
│ │ │ • ml-predict │ │ • format │
└───────────────┘ └───────────────┘ └───────────────┘
│ │ │
Uses MCP Uses MCP Uses MCP
│ │ │
▼ ▼ ▼
[Tools] [Tools] [Tools] Agent Cards
Agent Cards are JSON documents that describe an agent's capabilities. They're hosted at a well-known URL (/.well-known/agent.json) for discovery:
{
"name": "research-assistant",
"description": "An AI agent that helps with research tasks by searching the web, analyzing documents, and synthesizing information.",
"url": "https://api.example.com/agents/research",
"version": "1.2.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true
},
"authentication": {
"type": "oauth2",
"authorizationUrl": "https://auth.example.com/oauth/authorize",
"tokenUrl": "https://auth.example.com/oauth/token",
"scopes": ["research:read", "research:write"]
},
"skills": [
{
"id": "web-search",
"name": "Web Search",
"description": "Search the internet for current information",
"inputSchema": {
"type": "object",
"properties": {
"query": { "type": "string", "description": "Search query" },
"maxResults": { "type": "integer", "default": 10 }
},
"required": ["query"]
},
"outputSchema": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": { "type": "string" },
"url": { "type": "string" },
"snippet": { "type": "string" }
}
}
}
},
{
"id": "document-analysis",
"name": "Document Analysis",
"description": "Analyze and extract insights from documents",
"inputSchema": {
"type": "object",
"properties": {
"documentUrl": { "type": "string" },
"analysisType": {
"type": "string",
"enum": ["summary", "key-points", "sentiment", "entities"]
}
},
"required": ["documentUrl", "analysisType"]
}
}
],
"defaultInputModes": ["text"],
"defaultOutputModes": ["text", "structured"],
"provider": {
"name": "Example AI Corp",
"url": "https://example.com",
"contactEmail": "support@example.com"
}
} name: research-assistant
description: An AI agent that helps with research tasks
url: https://api.example.com/agents/research
version: 1.2.0
capabilities:
streaming: true
pushNotifications: true
stateTransitionHistory: true
authentication:
type: oauth2
authorizationUrl: https://auth.example.com/oauth/authorize
tokenUrl: https://auth.example.com/oauth/token
scopes:
- research:read
- research:write
skills:
- id: web-search
name: Web Search
description: Search the internet for current information
inputSchema:
type: object
properties:
query:
type: string
description: Search query
maxResults:
type: integer
default: 10
required: [query]
- id: document-analysis
name: Document Analysis
description: Analyze and extract insights from documents
inputSchema:
type: object
properties:
documentUrl:
type: string
analysisType:
type: string
enum: [summary, key-points, sentiment, entities]
required: [documentUrl, analysisType]
provider:
name: Example AI Corp
url: https://example.com Key Components
| Field | Purpose | Required |
|---|---|---|
name | Unique identifier for the agent | Yes |
description | Human-readable description of capabilities | Yes |
url | Base URL for the agent's A2A endpoints | Yes |
skills | List of specific capabilities with schemas | Yes |
capabilities | Protocol features supported (streaming, etc.) | No |
authentication | How to authenticate requests | No |
Skill Schemas
inputSchema and outputSchema for each skill using JSON Schema. This enables client agents to validate inputs and understand outputs.
Task Protocol
Tasks are the primary unit of work in A2A. A task goes through several states:
┌─────────────┐
│ SUBMITTED │
└──────┬──────┘
│
▼
┌─────────────┐
┌──────│ WORKING │──────┐
│ └──────┬──────┘ │
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────┐ ┌──────────┐
│INPUT-REQUIRED│ │COMPLETED│ │ FAILED │
└──────┬──────┘ └─────────┘ └──────────┘
│
│ (user provides input)
│
└──────────► WORKING # A2A Task Lifecycle
# 1. CLIENT SENDS TASK
task = {
id: generateUUID(),
sessionId: "session-123", # For multi-turn conversations
message: {
role: "user",
parts: [
{ type: "text", text: "Research the latest AI safety papers" }
]
},
metadata: {
priority: "high",
deadline: "2025-01-27T00:00:00Z"
}
}
response = POST(agentUrl + "/tasks/send", task)
# 2. SERVER ACKNOWLEDGES AND STARTS PROCESSING
# Response includes task status
assert response.status == "submitted" or response.status == "working"
# 3. CLIENT POLLS FOR UPDATES (or uses streaming)
while task.status not in ["completed", "failed", "canceled"]:
statusResponse = GET(agentUrl + "/tasks/" + task.id)
task = statusResponse.task
if task.status == "input-required":
# Agent needs more information
userInput = getUserInput(task.message)
POST(agentUrl + "/tasks/" + task.id + "/send", userInput)
# 4. GET FINAL RESULT
if task.status == "completed":
artifacts = task.artifacts
for artifact in artifacts:
print(artifact.type, artifact.content) import httpx
import asyncio
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
SUBMITTED = "submitted"
WORKING = "working"
INPUT_REQUIRED = "input-required"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
@dataclass
class A2ATask:
id: str
status: TaskStatus
message: dict | None = None
artifacts: list | None = None
history: list | None = None
class A2AClient:
def __init__(self, agent_url: str, auth_token: str):
self.agent_url = agent_url.rstrip("/")
self.client = httpx.AsyncClient(
headers={"Authorization": f"Bearer {auth_token}"}
)
async def send_task(
self,
message: str,
session_id: str | None = None,
metadata: dict | None = None
) -> A2ATask:
"""Send a new task to the agent."""
task_data = {
"id": str(uuid4()),
"sessionId": session_id or str(uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
if metadata:
task_data["metadata"] = metadata
response = await self.client.post(
f"{self.agent_url}/tasks/send",
json=task_data
)
response.raise_for_status()
return self._parse_task(response.json())
async def get_task_status(self, task_id: str) -> A2ATask:
"""Get current status of a task."""
response = await self.client.get(
f"{self.agent_url}/tasks/{task_id}"
)
response.raise_for_status()
return self._parse_task(response.json())
async def send_input(
self,
task_id: str,
message: str
) -> A2ATask:
"""Send additional input for a task that requires it."""
response = await self.client.post(
f"{self.agent_url}/tasks/{task_id}/send",
json={
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
)
response.raise_for_status()
return self._parse_task(response.json())
async def wait_for_completion(
self,
task_id: str,
poll_interval: float = 1.0,
timeout: float = 300.0
) -> A2ATask:
"""Poll until task completes or times out."""
elapsed = 0.0
while elapsed < timeout:
task = await self.get_task_status(task_id)
if task.status in [
TaskStatus.COMPLETED,
TaskStatus.FAILED,
TaskStatus.CANCELED
]:
return task
if task.status == TaskStatus.INPUT_REQUIRED:
raise InterruptedError(
f"Task requires input: {task.message}"
)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
raise TimeoutError(f"Task {task_id} did not complete")
def _parse_task(self, data: dict) -> A2ATask:
return A2ATask(
id=data["id"],
status=TaskStatus(data["status"]),
message=data.get("message"),
artifacts=data.get("artifacts"),
history=data.get("history")
)
# Usage
async def main():
client = A2AClient(
agent_url="https://api.example.com/agents/research",
auth_token="your-token"
)
# Send a task
task = await client.send_task(
"Research the latest developments in AI safety"
)
print(f"Task submitted: {task.id}")
# Wait for completion
result = await client.wait_for_completion(task.id)
if result.status == TaskStatus.COMPLETED:
for artifact in result.artifacts or []:
print(f"Artifact: {artifact}")
asyncio.run(main()) using System.Net.Http.Json;
using System.Text.Json;
public enum TaskStatus
{
Submitted,
Working,
InputRequired,
Completed,
Failed,
Canceled
}
public record A2ATask(
string Id,
TaskStatus Status,
JsonElement? Message = null,
JsonElement[]? Artifacts = null,
JsonElement[]? History = null
);
public class A2AClient : IDisposable
{
private readonly HttpClient _client;
private readonly string _agentUrl;
public A2AClient(string agentUrl, string authToken)
{
_agentUrl = agentUrl.TrimEnd('/');
_client = new HttpClient();
_client.DefaultRequestHeaders.Add("Authorization", $"Bearer {authToken}");
}
public async Task<A2ATask> SendTaskAsync(
string message,
string? sessionId = null,
Dictionary<string, object>? metadata = null,
CancellationToken ct = default)
{
var taskData = new
{
id = Guid.NewGuid().ToString(),
sessionId = sessionId ?? Guid.NewGuid().ToString(),
message = new
{
role = "user",
parts = new[] { new { type = "text", text = message } }
},
metadata
};
var response = await _client.PostAsJsonAsync(
$"{_agentUrl}/tasks/send",
taskData,
ct
);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<A2ATask>(ct)
?? throw new InvalidOperationException("Invalid response");
}
public async Task<A2ATask> GetTaskStatusAsync(
string taskId,
CancellationToken ct = default)
{
var response = await _client.GetAsync(
$"{_agentUrl}/tasks/{taskId}",
ct
);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<A2ATask>(ct)
?? throw new InvalidOperationException("Invalid response");
}
public async Task<A2ATask> WaitForCompletionAsync(
string taskId,
TimeSpan? pollInterval = null,
TimeSpan? timeout = null,
CancellationToken ct = default)
{
var interval = pollInterval ?? TimeSpan.FromSeconds(1);
var maxWait = timeout ?? TimeSpan.FromMinutes(5);
var elapsed = TimeSpan.Zero;
while (elapsed < maxWait)
{
var task = await GetTaskStatusAsync(taskId, ct);
if (task.Status is TaskStatus.Completed
or TaskStatus.Failed
or TaskStatus.Canceled)
{
return task;
}
if (task.Status == TaskStatus.InputRequired)
{
throw new InvalidOperationException(
$"Task requires input: {task.Message}"
);
}
await Task.Delay(interval, ct);
elapsed += interval;
}
throw new TimeoutException($"Task {taskId} did not complete");
}
public void Dispose() => _client.Dispose();
}
// Usage
await using var client = new A2AClient(
"https://api.example.com/agents/research",
"your-token"
);
var task = await client.SendTaskAsync(
"Research the latest developments in AI safety"
);
Console.WriteLine($"Task submitted: {task.Id}");
var result = await client.WaitForCompletionAsync(task.Id);
if (result.Status == TaskStatus.Completed)
{
foreach (var artifact in result.Artifacts ?? Array.Empty<JsonElement>())
{
Console.WriteLine($"Artifact: {artifact}");
}
} Task Endpoints
| Endpoint | Method | Purpose |
|---|---|---|
/tasks/send | POST | Submit a new task |
/tasks/{id} | GET | Get task status and results |
/tasks/{id}/send | POST | Send additional input to a task |
/tasks/{id}/cancel | POST | Cancel a running task |
/tasks/sendSubscribe | POST | Submit task with SSE streaming |
Streaming Responses
For long-running tasks, A2A supports Server-Sent Events (SSE) to stream results in real-time:
Streaming Implementation # A2A Streaming with Server-Sent Events (SSE)
function streamTask(agentUrl, task):
# Open SSE connection
eventSource = new EventSource(
agentUrl + "/tasks/sendSubscribe",
method: POST,
body: task,
headers: { "Content-Type": "application/json" }
)
eventSource.onMessage = (event) =>
data = JSON.parse(event.data)
switch data.type:
case "status":
print("Status:", data.status)
case "artifact":
# Partial artifact (streaming content)
if data.append:
currentArtifact += data.content
else:
artifacts.append(data)
case "message":
# Agent message (e.g., thinking, asking for input)
print("Agent:", data.content)
case "done":
# Task completed
eventSource.close()
return { status: data.status, artifacts: artifacts }
case "error":
eventSource.close()
throw Error(data.message)
eventSource.onError = (error) =>
eventSource.close()
throw error
import httpx
from typing import AsyncIterator
from dataclasses import dataclass
@dataclass
class StreamEvent:
type: str
data: dict
async def stream_task(
agent_url: str,
message: str,
auth_token: str,
session_id: str | None = None
) -> AsyncIterator[StreamEvent]:
"""Stream task results using Server-Sent Events."""
task_data = {
"id": str(uuid4()),
"sessionId": session_id or str(uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{agent_url}/tasks/sendSubscribe",
json=task_data,
headers={
"Authorization": f"Bearer {auth_token}",
"Accept": "text/event-stream"
}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
yield StreamEvent(
type=data.get("type", "unknown"),
data=data
)
if data.get("type") in ["done", "error"]:
return
# Usage with streaming
async def research_with_streaming():
full_response = ""
async for event in stream_task(
agent_url="https://api.example.com/agents/research",
message="Research quantum computing breakthroughs in 2025",
auth_token="your-token"
):
if event.type == "status":
print(f"Status: {event.data['status']}")
elif event.type == "artifact":
if event.data.get("streaming"):
# Streaming text content
chunk = event.data.get("content", "")
print(chunk, end="", flush=True)
full_response += chunk
else:
# Complete artifact
print(f"\nArtifact: {event.data}")
elif event.type == "message":
print(f"Agent: {event.data.get('content')}")
elif event.type == "done":
print(f"\nCompleted with status: {event.data['status']}")
elif event.type == "error":
print(f"Error: {event.data.get('message')}")
raise Exception(event.data.get("message"))
asyncio.run(research_with_streaming())
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
public record StreamEvent(string Type, JsonElement Data);
public class A2AStreamingClient : IDisposable
{
private readonly HttpClient _client;
private readonly string _agentUrl;
public A2AStreamingClient(string agentUrl, string authToken)
{
_agentUrl = agentUrl.TrimEnd('/');
_client = new HttpClient { Timeout = TimeSpan.FromMinutes(10) };
_client.DefaultRequestHeaders.Add("Authorization", $"Bearer {authToken}");
_client.DefaultRequestHeaders.Add("Accept", "text/event-stream");
}
public async IAsyncEnumerable<StreamEvent> StreamTaskAsync(
string message,
string? sessionId = null,
[EnumeratorCancellation] CancellationToken ct = default)
{
var taskData = new
{
id = Guid.NewGuid().ToString(),
sessionId = sessionId ?? Guid.NewGuid().ToString(),
message = new
{
role = "user",
parts = new[] { new { type = "text", text = message } }
}
};
using var request = new HttpRequestMessage(
HttpMethod.Post,
$"{_agentUrl}/tasks/sendSubscribe"
);
request.Content = JsonContent.Create(taskData);
using var response = await _client.SendAsync(
request,
HttpCompletionOption.ResponseHeadersRead,
ct
);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(ct);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream && !ct.IsCancellationRequested)
{
var line = await reader.ReadLineAsync(ct);
if (string.IsNullOrEmpty(line) || !line.StartsWith("data: "))
continue;
var jsonData = line.Substring(6);
var data = JsonDocument.Parse(jsonData).RootElement;
var eventType = data.GetProperty("type").GetString() ?? "unknown";
yield return new StreamEvent(eventType, data);
if (eventType is "done" or "error")
yield break;
}
}
public void Dispose() => _client.Dispose();
}
// Usage
await using var client = new A2AStreamingClient(
"https://api.example.com/agents/research",
"your-token"
);
var fullResponse = new StringBuilder();
await foreach (var evt in client.StreamTaskAsync(
"Research quantum computing breakthroughs in 2025"
))
{
switch (evt.Type)
{
case "status":
Console.WriteLine($"Status: {evt.Data.GetProperty("status")}");
break;
case "artifact":
if (evt.Data.TryGetProperty("streaming", out var streaming)
&& streaming.GetBoolean())
{
var chunk = evt.Data.GetProperty("content").GetString();
Console.Write(chunk);
fullResponse.Append(chunk);
}
break;
case "done":
Console.WriteLine($"\nCompleted: {evt.Data.GetProperty("status")}");
break;
case "error":
throw new Exception(evt.Data.GetProperty("message").GetString());
}
}
Event Types
Event Type Purpose Data status Task status changed { status: "working" } artifact Result content (can stream) { streaming: true, content: "..." } message Agent message/question { role: "agent", content: "..." } done Task completed { status: "completed" } error Error occurred { message: "..." }
Building an A2A Server
Here's how to implement an A2A-compliant agent server:
A2A Server Implementation from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from enum import Enum
import asyncio
import json
app = FastAPI()
# In-memory task storage (use Redis/DB in production)
tasks: dict[str, dict] = {}
class TaskStatus(str, Enum):
SUBMITTED = "submitted"
WORKING = "working"
INPUT_REQUIRED = "input-required"
COMPLETED = "completed"
FAILED = "failed"
class TaskMessage(BaseModel):
role: str
parts: list[dict]
class TaskRequest(BaseModel):
id: str
sessionId: str
message: TaskMessage
metadata: dict | None = None
# Agent Card endpoint
@app.get("/.well-known/agent.json")
async def get_agent_card():
return {
"name": "research-assistant",
"description": "AI research assistant",
"url": "https://api.example.com/agents/research",
"version": "1.0.0",
"capabilities": {
"streaming": True,
"pushNotifications": False
},
"skills": [
{
"id": "web-search",
"name": "Web Search",
"description": "Search the web for information"
}
]
}
# Send task endpoint
@app.post("/tasks/send")
async def send_task(
task: TaskRequest,
background_tasks: BackgroundTasks
):
# Store task
tasks[task.id] = {
"id": task.id,
"sessionId": task.sessionId,
"status": TaskStatus.SUBMITTED,
"message": task.message.dict(),
"artifacts": []
}
# Process in background
background_tasks.add_task(process_task, task.id)
return tasks[task.id]
# Get task status
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
if task_id not in tasks:
raise HTTPException(404, "Task not found")
return tasks[task_id]
# Streaming endpoint
@app.post("/tasks/sendSubscribe")
async def send_task_streaming(task: TaskRequest):
# Store task
tasks[task.id] = {
"id": task.id,
"status": TaskStatus.SUBMITTED,
"message": task.message.dict()
}
async def event_stream():
# Send status update
yield f"data: {json.dumps({'type': 'status', 'status': 'working'})}\n\n"
# Simulate streaming response
response_text = await generate_response(task.message.parts[0]["text"])
# Stream response in chunks
for chunk in response_text.split(" "):
yield f"data: {json.dumps({'type': 'artifact', 'streaming': True, 'content': chunk + ' '})}\n\n"
await asyncio.sleep(0.05)
# Send completion
yield f"data: {json.dumps({'type': 'done', 'status': 'completed'})}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream"
)
async def process_task(task_id: str):
"""Background task processing."""
tasks[task_id]["status"] = TaskStatus.WORKING
try:
# Get the user message
message = tasks[task_id]["message"]["parts"][0]["text"]
# Generate response (your agent logic here)
response = await generate_response(message)
# Store result
tasks[task_id]["artifacts"] = [
{"type": "text", "content": response}
]
tasks[task_id]["status"] = TaskStatus.COMPLETED
except Exception as e:
tasks[task_id]["status"] = TaskStatus.FAILED
tasks[task_id]["error"] = str(e)
async def generate_response(message: str) -> str:
"""Your agent logic here."""
# This is where you'd call your LLM, tools, etc.
await asyncio.sleep(1) # Simulate processing
return f"Research results for: {message}"
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
using System.Collections.Concurrent;
[ApiController]
[Route("")]
public class A2AAgentController : ControllerBase
{
private static readonly ConcurrentDictionary<string, AgentTask> Tasks = new();
// Agent Card endpoint
[HttpGet(".well-known/agent.json")]
public IActionResult GetAgentCard()
{
return Ok(new
{
name = "research-assistant",
description = "AI research assistant",
url = "https://api.example.com/agents/research",
version = "1.0.0",
capabilities = new { streaming = true },
skills = new[]
{
new {
id = "web-search",
name = "Web Search",
description = "Search the web for information"
}
}
});
}
// Send task endpoint
[HttpPost("tasks/send")]
public async Task<IActionResult> SendTask(
[FromBody] TaskRequest request,
[FromServices] IBackgroundTaskQueue taskQueue)
{
var task = new AgentTask
{
Id = request.Id,
SessionId = request.SessionId,
Status = TaskStatus.Submitted,
Message = request.Message
};
Tasks[task.Id] = task;
// Queue background processing
await taskQueue.QueueAsync(async ct =>
{
await ProcessTaskAsync(task.Id, ct);
});
return Ok(task);
}
// Get task status
[HttpGet("tasks/{taskId}")]
public IActionResult GetTask(string taskId)
{
if (!Tasks.TryGetValue(taskId, out var task))
return NotFound();
return Ok(task);
}
// Streaming endpoint
[HttpPost("tasks/sendSubscribe")]
public async Task SendTaskStreaming(
[FromBody] TaskRequest request,
CancellationToken ct)
{
Response.ContentType = "text/event-stream";
Response.Headers.CacheControl = "no-cache";
var writer = Response.BodyWriter;
// Send status
await WriteEventAsync(writer, new { type = "status", status = "working" });
// Generate response (your agent logic)
var message = request.Message.Parts[0].Text;
var response = await GenerateResponseAsync(message, ct);
// Stream response
foreach (var word in response.Split(' '))
{
await WriteEventAsync(writer, new
{
type = "artifact",
streaming = true,
content = word + " "
});
await Task.Delay(50, ct);
}
// Send completion
await WriteEventAsync(writer, new { type = "done", status = "completed" });
}
private static async Task WriteEventAsync(
System.IO.Pipelines.PipeWriter writer,
object data)
{
var json = JsonSerializer.Serialize(data);
var bytes = System.Text.Encoding.UTF8.GetBytes($"data: {json}\n\n");
await writer.WriteAsync(bytes);
await writer.FlushAsync();
}
private static async Task ProcessTaskAsync(string taskId, CancellationToken ct)
{
if (!Tasks.TryGetValue(taskId, out var task))
return;
task.Status = TaskStatus.Working;
try
{
var message = task.Message.Parts[0].Text;
var response = await GenerateResponseAsync(message, ct);
task.Artifacts = new[]
{
new { type = "text", content = response }
};
task.Status = TaskStatus.Completed;
}
catch (Exception ex)
{
task.Status = TaskStatus.Failed;
task.Error = ex.Message;
}
}
private static async Task<string> GenerateResponseAsync(
string message,
CancellationToken ct)
{
await Task.Delay(1000, ct);
return $"Research results for: {message}";
}
}
Use Cases
Enterprise Agent Marketplace
Organizations publish specialized agents (HR, finance, legal) that other teams can discover and use via A2A.
Multi-Agent Workflows
An orchestrator agent breaks down complex tasks and delegates to specialized agents: research → analysis → writing → review.
Cross-Organization Collaboration
Company A's agent requests data analysis from Company B's specialized analytics agent, with proper authentication and billing.
Agent Ecosystems
Third-party developers build and monetize specialized agents (translation, legal review, code analysis) that any A2A client can use.
A2A vs MCP Comparison
Aspect A2A MCP Purpose Agent-to-agent communication Agent-to-tool communication Transport HTTPS (remote) stdio (local process) Discovery Agent Cards at well-known URL Configuration file Auth OAuth2, API keys Process-level trust Streaming SSE (Server-Sent Events) JSON-RPC notifications Use Case Cross-org agent collaboration Local tool integration
Using Both Protocols
A well-designed agent system might use both: MCP for local tools (file access, database queries) and A2A to delegate specialized tasks to remote agents.
Security Considerations
Authentication Required
Always require authentication for A2A endpoints. Use OAuth2 for cross-organization scenarios or API keys for internal use.
Input Validation
Validate all task inputs against the skill's inputSchema. Never trust data from remote agents without validation.
Rate Limiting
Implement rate limiting to prevent abuse. Track usage per client agent for billing and quota management.
Audit Logging
Log all task requests with client identity, inputs (sanitized), and outcomes for security and debugging.
Related Topics