A2A (Agent-to-Agent) Communication Actions
Parent document: Actions Index
Story: TEA-AGENT-001.5 Inter-Agent Communication
Overview
A2A actions enable communication between multiple agents running in the same process or namespace. These primitives support message passing, task delegation, shared state coordination, and agent discovery.
Key Features:
Direct messaging and broadcast
Request/response delegation with timeout
Shared state with optimistic locking
Agent discovery by capability
Namespace isolation
Actions
| Action | Description |
|---|---|
| `a2a.send` | Send message to specific agent |
| `a2a.receive` | Receive messages from agents |
| `a2a.broadcast` | Broadcast to all agents in namespace |
| `a2a.delegate` | Request/response task delegation |
| `a2a.state.get` | Get shared state value |
| `a2a.state.set` | Set shared state value |
| `a2a.discover` | Discover available agents |
| `a2a.register` | Register agent for discovery |
| `a2a.unregister` | Unregister agent |
| `a2a.heartbeat` | Update agent last_seen timestamp |
Configuration
Configure A2A settings in your YAML agent:
settings:
  a2a:
    enabled: true
    agent_id: "coordinator"      # This agent's ID
    namespace: "research-team"   # Namespace for isolation
    discovery: broadcast         # Discovery mode: broadcast, registry, static
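As a rough illustration of what these settings imply, the Python sketch below checks an already-parsed `settings.a2a` mapping. The `validate_a2a_settings` helper and its messages are hypothetical and not part of the library; only the key names and the three discovery modes come from the configuration above. Treating `agent_id` and `namespace` as required is an assumption.

```python
# Hypothetical helper: checks a parsed `settings.a2a` mapping.
# Only the key names and discovery modes are taken from the docs above.
DISCOVERY_MODES = {"broadcast", "registry", "static"}


def validate_a2a_settings(a2a):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    if not isinstance(a2a.get("enabled"), bool):
        problems.append("enabled must be true or false")
    # Assumption: both identifiers are required, non-empty strings.
    for key in ("agent_id", "namespace"):
        value = a2a.get(key)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{key} must be a non-empty string")
    # discovery is treated as optional because the complete example omits it.
    mode = a2a.get("discovery")
    if mode is not None and mode not in DISCOVERY_MODES:
        problems.append(f"discovery must be one of {sorted(DISCOVERY_MODES)}")
    return problems


config = {
    "enabled": True,
    "agent_id": "coordinator",
    "namespace": "research-team",
    "discovery": "broadcast",
}
problems = validate_a2a_settings(config)
bad = validate_a2a_settings(
    {"enabled": True, "agent_id": "", "namespace": "x", "discovery": "mesh"}
)
```

Here `problems` is empty for the documented configuration, while `bad` reports the empty `agent_id` and the unknown discovery mode.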
a2a.send
Send a message to a specific agent.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `to` | string | Yes | - | Recipient agent ID |
| `message` | object | Yes | - | Message with `type` and `payload` |
| `confirm` | bool | No |  | Wait for delivery confirmation |
| `correlation_id` | string | No | - | ID for request/response matching |
|  | int | No | - | Time-to-live in seconds |
Example
- name: send_status
  uses: a2a.send
  with:
    to: coordinator
    message:
      type: status_update
      payload:
        progress: "{{ state.progress }}"
        task_id: "{{ state.task_id }}"
    confirm: true
Return Value
{
    "a2a_message_sent": true,
    "a2a_message_id": "msg_abc123"  # Only if confirm=true
}
a2a.receive
Receive messages from agents with optional filtering.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `from_agents` | list | No | - | Filter by sender agent IDs |
| `type` | string | No | - | Filter by message type |
| `timeout` | string | No |  | Timeout (e.g., "30s", "5m") |
| `require_all` | bool | No |  | Wait for messages from ALL agents |
Example
- name: gather_results
  uses: a2a.receive
  with:
    from_agents: [worker-1, worker-2, worker-3]
    type: task_result
    timeout: 60s
    require_all: true
  output: worker_results
Return Value
# Success
{
    "a2a_messages": [
        {
            "from_agent": "worker-1",
            "to_agent": "coordinator",
            "namespace": "research-team",
            "type": "task_result",
            "payload": {"result": "..."},
            "timestamp": 1704067200.0,
            "correlation_id": null
        }
    ]
}
# Timeout
{
    "a2a_messages": [],
    "a2a_timeout": true,
    "a2a_error": "Timeout waiting for messages from [worker-3]"
}
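The filtering and `require_all` behaviour described above can be approximated as follows. This is a sketch under stated assumptions: `parse_timeout`, `filter_messages`, and `missing_senders` are hypothetical helpers, and support for an `h` suffix is a guess (the docs only show `s` and `m`).

```python
import re

_UNITS = {"s": 1, "m": 60, "h": 3600}


def parse_timeout(text):
    """Convert strings such as "30s" or "5m" to seconds ("h" is an assumption)."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smh])", text.strip())
    if match is None:
        raise ValueError(f"Unsupported timeout: {text!r}")
    return float(match.group(1)) * _UNITS[match.group(2)]


def filter_messages(messages, from_agents=None, msg_type=None):
    """Keep messages that match the optional sender and type filters."""
    return [
        m for m in messages
        if (from_agents is None or m["from_agent"] in from_agents)
        and (msg_type is None or m["type"] == msg_type)
    ]


def missing_senders(messages, from_agents):
    """With require_all, these are the agents that have not replied yet."""
    seen = {m["from_agent"] for m in messages}
    return [agent for agent in from_agents if agent not in seen]


inbox = [
    {"from_agent": "worker-1", "type": "task_result", "payload": {}},
    {"from_agent": "worker-2", "type": "task_result", "payload": {}},
    {"from_agent": "worker-2", "type": "status_update", "payload": {}},
]
expected = ["worker-1", "worker-2", "worker-3"]
matched = filter_messages(inbox, from_agents=expected, msg_type="task_result")
still_waiting = missing_senders(matched, expected)
```

In this run `still_waiting` contains only `worker-3`, which is the situation the timeout result above reports.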
a2a.broadcast
Broadcast a message to all agents in the namespace.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `message` | object | Yes | - | Message with `type` and `payload` |
| `agent_type_filter` | string | No | - | Filter recipients by agent type |
|  | int | No | - | Time-to-live in seconds |
Example
- name: announce_completion
  uses: a2a.broadcast
  with:
    message:
      type: announcement
      payload:
        event: task_complete
        result: "{{ state.final_result }}"
    agent_type_filter: worker  # Only send to workers
Return Value
{
    "a2a_broadcast_count": 5  # Number of agents that received the message
}
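A small sketch of how the recipient count might be derived when `agent_type_filter` is set. The `registry` structure and the rule that the sender is skipped are assumptions for this example, not documented behaviour.

```python
def broadcast(registry, sender_id, message, agent_type_filter=None):
    """Deliver `message` to every other registered agent, optionally filtered by type.

    `registry` maps agent_id -> {"agent_type": ..., "inbox": [...]}; this
    structure is an assumption made for the sketch.
    """
    count = 0
    for agent_id, info in registry.items():
        if agent_id == sender_id:
            continue  # assumption: the sender does not receive its own broadcast
        if agent_type_filter is not None and info["agent_type"] != agent_type_filter:
            continue
        info["inbox"].append({"from_agent": sender_id, **message})
        count += 1
    return {"a2a_broadcast_count": count}


registry = {
    "coordinator": {"agent_type": "coordinator", "inbox": []},
    "worker-1": {"agent_type": "worker", "inbox": []},
    "worker-2": {"agent_type": "worker", "inbox": []},
    "reviewer": {"agent_type": "reviewer", "inbox": []},
}
announcement = {"type": "announcement", "payload": {"event": "task_complete"}}
workers_only = broadcast(registry, "coordinator", announcement, agent_type_filter="worker")
everyone = broadcast(registry, "coordinator", announcement)
```

With the filter, only the two workers are counted; without it, the reviewer is included as well.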
a2a.delegate
Delegate a task to another agent and wait for the response (request/response pattern).
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `to` | string | Yes | - | Delegate agent ID |
| `task` | object | Yes | - | Task object; the example passes `type` plus task-specific fields |
| `timeout` | string | No |  | Timeout for response |
| `on_timeout` | string | No |  | Strategy: raise, fallback_local, retry |
| `fallback` | object | No | - | Fallback action config |
Example
- name: delegate_search
  uses: a2a.delegate
  with:
    to: search-specialist
    task:
      type: web_search
      query: "{{ state.query }}"
      max_results: 10
    timeout: 60s
    on_timeout: fallback_local
    fallback:
      action: web.search
      with:
        query: "{{ state.query }}"
  output: search_results
Return Value
# Success
{
    "a2a_delegation_result": {"results": [...]},
    "a2a_delegation_success": true
}
# Timeout with fallback
{
    "a2a_delegation_result": {"results": [...]},  # From fallback action
    "a2a_delegation_success": false,
    "a2a_delegation_fallback": true
}
# Timeout without fallback
{
    "a2a_delegation_result": null,
    "a2a_delegation_success": false,
    "a2a_delegation_timeout": true,
    "a2a_error": "Delegation to search-specialist timed out after 60.0s"
}
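The three outcomes above can be modelled with a pair of queues and a bounded wait. This is a sketch, not the library's code: the `delegate` function, its queue-based transport, and the default of `raise` are assumptions, and the `retry` strategy is deliberately left out.

```python
import queue
import threading


def delegate(to, request_q, reply_q, task, timeout, on_timeout="raise", fallback=None):
    """Send `task`, wait up to `timeout` seconds for a reply, then apply `on_timeout`."""
    request_q.put(task)
    try:
        result = reply_q.get(timeout=timeout)
        return {"a2a_delegation_result": result, "a2a_delegation_success": True}
    except queue.Empty:
        error = f"Delegation to {to} timed out after {float(timeout)}s"
        if on_timeout == "raise":
            raise TimeoutError(error)
        if on_timeout != "fallback_local":
            raise NotImplementedError(f"{on_timeout!r} is not covered by this sketch")
        if fallback is None:
            return {
                "a2a_delegation_result": None,
                "a2a_delegation_success": False,
                "a2a_delegation_timeout": True,
                "a2a_error": error,
            }
        # Run the local fallback with the same task.
        return {
            "a2a_delegation_result": fallback(task),
            "a2a_delegation_success": False,
            "a2a_delegation_fallback": True,
        }


def worker(request_q, reply_q):
    """Stand-in for the remote agent: answers exactly one request."""
    task = request_q.get()
    reply_q.put({"results": [f"hit for {task['query']}"]})


task = {"type": "web_search", "query": "a2a"}
requests, replies = queue.Queue(), queue.Queue()
threading.Thread(target=worker, args=(requests, replies), daemon=True).start()
ok = delegate("search-specialist", requests, replies, task, timeout=1.0)

# Nobody listens on these fresh queues, so both calls time out.
fell_back = delegate(
    "search-specialist", queue.Queue(), queue.Queue(), task,
    timeout=0.05, on_timeout="fallback_local", fallback=lambda t: {"results": []},
)
timed_out = delegate(
    "search-specialist", queue.Queue(), queue.Queue(), task,
    timeout=0.05, on_timeout="fallback_local",
)
```

Waiting on the reply queue with a timeout keeps the caller from blocking forever, which is the main reason to prefer delegation with a bounded wait over a bare send followed by an unbounded receive.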
a2a.state.get
Get a value from shared state (accessible by all agents in the namespace).
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `key` | string | Yes | - | State key |
| `default` | any | No |  | Default value if key doesn’t exist |
Example
- name: get_progress
  uses: a2a.state.get
  with:
    key: team_progress
    default:
      completed: 0
      total: 0
  output: progress
Return Value
{
    "a2a_shared_state": {"completed": 5, "total": 10},
    "a2a_state_version": 3  # Version for optimistic locking
}
a2a.state.set
Set a value in shared state with optional optimistic locking.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `key` | string | Yes | - | State key |
| `value` | any | Yes | - | Value to store |
| `ttl` | int | No | - | Time-to-live in seconds |
| `expected_version` | int | No | - | Version for optimistic locking |
Example
- name: update_progress
  uses: a2a.state.set
  with:
    key: team_progress
    value:
      completed: "{{ state.completed_tasks }}"
      total: "{{ state.total_tasks }}"
    expected_version: "{{ state.progress.a2a_state_version }}"
    ttl: 3600
  output: update_result
Return Value
# Success
{
    "a2a_state_version": 4,
    "a2a_state_updated": true
}
# Conflict (optimistic lock failure)
{
    "a2a_state_updated": false,
    "a2a_state_conflict": true,
    "a2a_error": "Version conflict: expected 3, actual 4",
    "a2a_state_expected_version": 3,
    "a2a_state_actual_version": 4
}
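Optimistic locking means a write only succeeds if the version the writer read is still current. The sketch below models that with an in-memory store and a read-modify-write retry loop. `SharedState` and `increment_completed` are hypothetical names; starting versions at 0 for missing keys is an assumption, and TTL handling is omitted.

```python
import threading


class SharedState:
    """In-memory stand-in for namespace-wide shared state with versioned keys."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}  # key -> (value, version)

    def get(self, key, default=None):
        with self._lock:
            value, version = self._data.get(key, (default, 0))
        return {"a2a_shared_state": value, "a2a_state_version": version}

    def set(self, key, value, expected_version=None):
        with self._lock:
            _, actual = self._data.get(key, (None, 0))
            if expected_version is not None and expected_version != actual:
                return {
                    "a2a_state_updated": False,
                    "a2a_state_conflict": True,
                    "a2a_error": f"Version conflict: expected {expected_version}, actual {actual}",
                    "a2a_state_expected_version": expected_version,
                    "a2a_state_actual_version": actual,
                }
            self._data[key] = (value, actual + 1)
            return {"a2a_state_version": actual + 1, "a2a_state_updated": True}


def increment_completed(state, key, max_attempts=5):
    """Read-modify-write loop that retries when another writer got there first."""
    for _ in range(max_attempts):
        current = state.get(key, default={"completed": 0, "total": 0})
        value = current["a2a_shared_state"]
        new_value = dict(value, completed=value["completed"] + 1)
        result = state.set(key, new_value, expected_version=current["a2a_state_version"])
        if result["a2a_state_updated"]:
            return result
    raise RuntimeError(f"Gave up on {key!r} after {max_attempts} conflicts")


state = SharedState()
state.set("team_progress", {"completed": 0, "total": 10})
stale = state.get("team_progress")                   # reads version 1
updated = increment_completed(state, "team_progress")  # bumps to version 2
conflict = state.set(
    "team_progress", {"completed": 9, "total": 10},
    expected_version=stale["a2a_state_version"],
)
```

The final `set` is rejected because it carries the stale version, so the concurrent increment is not silently overwritten.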
a2a.discover
Discover available agents in the namespace.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `capability` | string | No | - | Filter by capability |
|  | string | No | - | Filter by agent type |
| `status` | string | No | - | Filter by status (active, idle, etc.) |
Example
- name: find_summarizers
  uses: a2a.discover
  with:
    capability: summarize
    status: active
  output: available_agents
Return Value
{
    "a2a_agents": [
        {
            "agent_id": "summarizer-1",
            "namespace": "research-team",
            "capabilities": ["summarize", "translate"],
            "agent_type": "worker",
            "status": "active",
            "last_seen": 1704067200.0,
            "metadata": {"model": "gpt-4"}
        }
    ]
}
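Capability-based discovery amounts to filtering the registered agents on every filter that was supplied. The following sketch assumes agent records shaped like the return value above; the `discover` function and its `agent_type` keyword are hypothetical.

```python
def discover(agents, capability=None, agent_type=None, status=None):
    """Return the registered agents that match every filter that was supplied."""
    matches = [
        a for a in agents
        if (capability is None or capability in a["capabilities"])
        and (agent_type is None or a["agent_type"] == agent_type)
        and (status is None or a["status"] == status)
    ]
    return {"a2a_agents": matches}


agents = [
    {"agent_id": "summarizer-1", "capabilities": ["summarize", "translate"],
     "agent_type": "worker", "status": "active"},
    {"agent_id": "summarizer-2", "capabilities": ["summarize"],
     "agent_type": "worker", "status": "idle"},
    {"agent_id": "searcher-1", "capabilities": ["search"],
     "agent_type": "worker", "status": "active"},
]
found = discover(agents, capability="summarize", status="active")
found_ids = [a["agent_id"] for a in found["a2a_agents"]]
```

Filters combine with AND here, so `found_ids` holds only `summarizer-1`; an empty `a2a_agents` list is a normal result that callers should handle.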
a2a.register
Register the current agent for discovery and broadcast messages.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
|  | string | No | From config | Agent ID override |
| `capabilities` | list | No |  | List of capabilities |
| `agent_type` | string | No | - | Agent type category |
| `metadata` | object | No |  | Additional metadata |
Example
- name: register_worker
  uses: a2a.register
  with:
    capabilities: [search, summarize, translate]
    agent_type: worker
    metadata:
      model: gpt-4
      version: "1.0"
Return Value
{
    "a2a_registered": true,
    "a2a_agent_info": {
        "agent_id": "worker-1",
        "namespace": "research-team",
        "capabilities": ["search", "summarize", "translate"],
        "agent_type": "worker",
        "status": "active",
        "last_seen": 1704067200.0
    },
    "_agent_id": "worker-1",
    "_namespace": "research-team"
}
a2a.unregister
Unregister the current agent from discovery and broadcasts.
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
|  | string | No | From config | Agent ID override |
Example
- name: cleanup
  uses: a2a.unregister
a2a.heartbeat
Send heartbeat to update the agent’s last_seen timestamp.
Example
- name: keepalive
  uses: a2a.heartbeat
Return Value
{
    "a2a_heartbeat": 1704067200.0,
    "a2a_status": "active"
}
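One way a consumer of `last_seen` might use heartbeats is to treat agents as stale once their last heartbeat is older than some threshold. The sketch below illustrates that idea only; the `heartbeat` and `stale_agents` helpers and the 30-second threshold are assumptions, since the source does not say how staleness is decided.

```python
import time


def heartbeat(agent_info, now=None):
    """Refresh last_seen and return a dict shaped like the Return Value above."""
    now = time.time() if now is None else now
    agent_info["last_seen"] = now
    return {"a2a_heartbeat": now, "a2a_status": agent_info["status"]}


def stale_agents(agents, max_age_seconds, now=None):
    """List agent IDs whose last heartbeat is older than `max_age_seconds`."""
    now = time.time() if now is None else now
    return [a["agent_id"] for a in agents if now - a["last_seen"] > max_age_seconds]


agents = [
    {"agent_id": "worker-1", "status": "active", "last_seen": 1000.0},
    {"agent_id": "worker-2", "status": "active", "last_seen": 1050.0},
]
# Fixed timestamps keep the example deterministic.
before = stale_agents(agents, max_age_seconds=30, now=1060.0)
beat = heartbeat(agents[0], now=1060.0)
after = stale_agents(agents, max_age_seconds=30, now=1060.0)
```

Before the heartbeat, `worker-1` is 60 seconds old and counts as stale; afterwards no agent exceeds the threshold.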
Complete Example: Coordinator-Worker Pattern
Coordinator Agent
name: coordinator-agent
description: Coordinates worker agents for parallel research
settings:
  a2a:
    enabled: true
    agent_id: coordinator
    namespace: research-team
nodes:
  - name: register
    uses: a2a.register
    with:
      capabilities: [coordinate, aggregate]
      agent_type: coordinator
  - name: discover_workers
    uses: a2a.discover
    with:
      capability: search
      status: active
    output: workers
  - name: distribute_tasks
    run: |
      workers = state["workers"]["a2a_agents"]
      tasks = state["subtasks"]
      # Guard: the modulo below raises ZeroDivisionError if no worker was discovered
      if not workers:
          raise ValueError("No active search workers discovered")
      assignments = []
      for i, task in enumerate(tasks):
          # Round-robin: cycle through the discovered workers
          worker = workers[i % len(workers)]
          assignments.append({"worker": worker["agent_id"], "task": task})
      return {"assignments": assignments}
  - name: delegate_tasks
    type: dynamic_parallel
    items: "{{ state.assignments }}"
    action: a2a.delegate
    with:
      to: "{{ item.worker }}"
      task:
        type: search
        query: "{{ item.task.query }}"
      timeout: 120s
      on_timeout: fallback_local
      fallback:
        action: web.search
        with:
          query: "{{ item.task.query }}"
  - name: aggregate_results
    uses: a2a.state.set
    with:
      key: research_results
      value: "{{ state.parallel_results }}"
  - name: broadcast_completion
    uses: a2a.broadcast
    with:
      message:
        type: research_complete
        payload:
          task_id: "{{ state.task_id }}"
          result_count: "{{ state.parallel_results | length }}"
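The `distribute_tasks` node assigns subtasks to workers round-robin. The standalone sketch below mirrors that logic outside the agent runtime so it can be tested in isolation; `assign_round_robin` is a hypothetical name, and raising on an empty worker list is a design choice of this sketch.

```python
def assign_round_robin(workers, tasks):
    """Pair each task with a worker, cycling through the workers in order."""
    if not workers:
        # Without this check, `i % len(workers)` would raise ZeroDivisionError.
        raise ValueError("No workers available to receive tasks")
    return [
        {"worker": workers[i % len(workers)]["agent_id"], "task": task}
        for i, task in enumerate(tasks)
    ]


workers = [{"agent_id": "worker-1"}, {"agent_id": "worker-2"}]
tasks = [{"query": f"topic {n}"} for n in range(5)]
assignments = assign_round_robin(workers, tasks)
assigned_to = [a["worker"] for a in assignments]
```

With two workers and five tasks, `worker-1` receives three tasks and `worker-2` receives two.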
Worker Agent
name: worker-agent
description: Search worker for research tasks
settings:
  a2a:
    enabled: true
    agent_id: "worker-{{ state.worker_id }}"
    namespace: research-team
nodes:
  - name: register
    uses: a2a.register
    with:
      capabilities: [search]
      agent_type: worker
  - name: wait_for_task
    uses: a2a.receive
    with:
      type: search
      timeout: 300s
    output: task_message
  - name: execute_search
    uses: web.search
    with:
      query: "{{ state.task_message.a2a_messages[0].payload.query }}"
    output: search_results
  - name: send_result
    uses: a2a.send
    with:
      to: coordinator
      message:
        type: search_result
        payload: "{{ state.search_results }}"
      correlation_id: "{{ state.task_message.a2a_messages[0].correlation_id }}"
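The worker echoes the request's `correlation_id` so that the requester can pair the reply with the task it sent. A minimal sketch of that bookkeeping follows; the `PendingRequests` class and the use of UUIDs as correlation IDs are assumptions for illustration.

```python
import uuid


class PendingRequests:
    """Tracks outstanding requests so replies can be paired by correlation_id."""

    def __init__(self):
        self._pending = {}  # correlation_id -> original task

    def open(self, task):
        """Record a task and return the correlation ID to send with it."""
        correlation_id = uuid.uuid4().hex
        self._pending[correlation_id] = task
        return correlation_id

    def resolve(self, reply):
        """Return (task, payload) for a matching reply, or None if it is unknown."""
        task = self._pending.pop(reply.get("correlation_id"), None)
        if task is None:
            return None
        return task, reply["payload"]


pending = PendingRequests()
cid = pending.open({"type": "search", "query": "a2a"})
reply = {
    "from_agent": "worker-1",
    "type": "search_result",
    "payload": {"results": []},
    "correlation_id": cid,
}
first = pending.resolve(reply)
duplicate = pending.resolve(reply)
```

Popping the entry on the first match means a duplicate or late reply resolves to `None` instead of being processed twice.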
Best Practices
Use namespaces: Isolate agent groups to prevent message collisions
Set appropriate timeouts: Balance responsiveness with reliability
Implement fallbacks: Use on_timeout: fallback_local for critical paths
Register with capabilities: Enable capability-based discovery
Use optimistic locking: Prevent race conditions in shared state
Send heartbeats: Keep agent discovery accurate with periodic heartbeats
See Also
Agent Actions - Multi-agent coordination primitives
Planning Actions - Task decomposition and parallel execution
Reflection Actions - Self-correcting patterns