Overview
The Agent-to-Agent (A2A) Protocol defines standard specifications for agent communication and message formatting, enabling seamless interoperability between different AI agents.
Every agent served with agent.serve() automatically becomes A2A-compatible with standardized endpoints and agent card discovery.
Key Features
Standard Communication - JSON-RPC 2.0 based messaging
Agent Discovery - Automatic agent card generation at /.well-known/agent-card.json
Rich Interactions - Support for tasks, status updates, and artifact streaming
Protocol Version - Implements A2A protocol v0.3.0
Quick Start
Serving an agent automatically enables A2A protocol (src/agentor/core/agent.py:513):
from agentor import Agentor
agent = Agentor(
name = "Weather Agent" ,
model = "gpt-5-mini" ,
tools = [ "get_weather" ],
)
# Serve with A2A protocol enabled
agent.serve( port = 8000 )
The agent card is now available at:
http://localhost:8000/.well-known/agent-card.json
Agent Card
The agent card describes agent capabilities, endpoints, and skills (src/agentor/a2a.py:53):
{
"name" : "Weather Agent" ,
"description" : "Agent instructions and description" ,
"url" : "http://localhost:8000" ,
"version" : "0.0.1" ,
"skills" : [
{
"id" : "tool_get_weather" ,
"name" : "get_weather" ,
"description" : "Returns the weather in the given city" ,
"tags" : []
}
],
"capabilities" : {
"streaming" : true ,
"statefulness" : true ,
"asyncProcessing" : true
},
"defaultInputModes" : [ "application/json" ],
"securitySchemes" : {},
"security" : []
}
Agent Capabilities
The capabilities object (src/agentor/a2a.py:40) indicates:
streaming - Supports Server-Sent Events for real-time responses
statefulness - Maintains conversation context across requests
asyncProcessing - Can handle long-running tasks
A2A Controller
The A2AController (src/agentor/a2a.py:20) implements the A2A protocol on top of FastAPI:
from agentor.a2a import A2AController, AgentSkill
controller = A2AController(
name = "My Agent" ,
description = "Agent description" ,
url = "http://localhost:8000" ,
version = "1.0.0" ,
skills = [
AgentSkill(
id = "skill_1" ,
name = "Skill Name" ,
description = "What the skill does" ,
tags = [ "category" ]
)
]
)
Custom Endpoints
Add custom routes to the controller (src/agentor/core/agent.py:554):
controller.add_api_route( "/chat" , chat_handler, methods = [ "POST" ])
controller.add_api_route( "/health" , health_handler, methods = [ "GET" ])
JSON-RPC Methods
A2A protocol implements these JSON-RPC 2.0 methods (src/agentor/a2a.py:90):
message/send
Send a non-streaming message:
{
"jsonrpc" : "2.0" ,
"id" : 1 ,
"method" : "message/send" ,
"params" : {
"message" : {
"parts" : [
{
"kind" : "text" ,
"text" : "What is the weather in London?"
}
]
}
}
}
message/stream
Send a streaming message with Server-Sent Events (src/agentor/a2a.py:114):
{
"jsonrpc" : "2.0" ,
"id" : 2 ,
"method" : "message/stream" ,
"params" : {
"message" : {
"parts" : [
{
"kind" : "text" ,
"text" : "Tell me a story"
}
]
}
}
}
The response is an event stream with:
Task - Initial task object
TaskArtifactUpdateEvent - Streaming content updates
TaskStatusUpdateEvent - Final completion status
tasks/get
Retrieve task status:
{
"jsonrpc" : "2.0" ,
"id" : 3 ,
"method" : "tasks/get" ,
"params" : { "taskId" : "task_uuid" }
}
tasks/cancel
Cancel a running task:
{
"jsonrpc" : "2.0" ,
"id" : 4 ,
"method" : "tasks/cancel" ,
"params" : { "taskId" : "task_uuid" }
}
Streaming Implementation
The streaming handler (src/agentor/core/agent.py:579) sends events in Server-Sent Events format:
async def _message_stream_handler (
self , request : SendStreamingMessageRequest
) -> StreamingResponse:
async def event_generator ():
task_id = f "task_ { uuid.uuid4() } "
context_id = f "ctx_ { uuid.uuid4() } "
artifact_id = f "artifact_ { uuid.uuid4() } "
# Send initial task
task = Task(
id = task_id,
context_id = context_id,
status = TaskStatus( state = TaskState.working)
)
yield f "data: { json.dumps(task) }
"
# Stream artifact updates
async for event in agent.stream_chat(input_text):
artifact = Artifact(
artifact_id = artifact_id,
name = "response" ,
parts = [Part( root = TextPart( text = event.message))]
)
artifact_update = TaskArtifactUpdateEvent(
kind = "artifact-update" ,
task_id = task_id,
artifact = artifact,
append = True
)
yield f "data: { json.dumps(artifact_update) }
"
# Send completion
final_status = TaskStatusUpdateEvent(
task_id = task_id,
status = TaskStatus( state = TaskState.completed),
final = True
)
yield f "data: { json.dumps(final_status) }
"
return StreamingResponse(
event_generator(),
media_type = "text/event-stream"
)
Task States
Tasks progress through these states:
working - Task is processing
completed - Task finished successfully
failed - Task encountered an error
Error Handling
Errors are reported in the JSON-RPC error format (src/agentor/core/schema.py):
{
"jsonrpc" : "2.0" ,
"id" : 1 ,
"error" : {
"code" : -32601 ,
"message" : "Method not found"
}
}
Error Codes
class JSONRPCReturnCodes :
METHOD_NOT_FOUND = - 32601
INVALID_PARAMS = - 32602
INTERNAL_ERROR = - 32603
Custom Handlers
Register custom A2A handlers (src/agentor/core/agent.py:576):
def _register_a2a_handlers ( self , controller : A2AController):
controller.add_handler( "message/stream" , self ._message_stream_handler)
controller.add_handler( "message/send" , self ._message_send_handler)
Implement your own handler:
from agentor.a2a import A2AController
from a2a.types import JSONRPCRequest, JSONRPCResponse
async def custom_handler ( request : JSONRPCRequest) -> JSONRPCResponse:
# Process request
return JSONRPCResponse(
id = request.id,
result = { "status" : "success" }
)
controller.add_handler( "custom/method" , custom_handler)
Agent Skills in A2A
Tools are automatically exposed as agent skills (src/agentor/core/agent.py:535):
skills = [
AgentSkill(
id = f "tool_ { tool.name.lower().replace( ' ' , '_' ) } " ,
name = tool.name,
description = tool.description,
tags = []
)
for tool in self .tools
]
controller = A2AController(
name = self .name,
skills = skills,
url = f "http:// { host } : { port } "
)
Complete Server Example
From examples/agent-server/main.py:
from agentor import Agentor
agent = Agentor(
name = "Weather Agent" ,
model = "gpt-5-mini" ,
tools = [ "get_weather" ],
)
# Automatic A2A protocol support
agent.serve( port = 8000 )
Test the agent card:
curl http://localhost:8000/.well-known/agent-card.json
Send a message:
curl -X POST http://localhost:8000/ \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "message/stream",
"params": {
"message": {
"parts": [{"kind": "text", "text": "Hello!"}]
}
}
}'
Integration with FastAPI
The A2A controller is a FastAPI router (src/agentor/core/agent.py:559):
from fastapi import FastAPI
app = FastAPI()
app.include_router(controller)
This enables:
Automatic OpenAPI documentation
Dependency injection
Middleware support
Request validation
Protocol Versioning
The current implementation uses A2A protocol v0.3.0. The agent card includes:
{
"version" : "0.0.1" ,
"signatures" : []
}
Future versions may add cryptographic signatures for agent verification.
Next Steps
Agents Learn about agent architecture
Deployment Deploy A2A-compatible agents
MCP Protocol Compare with Model Context Protocol
Last modified on March 20, 2026