Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions go/core/internal/a2a/a2a_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
type A2ARegistrar struct {
cache crcache.Cache
handlerMux A2AHandlerMux
clientRegistry *AgentClientRegistry
a2aBaseURL string
sandboxA2AURL string
authenticator auth.AuthProvider
a2aBaseOptions []a2aclient.Option
onAgentChange func(ctx context.Context)
}

var _ manager.Runnable = (*A2ARegistrar)(nil)
Expand All @@ -45,11 +47,12 @@ func NewA2ARegistrar(
streamingTimeout time.Duration,
) *A2ARegistrar {
reg := &A2ARegistrar{
cache: cache,
handlerMux: mux,
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
cache: cache,
handlerMux: mux,
clientRegistry: NewAgentClientRegistry(),
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
a2aBaseOptions: []a2aclient.Option{
a2aclient.WithTimeout(streamingTimeout),
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
Expand All @@ -60,6 +63,25 @@ func NewA2ARegistrar(
return reg
}

// ClientRegistry returns the registry of A2A clients for direct agent
// invocation, populated as agents are registered and deregistered.
func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry {
return a.clientRegistry
}

// SetAgentChangeCallback registers a function called whenever an agent is
// added, updated, or removed. The callback receives the context from the
// informer event so it can propagate cancellation.
func (a *A2ARegistrar) SetAgentChangeCallback(fn func(ctx context.Context)) {
a.onAgentChange = fn
}

func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) {
if a.onAgentChange != nil {
a.onAgentChange(ctx)
}
}

func (a *A2ARegistrar) NeedLeaderElection() bool {
return false
}
Expand Down Expand Up @@ -96,7 +118,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
}
if err := a.upsertAgentHandler(ctx, agent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent))
return
}
a.notifyAgentChange(ctx)
},
UpdateFunc: func(oldObj, newObj any) {
oldAgent, ok1 := informerAgentObject(oldObj)
Expand All @@ -107,7 +131,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
if oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) {
if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent))
return
}
a.notifyAgentChange(ctx)
Comment on lines 131 to +136
}
},
DeleteFunc: func(obj any) {
Expand All @@ -117,7 +143,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
}
ref := a2aRouteKey(agent)
a.handlerMux.RemoveAgentHandler(ref)
a.clientRegistry.delete(ref)
log.V(1).Info("removed A2A handler", "agent", ref)
a.notifyAgentChange(ctx)
},
}); err != nil {
return fmt.Errorf("failed to add informer event handler for %T: %w", prototype, err)
Expand Down Expand Up @@ -182,10 +210,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
cardCopy := *card
cardCopy.URL = a.a2aRouteURL(agent)

if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
routeRef := a2aRouteKey(agent)
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
return fmt.Errorf("set handler for %s: %w", agentRef, err)
}

a.clientRegistry.set(routeRef, client)

log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
return nil
}
Expand Down
58 changes: 58 additions & 0 deletions go/core/internal/a2a/agent_client_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package a2a

import (
"context"
"fmt"
"sync"

a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
"trpc.group/trpc-go/trpc-a2a-go/protocol"
)

// AgentClientRegistry maps agent route keys to their A2A clients.
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
// agents without an HTTP round trip through the controller's own A2A listener.
type AgentClientRegistry struct {
mu sync.RWMutex
clients map[string]*a2aclient.A2AClient
}

func NewAgentClientRegistry() *AgentClientRegistry {
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
}

// set stores the client under the agent's route key (e.g. "namespace/name" or
// "sandboxes/namespace/name").
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[agentRef] = c
}

// delete removes the client for the given agent route key.
func (r *AgentClientRegistry) delete(agentRef string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.clients, agentRef)
}

// Register adds or replaces the A2A client for the given agent. It is the
// exported counterpart of set, intended for use in tests and explicit
// registrations outside the A2ARegistrar lifecycle.
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
r.set(namespace+"/"+name, c)
}

// SendMessage invokes an agent directly via its cached A2A client.
// namespace and name must identify a non-sandbox agent; sandbox agents use a
// different route key and are not yet reachable via this method.
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
key := namespace + "/" + name
r.mu.RLock()
c, ok := r.clients[key]
r.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name)
}
return c.SendMessage(ctx, params)
}
Loading
Loading