diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index d21dbec04..5d8d6e821 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -26,10 +26,12 @@ import ( type A2ARegistrar struct { cache crcache.Cache handlerMux A2AHandlerMux + clientRegistry *AgentClientRegistry a2aBaseURL string sandboxA2AURL string authenticator auth.AuthProvider a2aBaseOptions []a2aclient.Option + onAgentChange func(ctx context.Context) } var _ manager.Runnable = (*A2ARegistrar)(nil) @@ -45,11 +47,12 @@ func NewA2ARegistrar( streamingTimeout time.Duration, ) *A2ARegistrar { reg := &A2ARegistrar{ - cache: cache, - handlerMux: mux, - a2aBaseURL: a2aBaseUrl, - sandboxA2AURL: sandboxA2ABaseURL, - authenticator: authenticator, + cache: cache, + handlerMux: mux, + clientRegistry: NewAgentClientRegistry(), + a2aBaseURL: a2aBaseUrl, + sandboxA2AURL: sandboxA2ABaseURL, + authenticator: authenticator, a2aBaseOptions: []a2aclient.Option{ a2aclient.WithTimeout(streamingTimeout), a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf), @@ -60,6 +63,25 @@ func NewA2ARegistrar( return reg } +// ClientRegistry returns the registry of A2A clients for direct agent +// invocation, populated as agents are registered and deregistered. +func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry { + return a.clientRegistry +} + +// SetAgentChangeCallback registers a function called whenever an agent is +// added, updated, or removed. The callback receives the context from the +// informer event so it can propagate cancellation. +func (a *A2ARegistrar) SetAgentChangeCallback(fn func(ctx context.Context)) { + a.onAgentChange = fn +} + +func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) { + if a.onAgentChange != nil { + a.onAgentChange(ctx) + } +} + func (a *A2ARegistrar) NeedLeaderElection() bool { return false } @@ -96,7 +118,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } if err := a.upsertAgentHandler(ctx, agent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent)) + return } + a.notifyAgentChange(ctx) }, UpdateFunc: func(oldObj, newObj any) { oldAgent, ok1 := informerAgentObject(oldObj) @@ -107,7 +131,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al if oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) { if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent)) + return } + a.notifyAgentChange(ctx) } }, DeleteFunc: func(obj any) { @@ -117,7 +143,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } ref := a2aRouteKey(agent) a.handlerMux.RemoveAgentHandler(ref) + a.clientRegistry.delete(ref) log.V(1).Info("removed A2A handler", "agent", ref) + a.notifyAgentChange(ctx) }, }); err != nil { return fmt.Errorf("failed to add informer event handler for %T: %w", prototype, err) @@ -182,10 +210,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag cardCopy := *card cardCopy.URL = a.a2aRouteURL(agent) - if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { + routeRef := a2aRouteKey(agent) + if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { return fmt.Errorf("set handler for %s: %w", agentRef, err) } + a.clientRegistry.set(routeRef, client) + log.V(1).Info("registered/updated A2A handler", "agent", agentRef) return nil } diff --git a/go/core/internal/a2a/agent_client_registry.go b/go/core/internal/a2a/agent_client_registry.go new file mode 100644 index 000000000..1e5b7b6bc --- /dev/null +++ b/go/core/internal/a2a/agent_client_registry.go @@ -0,0 +1,58 @@ +package a2a + +import ( + "context" + "fmt" + "sync" + + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" + "trpc.group/trpc-go/trpc-a2a-go/protocol" +) + +// AgentClientRegistry maps agent route keys to their A2A clients. +// The A2ARegistrar populates it; the MCP handler reads from it to invoke +// agents without an HTTP round trip through the controller's own A2A listener. +type AgentClientRegistry struct { + mu sync.RWMutex + clients map[string]*a2aclient.A2AClient +} + +func NewAgentClientRegistry() *AgentClientRegistry { + return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)} +} + +// set stores the client under the agent's route key (e.g. "namespace/name" or +// "sandboxes/namespace/name"). +func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) { + r.mu.Lock() + defer r.mu.Unlock() + r.clients[agentRef] = c +} + +// delete removes the client for the given agent route key. +func (r *AgentClientRegistry) delete(agentRef string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.clients, agentRef) +} + +// Register adds or replaces the A2A client for the given agent. It is the +// exported counterpart of set, intended for use in tests and explicit +// registrations outside the A2ARegistrar lifecycle. +func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) { + r.set(namespace+"/"+name, c) +} + +// SendMessage invokes an agent directly via its cached A2A client. +// namespace and name must identify a non-sandbox agent; sandbox agents use a +// different route key and are not yet reachable via this method. +func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) { + key := namespace + "/" + name + r.mu.RLock() + c, ok := r.clients[key] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name) + } + return c.SendMessage(ctx, params) +} diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index 8182df6fb..ebba2fa10 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -2,34 +2,28 @@ package mcp import ( "context" + "encoding/json" "fmt" "net/http" "strings" - "sync" - "time" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/core/internal/a2a" - authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth" "github.com/kagent-dev/kagent/go/core/internal/version" "github.com/kagent-dev/kagent/go/core/pkg/auth" mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" - a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" "trpc.group/trpc-go/trpc-a2a-go/protocol" ) // MCPHandler handles MCP requests and bridges them to A2A endpoints type MCPHandler struct { kubeClient client.Client - a2aBaseURL string - a2aTimeout time.Duration + agentClients *a2a.AgentClientRegistry authenticator auth.AuthProvider httpHandler *mcpsdk.StreamableHTTPHandler server *mcpsdk.Server - a2aClients sync.Map } // Input types for MCP tools @@ -45,7 +39,7 @@ type AgentSummary struct { } type InvokeAgentInput struct { - Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name"` + Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name. To find a list of available sources, use the 'agents' resource."` Task string `json:"task" jsonschema:"Task to run"` ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` } @@ -56,20 +50,12 @@ type InvokeAgentOutput struct { ContextID string `json:"context_id,omitempty"` } -// defaultA2ATimeout is the fallback timeout for A2A client calls and should match -// the configured default streaming timeout. -const defaultA2ATimeout = 10 * time.Minute - -// NewMCPHandler creates a new MCP handler -// Wraps the StreamableHTTPHandler and adds A2A bridging and context management. -func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) { - if a2aTimeout <= 0 { - a2aTimeout = defaultA2ATimeout - } +// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly +// to agent A2A clients, bypassing the controller's own HTTP A2A listener. +func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) { handler := &MCPHandler{ kubeClient: kubeClient, - a2aBaseURL: a2aBaseURL, - a2aTimeout: a2aTimeout, + agentClients: agentClients, authenticator: authenticator, } @@ -78,7 +64,12 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au Name: "kagent-agents", Version: version.Version, } - server := mcpsdk.NewServer(impl, nil) + server := mcpsdk.NewServer(impl, &mcpsdk.ServerOptions{ + // No-op handlers enable subscription tracking in the SDK; actual + // notifications are sent via NotifyAgentsChanged. + SubscribeHandler: func(context.Context, *mcpsdk.SubscribeRequest) error { return nil }, + UnsubscribeHandler: func(context.Context, *mcpsdk.UnsubscribeRequest) error { return nil }, + }) handler.server = server // Add list_agents tool @@ -101,6 +92,17 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au handler.handleInvokeAgent, ) + // Add agents resource for clients that pre-populate context + server.AddResource( + &mcpsdk.Resource{ + URI: "kagent://agents", + Name: "agents", + Description: "List of invokable kagent agents (accepted + deploymentReady)", + MIMEType: "application/json", + }, + handler.readAgentsResource, + ) + // Create HTTP handler handler.httpHandler = mcpsdk.NewStreamableHTTPHandler( func(*http.Request) *mcpsdk.Server { @@ -112,23 +114,14 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au return handler, nil } -// handleListAgents handles the list_agents MCP tool -func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { - log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") - +// listReadyAgents returns agents that are accepted and deployment-ready. +func (h *MCPHandler) listReadyAgents(ctx context.Context) ([]AgentSummary, error) { agentList := &v1alpha2.AgentList{} if err := h.kubeClient.List(ctx, agentList); err != nil { - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, - }, - IsError: true, - }, ListAgentsOutput{}, nil + return nil, err } - - agents := make([]AgentSummary, 0) + agents := make([]AgentSummary, 0, len(agentList.Items)) for _, agent := range agentList.Items { - // Check if agent is accepted and deployment ready deploymentReady := false accepted := false for _, condition := range agent.Status.Conditions { @@ -139,18 +132,30 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR accepted = true } } - if !accepted || !deploymentReady { continue } - - ref := agent.Namespace + "/" + agent.Name - description := agent.Spec.Description agents = append(agents, AgentSummary{ - Ref: ref, - Description: description, + Ref: agent.Namespace + "/" + agent.Name, + Description: agent.Spec.Description, }) } + return agents, nil +} + +// handleListAgents handles the list_agents MCP tool +func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") + + agents, err := h.listReadyAgents(ctx) + if err != nil { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, + }, + IsError: true, + }, ListAgentsOutput{}, nil + } log.Info("Listed agents", "count", len(agents)) @@ -179,10 +184,50 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR }, output, nil } +// readAgentsResource handles reads of the kagent://agents resource. +func (h *MCPHandler) readAgentsResource(ctx context.Context, req *mcpsdk.ReadResourceRequest) (*mcpsdk.ReadResourceResult, error) { + agents, err := h.listReadyAgents(ctx) + if err != nil { + return nil, fmt.Errorf("listing agents: %w", err) + } + data, err := json.Marshal(agents) + if err != nil { + return nil, fmt.Errorf("marshaling agents: %w", err) + } + return &mcpsdk.ReadResourceResult{ + Contents: []*mcpsdk.ResourceContents{{ + URI: "kagent://agents", + MIMEType: "application/json", + Text: string(data), + }}, + }, nil +} + +// NotifyAgentsChanged sends a resources/updated notification for kagent://agents +// to all subscribed clients. Called by A2ARegistrar when agents are added, updated, +// or removed. +func (h *MCPHandler) NotifyAgentsChanged(ctx context.Context) { + if err := h.server.ResourceUpdated(ctx, &mcpsdk.ResourceUpdatedNotificationParams{ + URI: "kagent://agents", + }); err != nil { + ctrllog.FromContext(ctx).WithName("mcp-handler").Error(err, "failed to send resource updated notification") + } +} + // handleInvokeAgent handles the invoke_agent MCP tool func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") + // The Go MCP SDK detaches the HTTP request context when dispatching to + // tool handlers, so auth.AuthSessionFrom(ctx) returns nothing. Recover + // the auth session from the HTTP headers preserved in RequestExtra so + // that the A2A client's outbound request to the agent carries the user's JWT. + if extra := req.GetExtra(); extra != nil { + if session, err := h.authenticator.Authenticate(ctx, extra.Header, nil); err == nil { + ctx = auth.AuthSessionTo(ctx, session) + } + } + // Parse agent reference (namespace/name or just name) agentNS, agentName, ok := strings.Cut(input.Agent, "/") if !ok { @@ -194,7 +239,6 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool }, InvokeAgentOutput{}, nil } agentRef := agentNS + "/" + agentName - agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName} // Get context ID from client request (stateless mode) // If not provided, contextIDPtr will be nil and a new conversation will start @@ -204,47 +248,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool log.V(1).Info("Using context_id from client request", "context_id", input.ContextID) } - // Get or create cached A2A client for this agent - a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) - var a2aClient *a2aclient.A2AClient - - if cached, ok := h.a2aClients.Load(agentRef); ok { - if client, ok := cached.(*a2aclient.A2AClient); ok { - a2aClient = client - } - } - - // Create new client if not cached - if a2aClient == nil { - // Build A2A client options with authentication propagation - a2aOpts := []a2aclient.Option{ - a2aclient.WithTimeout(h.a2aTimeout), - a2aclient.WithHTTPReqHandler( - authimpl.A2ARequestHandler( - h.authenticator, - agentNns, - ), - ), - } - - newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...) - if err != nil { - log.Error(err, "Failed to create A2A client", "agent", agentRef) - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)}, - }, - IsError: true, - }, InvokeAgentOutput{}, nil - } - - // Cache the client - h.a2aClients.Store(agentRef, newClient) - a2aClient = newClient - } - - // Send message via A2A - result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{ + // Send message directly via the agent's A2A client, bypassing the + // controller's own HTTP A2A listener. + result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{ Message: protocol.Message{ Kind: protocol.KindMessage, Role: protocol.MessageRoleUser, diff --git a/go/core/internal/mcp/mcp_handler_test.go b/go/core/internal/mcp/mcp_handler_test.go new file mode 100644 index 000000000..5369769d5 --- /dev/null +++ b/go/core/internal/mcp/mcp_handler_test.go @@ -0,0 +1,188 @@ +package mcp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "testing" + + "github.com/kagent-dev/kagent/go/core/internal/a2a" + authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth" + "github.com/kagent-dev/kagent/go/core/pkg/auth" + mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" +) + +// fakeSession is a minimal auth.Session for testing. +type fakeSession struct{ principal auth.Principal } + +func (s *fakeSession) Principal() auth.Principal { return s.principal } + +// fakeAuthProvider propagates the incoming Bearer token to upstream requests unchanged. +type fakeAuthProvider struct { + session auth.Session +} + +func (f *fakeAuthProvider) Authenticate(_ context.Context, headers http.Header, _ url.Values) (auth.Session, error) { + if headers.Get("Authorization") != "" { + return f.session, nil + } + return nil, http.ErrNoCookie +} + +func (f *fakeAuthProvider) UpstreamAuth(r *http.Request, _ auth.Session, _ auth.Principal) error { + r.Header.Set("Authorization", "Bearer upstream-token") + return nil +} + +// a2aBackend is a fake A2A server that records the Authorization header of each request. +type a2aBackend struct { + server *httptest.Server + mu sync.Mutex + lastAuthHeader string +} + +func (b *a2aBackend) getLastAuthHeader() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.lastAuthHeader +} + +func newA2ABackend(t *testing.T) *a2aBackend { + t.Helper() + b := &a2aBackend{} + b.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b.mu.Lock() + b.lastAuthHeader = r.Header.Get("Authorization") + b.mu.Unlock() + resp := map[string]any{ + "jsonrpc": "2.0", + "id": "", + "result": map[string]any{ + "kind": "message", + "messageId": "test-msg", + "role": "agent", + "parts": []any{map[string]any{"kind": "text", "text": "hello from agent"}}, + }, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + t.Errorf("failed to encode fake A2A response: %v", err) + } + })) + t.Cleanup(b.server.Close) + return b +} + +// authRoundTripper injects a fixed Authorization header into every outgoing request. +type authRoundTripper struct { + base http.RoundTripper + token string +} + +func (a *authRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r = r.Clone(r.Context()) + r.Header.Set("Authorization", "Bearer "+a.token) + return a.base.RoundTrip(r) +} + +// newTestRegistry builds an AgentClientRegistry with a single agent pre-registered, +// wired to send A2A requests to backendURL and propagate auth via authProvider. +func newTestRegistry(t *testing.T, namespace, name string, backendURL string, authProvider auth.AuthProvider) *a2a.AgentClientRegistry { + t.Helper() + agentRef := types.NamespacedName{Namespace: namespace, Name: name} + c, err := a2aclient.NewA2AClient( + backendURL+"/"+namespace+"/"+name+"/", + a2aclient.WithHTTPReqHandler(authimpl.A2ARequestHandler(authProvider, agentRef)), + ) + require.NoError(t, err) + + registry := a2a.NewAgentClientRegistry() + registry.Register(namespace, name, c) + return registry +} + +// TestInvokeAgent_AuthPropagation exercises the full MCP HTTP stack: +// the MCP client sends a request with an Authorization header, the handler +// recovers the auth session from RequestExtra, and the A2A backend receives +// the token produced by UpstreamAuth. +func TestInvokeAgent_AuthPropagation(t *testing.T) { + backend := newA2ABackend(t) + authProvider := &fakeAuthProvider{session: &fakeSession{}} + + registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider) + mcpHandler, err := NewMCPHandler(nil, registry, authProvider) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + HTTPClient: &http.Client{ + Transport: &authRoundTripper{base: http.DefaultTransport, token: "test-token"}, + }, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": "default/test-agent", + "task": "say hello", + }, + }) + require.NoError(t, err) + assert.False(t, result.IsError, "expected successful tool result, got: %v", result.Content) + assert.Equal(t, "Bearer upstream-token", backend.getLastAuthHeader(), "A2A backend should receive the token produced by UpstreamAuth") +} + +// TestInvokeAgent_NoAuthPropagationWithoutHeader verifies that when the MCP +// client sends no Authorization header, no Authorization header is +// propagated to the A2A backend. +func TestInvokeAgent_NoAuthPropagationWithoutHeader(t *testing.T) { + backend := newA2ABackend(t) + authProvider := &fakeAuthProvider{session: &fakeSession{}} + + registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider) + mcpHandler, err := NewMCPHandler(nil, registry, authProvider) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + // No custom transport — requests carry no Authorization header. + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": "default/test-agent", + "task": "say hello", + }, + }) + require.NoError(t, err) + assert.False(t, result.IsError) + assert.Empty(t, backend.getLastAuthHeader(), "A2A backend should receive no Authorization header when the client sends none") +} diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index d47ab55ad..05cd1b916 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -613,8 +613,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne // Register A2A handlers on all replicas a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, httpserver.APIPathA2ASandboxes, extensionCfg.Authenticator) - - if err := mgr.Add(a2a.NewA2ARegistrar( + a2aRegistrar := a2a.NewA2ARegistrar( mgr.GetCache(), a2aHandler, cfg.A2ABaseUrl+httpserver.APIPathA2A, @@ -623,22 +622,24 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne int(cfg.Streaming.MaxBufSize.Value()), int(cfg.Streaming.InitialBufSize.Value()), cfg.Streaming.Timeout, - )); err != nil { + ) + if err := mgr.Add(a2aRegistrar); err != nil { setupLog.Error(err, "unable to set up a2a registrar") os.Exit(1) } - // Create MCP handler that bridges to A2A + // Create MCP handler that invokes agents directly via their A2A clients, + // bypassing the controller's own HTTP A2A listener. mcpHandler, err := mcp.NewMCPHandler( mgr.GetClient(), - cfg.A2ABaseUrl+httpserver.APIPathA2A, + a2aRegistrar.ClientRegistry(), extensionCfg.Authenticator, - cfg.Streaming.Timeout, ) if err != nil { setupLog.Error(err, "unable to create MCP handler") os.Exit(1) } + a2aRegistrar.SetAgentChangeCallback(mcpHandler.NotifyAgentsChanged) // +kubebuilder:scaffold:builder if metricsCertWatcher != nil {