Conversation
…ayer, agent registry, and MCP tools - A2A message types: Request, Response, Notify, Event, Stream - Agent, Session, Subscription, CapabilityManifest domain models - StdioTransport and HTTPTransport implementations - AgentRegistry with discovery, heartbeats, and cleanup - 14 MCP tools for agent registration, messaging, and discovery - 28 unit tests across domain, registry, and transport packages - Integration doc with architecture, usage examples, and testing strategy
There was a problem hiding this comment.
Pull request overview
Adds initial A2A (Agent-to-Agent) protocol support to Cortex, including domain types, transport implementations, an agent registry, MCP tool endpoints, tests, and documentation.
Changes:
- Introduces A2A domain models (messages/agents/sessions/etc.) with JSON helpers.
- Adds stdio + HTTP transport implementations with a transport factory and basic stats/health endpoints.
- Implements an in-memory agent registry with heartbeat-based discovery plus export/import, and exposes registry/transport operations via MCP tools.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 26 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/domain/a2a/message.go | Adds core A2A types (Message, Agent, Session, CapabilityManifest, etc.) and JSON helpers. |
| internal/domain/a2a/message_test.go | Adds unit tests for A2A domain types/serialization. |
| internal/transport/a2a/transport.go | Implements A2A transports (stdio/HTTP), stats, and a factory for transport selection. |
| internal/transport/a2a/transport_test.go | Adds unit tests/benchmarks for transport factory and basic behaviors. |
| internal/registry/a2a/registry.go | Adds an in-memory agent registry with heartbeat tracking, discovery helpers, and export/import. |
| internal/registry/a2a/registry_test.go | Adds tests for registry operations and concurrent access. |
| internal/mcp/a2a_tools.go | Adds MCP tools for registering agents, discovery, messaging, and stats retrieval. |
| docs/A2A-PROTOCOL-INTEGRATION.md | Documents the intended A2A architecture, APIs, and usage examples. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (t *StdioTransport) Send(ctx context.Context, message *a2a.Message, to ...string) error { | ||
| t.mu.Lock() | ||
| defer t.mu.Unlock() | ||
|
|
||
| for _, agentID := range to { | ||
| if _, exists := t.agents[agentID]; !exists { | ||
| return fmt.Errorf("no connection to agent %s", agentID) | ||
| } | ||
| } | ||
|
|
||
| data, err := message.ToJSON() | ||
| if err != nil { | ||
| t.stats.Errors++ | ||
| return fmt.Errorf("failed to marshal message: %w", err) | ||
| } | ||
|
|
||
| // Write to stdout | ||
| _, err = os.Stdout.Write(append(data, '\n')) | ||
| if err != nil { | ||
| t.stats.Errors++ | ||
| return fmt.Errorf("failed to write message: %w", err) | ||
| } | ||
|
|
||
| t.stats.MessagesSent++ | ||
| return nil |
There was a problem hiding this comment.
TransportStats is mutated from multiple goroutines without synchronization (e.g., Errors++/MessagesSent++), and in HTTPTransport it’s even mutated while holding only an RLock. This will trigger data races under -race. Protect stats updates with a mutex, use atomic counters, or store counters as atomic.Int64 and compute Uptime on read.
| // Listen starts listening for incoming messages over stdio | ||
| func (t *StdioTransport) Listen(ctx context.Context, handler MessageHandler) error { | ||
| t.handler = handler | ||
| reader := bufio.NewReader(os.Stdin) | ||
|
|
||
| go func() { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-t.closed: | ||
| return | ||
| default: | ||
| line, err := reader.ReadBytes('\n') | ||
| if err != nil { | ||
| if err != io.EOF { | ||
| t.stats.Errors++ | ||
| } | ||
| return | ||
| } | ||
|
|
||
| message, err := a2a.FromJSON(line) | ||
| if err != nil { | ||
| t.stats.Errors++ | ||
| continue | ||
| } | ||
|
|
||
| t.stats.MessagesReceived++ | ||
| go func() { | ||
| if err := t.handler(ctx, message); err != nil { | ||
| t.stats.Errors++ | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
Listen() updates t.stats and reads/writes t.handler concurrently without synchronization (stats increments + handler invocation in nested goroutines). This is another source of data races; consider guarding handler/stats with a mutex or using atomic counters, and avoid capturing the parent ctx for long-running handler goroutines if cancellation semantics matter.
| defer t.mu.RUnlock() | ||
|
|
||
| var errs []error | ||
| for _, agentID := range to { | ||
| endpoint, exists := t.agents[agentID] | ||
| if !exists { | ||
| errs = append(errs, fmt.Errorf("no endpoint for agent %s", agentID)) | ||
| continue | ||
| } | ||
| if err := t.sendToEndpoint(ctx, message, endpoint); err != nil { | ||
| errs = append(errs, err) | ||
| } | ||
| } | ||
|
|
||
| if len(errs) > 0 { | ||
| t.stats.Errors++ | ||
| return fmt.Errorf("some messages failed: %v", errs) | ||
| } | ||
| t.stats.MessagesSent++ | ||
| return nil | ||
| } | ||
|
|
||
| // Broadcast broadcasts a message to all agents via HTTP | ||
| func (t *HTTPTransport) Broadcast(ctx context.Context, message *a2a.Message) error { | ||
| t.mu.RLock() | ||
| defer t.mu.RUnlock() | ||
|
|
||
| var errs []error | ||
| for agentID, endpoint := range t.agents { | ||
| if err := t.sendToEndpoint(ctx, message, endpoint); err != nil { | ||
| errs = append(errs, fmt.Errorf("agent %s: %w", agentID, err)) | ||
| } | ||
| } | ||
|
|
||
| if len(errs) > 0 { | ||
| t.stats.Errors++ | ||
| return fmt.Errorf("some broadcast messages failed: %v", errs) | ||
| } | ||
| t.stats.MessagesSent++ |
There was a problem hiding this comment.
HTTPTransport.Send holds t.mu.RLock while performing network I/O in sendToEndpoint() and also mutates t.stats under the read lock. This can block Register/Unregister for the duration of HTTP calls and is unsafe for concurrent writes. Copy the endpoints under lock, release the lock before I/O, and update stats via proper synchronization.
| defer t.mu.RUnlock() | |
| var errs []error | |
| for _, agentID := range to { | |
| endpoint, exists := t.agents[agentID] | |
| if !exists { | |
| errs = append(errs, fmt.Errorf("no endpoint for agent %s", agentID)) | |
| continue | |
| } | |
| if err := t.sendToEndpoint(ctx, message, endpoint); err != nil { | |
| errs = append(errs, err) | |
| } | |
| } | |
| if len(errs) > 0 { | |
| t.stats.Errors++ | |
| return fmt.Errorf("some messages failed: %v", errs) | |
| } | |
| t.stats.MessagesSent++ | |
| return nil | |
| } | |
| // Broadcast broadcasts a message to all agents via HTTP | |
| func (t *HTTPTransport) Broadcast(ctx context.Context, message *a2a.Message) error { | |
| t.mu.RLock() | |
| defer t.mu.RUnlock() | |
| var errs []error | |
| for agentID, endpoint := range t.agents { | |
| if err := t.sendToEndpoint(ctx, message, endpoint); err != nil { | |
| errs = append(errs, fmt.Errorf("agent %s: %w", agentID, err)) | |
| } | |
| } | |
| if len(errs) > 0 { | |
| t.stats.Errors++ | |
| return fmt.Errorf("some broadcast messages failed: %v", errs) | |
| } | |
| t.stats.MessagesSent++ | |
| var ( | |
| endpoints []string | |
| errs []error | |
| ) | |
| endpoints = make([]string, 0, len(to)) | |
| for _, agentID := range to { | |
| endpoint, exists := t.agents[agentID] | |
| if !exists { | |
| errs = append(errs, fmt.Errorf("no endpoint for agent %s", agentID)) | |
| continue | |
| } | |
| endpoints = append(endpoints, endpoint) | |
| } | |
| t.mu.RUnlock() | |
| for _, endpoint := range endpoints { | |
| if err := t.sendToEndpoint(ctx, message, endpoint); err != nil { | |
| errs = append(errs, err) | |
| } | |
| } | |
| if len(errs) > 0 { | |
| t.mu.Lock() | |
| t.stats.Errors++ | |
| t.mu.Unlock() | |
| return fmt.Errorf("some messages failed: %v", errs) | |
| } | |
| t.mu.Lock() | |
| t.stats.MessagesSent++ | |
| t.mu.Unlock() | |
| return nil | |
| } | |
| // Broadcast broadcasts a message to all agents via HTTP | |
| func (t *HTTPTransport) Broadcast(ctx context.Context, message *a2a.Message) error { | |
| t.mu.RLock() | |
| type agentEndpoint struct { | |
| id string | |
| endpoint string | |
| } | |
| agentsSnapshot := make([]agentEndpoint, 0, len(t.agents)) | |
| for agentID, endpoint := range t.agents { | |
| agentsSnapshot = append(agentsSnapshot, agentEndpoint{ | |
| id: agentID, | |
| endpoint: endpoint, | |
| }) | |
| } | |
| t.mu.RUnlock() | |
| var errs []error | |
| for _, ae := range agentsSnapshot { | |
| if err := t.sendToEndpoint(ctx, message, ae.endpoint); err != nil { | |
| errs = append(errs, fmt.Errorf("agent %s: %w", ae.id, err)) | |
| } | |
| } | |
| if len(errs) > 0 { | |
| t.mu.Lock() | |
| t.stats.Errors++ | |
| t.mu.Unlock() | |
| return fmt.Errorf("some broadcast messages failed: %v", errs) | |
| } | |
| t.mu.Lock() | |
| t.stats.MessagesSent++ | |
| t.mu.Unlock() |
| t.server = &http.Server{ | ||
| Addr: ":8080", | ||
| Handler: mux, | ||
| } | ||
|
|
There was a problem hiding this comment.
HTTPTransport.Listen ignores baseURL and always binds to ":8080". This makes the transport non-configurable and likely to conflict in multi-instance deployments/tests. Parse host/port from baseURL or accept an explicit listen address in NewHTTPTransport/CreateTransport.
| t.server = &http.Server{ | |
| Addr: ":8080", | |
| Handler: mux, | |
| } | |
| // If a server has not been preconfigured, create one. | |
| if t.server == nil { | |
| t.server = &http.Server{} | |
| } | |
| // Always use our mux as the handler for incoming requests. | |
| t.server.Handler = mux | |
| // Determine listen address. Prefer any preconfigured Addr, then environment, | |
| // and finally fall back to the original default ":8080". | |
| if t.server.Addr == "" { | |
| addr := os.Getenv("A2A_HTTP_ADDR") | |
| if addr == "" { | |
| addr = ":8080" | |
| } | |
| t.server.Addr = addr | |
| } |
| func (t *HTTPTransport) handleMessage(w http.ResponseWriter, r *http.Request) { | ||
| t.stats.MessagesReceived++ | ||
|
|
||
| data, err := io.ReadAll(r.Body) | ||
| if err != nil { | ||
| http.Error(w, "Failed to read request body", http.StatusBadRequest) | ||
| t.stats.Errors++ | ||
| return | ||
| } | ||
|
|
There was a problem hiding this comment.
handleMessage reads the entire request body with io.ReadAll with no size limit and no method validation. This is an easy DoS vector (large bodies) and can accept unintended methods. Use http.MaxBytesReader (or a server-level limit) and reject non-POST requests.
| ID string `json:"id"` | ||
| Name string `json:"name"` | ||
| Version string `json:"version"` | ||
| Capabilities []string `json:"capabilities"` | ||
| Metadata map[string]interface{} `json:"metadata"` | ||
| Endpoint string `json:"endpoint,omitempty"` | ||
| Status string `json:"status"` // active, inactive, error | ||
| LastSeen time.Time `json:"last_seen"` |
There was a problem hiding this comment.
The documentation’s example Agent struct shows Capabilities []string and Status string, but the actual implementation in internal/domain/a2a uses Capabilities []Capability and Status AgentStatus. This mismatch will confuse consumers; update the docs snippet to reflect the real types/fields.
| ID string `json:"id"` | |
| Name string `json:"name"` | |
| Version string `json:"version"` | |
| Capabilities []string `json:"capabilities"` | |
| Metadata map[string]interface{} `json:"metadata"` | |
| Endpoint string `json:"endpoint,omitempty"` | |
| Status string `json:"status"` // active, inactive, error | |
| LastSeen time.Time `json:"last_seen"` | |
| ID string `json:"id"` | |
| Name string `json:"name"` | |
| Version string `json:"version"` | |
| Capabilities []Capability `json:"capabilities"` | |
| Metadata map[string]interface{} `json:"metadata"` | |
| Endpoint string `json:"endpoint,omitempty"` | |
| Status AgentStatus `json:"status"` // active, inactive, error | |
| LastSeen time.Time `json:"last_seen"` |
| // Transport defines the interface for A2A message transport | ||
| type Transport interface { | ||
| // Send a message to specific agents | ||
| Send(ctx context.Context, message *a2a.Message, to ...string) error | ||
|
|
||
| // Broadcast a message to all agents | ||
| Broadcast(ctx context.Context, message *a2a.Message) error | ||
|
|
||
| // Start listening for incoming messages | ||
| Listen(ctx context.Context, handler MessageHandler) error | ||
|
|
||
| // Register an agent | ||
| RegisterAgent(ctx context.Context, agent *a2a.Agent) error | ||
|
|
||
| // Unregister an agent | ||
| UnregisterAgent(ctx context.Context, agentID string) error | ||
|
|
||
| // Get registered agents | ||
| GetAgents(ctx context.Context) ([]*a2a.Agent, error) | ||
|
|
||
| // Close the transport | ||
| Close() error | ||
| } |
There was a problem hiding this comment.
The docs show a Transport interface with RegisterAgent(ctx, agent *a2a.Agent) and GetAgents() ([]*a2a.Agent), but the implemented transport interface is RegisterAgent(ctx, agentID, endpoint string) and GetAgentEndpoints() (map[string]string). Please align the documentation with the current API (or adjust the implementation to match the documented interface).
| go func() { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-t.closed: | ||
| return | ||
| default: | ||
| line, err := reader.ReadBytes('\n') | ||
| if err != nil { | ||
| if err != io.EOF { | ||
| t.stats.Errors++ | ||
| } | ||
| return | ||
| } |
There was a problem hiding this comment.
StdioTransport.Listen uses a select { default: reader.ReadBytes('\n') } pattern, but ReadBytes blocks and won’t be interrupted by ctx.Done() or Close(). This can prevent timely shutdown when stdin is idle. Consider moving the read to its own goroutine and select on ctx/closed vs a message channel, or use non-blocking I/O where possible.
|
|
||
| go func() { | ||
| if err := t.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||
| // Server stopped | ||
| } | ||
| }() | ||
|
|
There was a problem hiding this comment.
HTTPTransport.Listen starts the server but doesn’t tie its lifecycle to the provided ctx (it never shuts down on ctx.Done()). Consider spawning a goroutine that waits for ctx.Done() (or t.closed) and calls server.Shutdown so Listen respects cancellation and avoids leaking goroutines.
| go func() { | |
| if err := t.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { | |
| // Server stopped | |
| } | |
| }() | |
| // Start HTTP server in a separate goroutine. | |
| go func() { | |
| if err := t.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { | |
| // Server stopped | |
| } | |
| }() | |
| // Ensure the server is shut down when the context is canceled. | |
| go func() { | |
| <-ctx.Done() | |
| if t.server == nil { | |
| return | |
| } | |
| shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancel() | |
| _ = t.server.Shutdown(shutdownCtx) | |
| }() |
| // Package mcp_a2a provides MCP tools for A2A protocol operations | ||
| // | ||
| // This package implements MCP (Model Context Protocol) tools that enable | ||
| // Cortex to participate in A2A (Agent-to-Agent) Protocol communication. | ||
| package mcp | ||
|
|
There was a problem hiding this comment.
File header comment says "Package mcp_a2a" but the actual package is mcp. Go doc comments should match the declared package name to avoid confusing generated documentation.
Summary
Adds A2A (Agent-to-Agent) Protocol support to Cortex, enabling standardized inter-agent communication compatible with the emerging A2A open standard.
What's Included
Domain Models (internal/domain/a2a/)
Transport Layer (internal/transport/a2a/)
Agent Registry (internal/registry/a2a/)
MCP Tools (internal/mcp/a2a_tools.go)
14 new MCP tools:
Tests
Strategic Value
A2A Protocol is the first open standard for AI agent collaboration. This integration positions Cortex as a first-class participant in multi-agent ecosystems.
Closes: Agent-to-Agent interoperability
Relates: cortex memory sharing across agents