From 164d194c47b43edeab4a55166bcb06c3ca07d2a3 Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Tue, 12 May 2026 15:41:08 +0200 Subject: [PATCH 1/2] fix(controller): recover MCP auth session from `RequestExtra` in tool handlers The Go MCP SDK detaches the HTTP request context before dispatching to tool handlers. From the [SDK source](https://github.com/modelcontextprotocol/go-sdk/blob/v1.5.0/mcp/streamable.go#L485-L487): > // Pass req.Context() here, to allow middleware to add context values. > // The context is detached in the jsonrpc2 library when handling the > // long-running stream. This means the auth session placed by `AuthnMiddleware` is not visible via `auth.AuthSessionFrom(ctx)` in tool handlers. The SDK does preserve the original HTTP headers in [RequestExtra.Header](https://github.com/modelcontextprotocol/go-sdk/blob/v1.5.0/mcp/streamable.go#L1155-L1158) though. Re-authenticate from those headers at the top of handleInvokeAgent so the A2A client's outbound request to the agent carries the user's JWT. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/core/internal/mcp/mcp_handler.go | 10 ++ go/core/internal/mcp/mcp_handler_test.go | 175 +++++++++++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 go/core/internal/mcp/mcp_handler_test.go diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index 8182df6fb..6b157a3e7 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -183,6 +183,16 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") + // The Go MCP SDK detaches the HTTP request context when dispatching to + // tool handlers, so auth.AuthSessionFrom(ctx) returns nothing. Recover + // the auth session from the HTTP headers preserved in RequestExtra so + // that the A2A client's outbound request to the agent carries the user's JWT. + if extra := req.GetExtra(); extra != nil { + if session, err := h.authenticator.Authenticate(ctx, extra.Header, nil); err == nil { + ctx = auth.AuthSessionTo(ctx, session) + } + } + // Parse agent reference (namespace/name or just name) agentNS, agentName, ok := strings.Cut(input.Agent, "/") if !ok { diff --git a/go/core/internal/mcp/mcp_handler_test.go b/go/core/internal/mcp/mcp_handler_test.go new file mode 100644 index 000000000..a01561bab --- /dev/null +++ b/go/core/internal/mcp/mcp_handler_test.go @@ -0,0 +1,175 @@ +package mcp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "testing" + "time" + + "github.com/kagent-dev/kagent/go/core/pkg/auth" + mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeSession is a minimal auth.Session for testing. +type fakeSession struct{ principal auth.Principal } + +func (s *fakeSession) Principal() auth.Principal { return s.principal } + +// fakeAuthProvider propagates the incoming Bearer token to upstream requests unchanged. +type fakeAuthProvider struct { + session auth.Session +} + +func (f *fakeAuthProvider) Authenticate(_ context.Context, headers http.Header, _ url.Values) (auth.Session, error) { + if headers.Get("Authorization") != "" { + return f.session, nil + } + return nil, http.ErrNoCookie +} + +func (f *fakeAuthProvider) UpstreamAuth(r *http.Request, _ auth.Session, _ auth.Principal) error { + r.Header.Set("Authorization", "Bearer upstream-token") + return nil +} + +// a2aBackend is a fake A2A server that records the Authorization header of each request. +type a2aBackend struct { + server *httptest.Server + mu sync.Mutex + lastAuthHeader string +} + +func (b *a2aBackend) getLastAuthHeader() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.lastAuthHeader +} + +func newA2ABackend(t *testing.T) *a2aBackend { + t.Helper() + b := &a2aBackend{} + b.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b.mu.Lock() + b.lastAuthHeader = r.Header.Get("Authorization") + b.mu.Unlock() + resp := map[string]any{ + "jsonrpc": "2.0", + "id": "", + "result": map[string]any{ + "kind": "message", + "messageId": "test-msg", + "role": "agent", + "parts": []any{map[string]any{"kind": "text", "text": "hello from agent"}}, + }, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + t.Errorf("failed to encode fake A2A response: %v", err) + } + })) + t.Cleanup(b.server.Close) + return b +} + +// authRoundTripper injects a fixed Authorization header into every outgoing request. +type authRoundTripper struct { + base http.RoundTripper + token string +} + +func (a *authRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r = r.Clone(r.Context()) + r.Header.Set("Authorization", "Bearer "+a.token) + return a.base.RoundTrip(r) +} + +// TestInvokeAgent_AuthPropagation exercises the full MCP HTTP stack: +// the MCP client sends a request with an Authorization header, the handler +// recovers the auth session from RequestExtra, and the A2A backend receives +// the token produced by UpstreamAuth. +func TestInvokeAgent_AuthPropagation(t *testing.T) { + // Fake A2A backend — records the Authorization header it receives. + backend := newA2ABackend(t) + + authProvider := &fakeAuthProvider{session: &fakeSession{}} + + // Real MCP handler (kubeClient is nil; invoke_agent does not use it). + mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + // MCP client whose HTTP transport injects an Authorization header on every request. + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + HTTPClient: &http.Client{ + Transport: &authRoundTripper{ + base: http.DefaultTransport, + token: "test-token", + }, + }, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": "default/test-agent", + "task": "say hello", + }, + }) + require.NoError(t, err) + assert.False(t, result.IsError, "expected successful tool result, got: %v", result.Content) + assert.Equal(t, "Bearer upstream-token", backend.getLastAuthHeader(), "A2A backend should receive the token produced by UpstreamAuth") +} + +// TestInvokeAgent_NoAuthPropagationWithoutHeader verifies that when the MCP +// client sends no Authorization header, no Authorization header is +// propagated to the A2A backend. +func TestInvokeAgent_NoAuthPropagationWithoutHeader(t *testing.T) { + backend := newA2ABackend(t) + + authProvider := &fakeAuthProvider{session: &fakeSession{}} + + mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + // No custom transport — requests carry no Authorization header. + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": "default/test-agent", + "task": "say hello", + }, + }) + require.NoError(t, err) + assert.False(t, result.IsError) + assert.Empty(t, backend.getLastAuthHeader(), "A2A backend should receive no Authorization header when the client sends none") +} From 496c320fa2a7e17bea0231873a513e8b9934484f Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Tue, 12 May 2026 16:50:31 +0200 Subject: [PATCH 2/2] refactor(controller): invoke agents directly in MCP handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the HTTP round-trip through the controller's own A2A listener with direct invocation via a new `AgentClientRegistry`. The registry is owned by `A2ARegistrar`, which already maintains an `A2AClient` per agent for its HTTP mux — the registry gives the MCP handler access to those same clients without an extra network hop. The old approach routed through the controller's public A2A endpoint, meaning requests could traverse the external network (and any ingress or load-balancer in front of it) unnecessarily. The new path stays in-process. The old handler also cached its own `A2AClient` per agent in a `sync.Map` with no eviction, so clients for deleted agents would remain indefinitely. The registry is kept consistent by the registrar's add/update/delete lifecycle, eliminating that staleness. `A2ARegistrar.upsertAgentHandler` writes to both the HTTP mux (for inbound /api/a2a/// routing) and the registry (for direct invocation). The registry is exposed via `ClientRegistry()` and passed to `NewMCPHandler` in app.go. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/core/internal/a2a/a2a_registrar.go | 24 +++++-- go/core/internal/a2a/agent_client_registry.go | 58 +++++++++++++++ go/core/internal/mcp/mcp_handler.go | 70 +++---------------- go/core/internal/mcp/mcp_handler_test.go | 37 ++++++---- go/core/pkg/app/app.go | 12 ++-- 5 files changed, 115 insertions(+), 86 deletions(-) create mode 100644 go/core/internal/a2a/agent_client_registry.go diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index d21dbec04..f3d96e9fd 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -26,6 +26,7 @@ import ( type A2ARegistrar struct { cache crcache.Cache handlerMux A2AHandlerMux + clientRegistry *AgentClientRegistry a2aBaseURL string sandboxA2AURL string authenticator auth.AuthProvider @@ -45,11 +46,12 @@ func NewA2ARegistrar( streamingTimeout time.Duration, ) *A2ARegistrar { reg := &A2ARegistrar{ - cache: cache, - handlerMux: mux, - a2aBaseURL: a2aBaseUrl, - sandboxA2AURL: sandboxA2ABaseURL, - authenticator: authenticator, + cache: cache, + handlerMux: mux, + clientRegistry: NewAgentClientRegistry(), + a2aBaseURL: a2aBaseUrl, + sandboxA2AURL: sandboxA2ABaseURL, + authenticator: authenticator, a2aBaseOptions: []a2aclient.Option{ a2aclient.WithTimeout(streamingTimeout), a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf), @@ -60,6 +62,12 @@ func NewA2ARegistrar( return reg } +// ClientRegistry returns the registry of A2A clients for direct agent +// invocation, populated as agents are registered and deregistered. +func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry { + return a.clientRegistry +} + func (a *A2ARegistrar) NeedLeaderElection() bool { return false } @@ -117,6 +125,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } ref := a2aRouteKey(agent) a.handlerMux.RemoveAgentHandler(ref) + a.clientRegistry.delete(ref) log.V(1).Info("removed A2A handler", "agent", ref) }, }); err != nil { @@ -182,10 +191,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag cardCopy := *card cardCopy.URL = a.a2aRouteURL(agent) - if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { + routeRef := a2aRouteKey(agent) + if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { return fmt.Errorf("set handler for %s: %w", agentRef, err) } + a.clientRegistry.set(routeRef, client) + log.V(1).Info("registered/updated A2A handler", "agent", agentRef) return nil } diff --git a/go/core/internal/a2a/agent_client_registry.go b/go/core/internal/a2a/agent_client_registry.go new file mode 100644 index 000000000..1e5b7b6bc --- /dev/null +++ b/go/core/internal/a2a/agent_client_registry.go @@ -0,0 +1,58 @@ +package a2a + +import ( + "context" + "fmt" + "sync" + + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" + "trpc.group/trpc-go/trpc-a2a-go/protocol" +) + +// AgentClientRegistry maps agent route keys to their A2A clients. +// The A2ARegistrar populates it; the MCP handler reads from it to invoke +// agents without an HTTP round trip through the controller's own A2A listener. +type AgentClientRegistry struct { + mu sync.RWMutex + clients map[string]*a2aclient.A2AClient +} + +func NewAgentClientRegistry() *AgentClientRegistry { + return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)} +} + +// set stores the client under the agent's route key (e.g. "namespace/name" or +// "sandboxes/namespace/name"). +func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) { + r.mu.Lock() + defer r.mu.Unlock() + r.clients[agentRef] = c +} + +// delete removes the client for the given agent route key. +func (r *AgentClientRegistry) delete(agentRef string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.clients, agentRef) +} + +// Register adds or replaces the A2A client for the given agent. It is the +// exported counterpart of set, intended for use in tests and explicit +// registrations outside the A2ARegistrar lifecycle. +func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) { + r.set(namespace+"/"+name, c) +} + +// SendMessage invokes an agent directly via its cached A2A client. +// namespace and name must identify a non-sandbox agent; sandbox agents use a +// different route key and are not yet reachable via this method. +func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) { + key := namespace + "/" + name + r.mu.RLock() + c, ok := r.clients[key] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name) + } + return c.SendMessage(ctx, params) +} diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index 6b157a3e7..8e2d76fa2 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -5,31 +5,24 @@ import ( "fmt" "net/http" "strings" - "sync" - "time" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/core/internal/a2a" - authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth" "github.com/kagent-dev/kagent/go/core/internal/version" "github.com/kagent-dev/kagent/go/core/pkg/auth" mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" - a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" "trpc.group/trpc-go/trpc-a2a-go/protocol" ) // MCPHandler handles MCP requests and bridges them to A2A endpoints type MCPHandler struct { kubeClient client.Client - a2aBaseURL string - a2aTimeout time.Duration + agentClients *a2a.AgentClientRegistry authenticator auth.AuthProvider httpHandler *mcpsdk.StreamableHTTPHandler server *mcpsdk.Server - a2aClients sync.Map } // Input types for MCP tools @@ -56,20 +49,12 @@ type InvokeAgentOutput struct { ContextID string `json:"context_id,omitempty"` } -// defaultA2ATimeout is the fallback timeout for A2A client calls and should match -// the configured default streaming timeout. -const defaultA2ATimeout = 10 * time.Minute - -// NewMCPHandler creates a new MCP handler -// Wraps the StreamableHTTPHandler and adds A2A bridging and context management. -func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) { - if a2aTimeout <= 0 { - a2aTimeout = defaultA2ATimeout - } +// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly +// to agent A2A clients, bypassing the controller's own HTTP A2A listener. +func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) { handler := &MCPHandler{ kubeClient: kubeClient, - a2aBaseURL: a2aBaseURL, - a2aTimeout: a2aTimeout, + agentClients: agentClients, authenticator: authenticator, } @@ -204,7 +189,6 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool }, InvokeAgentOutput{}, nil } agentRef := agentNS + "/" + agentName - agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName} // Get context ID from client request (stateless mode) // If not provided, contextIDPtr will be nil and a new conversation will start @@ -214,47 +198,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool log.V(1).Info("Using context_id from client request", "context_id", input.ContextID) } - // Get or create cached A2A client for this agent - a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) - var a2aClient *a2aclient.A2AClient - - if cached, ok := h.a2aClients.Load(agentRef); ok { - if client, ok := cached.(*a2aclient.A2AClient); ok { - a2aClient = client - } - } - - // Create new client if not cached - if a2aClient == nil { - // Build A2A client options with authentication propagation - a2aOpts := []a2aclient.Option{ - a2aclient.WithTimeout(h.a2aTimeout), - a2aclient.WithHTTPReqHandler( - authimpl.A2ARequestHandler( - h.authenticator, - agentNns, - ), - ), - } - - newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...) - if err != nil { - log.Error(err, "Failed to create A2A client", "agent", agentRef) - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)}, - }, - IsError: true, - }, InvokeAgentOutput{}, nil - } - - // Cache the client - h.a2aClients.Store(agentRef, newClient) - a2aClient = newClient - } - - // Send message via A2A - result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{ + // Send message directly via the agent's A2A client, bypassing the + // controller's own HTTP A2A listener. + result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{ Message: protocol.Message{ Kind: protocol.KindMessage, Role: protocol.MessageRoleUser, diff --git a/go/core/internal/mcp/mcp_handler_test.go b/go/core/internal/mcp/mcp_handler_test.go index a01561bab..5369769d5 100644 --- a/go/core/internal/mcp/mcp_handler_test.go +++ b/go/core/internal/mcp/mcp_handler_test.go @@ -8,12 +8,15 @@ import ( "net/url" "sync" "testing" - "time" + "github.com/kagent-dev/kagent/go/core/internal/a2a" + authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth" "github.com/kagent-dev/kagent/go/core/pkg/auth" mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" ) // fakeSession is a minimal auth.Session for testing. @@ -89,31 +92,41 @@ func (a *authRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return a.base.RoundTrip(r) } +// newTestRegistry builds an AgentClientRegistry with a single agent pre-registered, +// wired to send A2A requests to backendURL and propagate auth via authProvider. +func newTestRegistry(t *testing.T, namespace, name string, backendURL string, authProvider auth.AuthProvider) *a2a.AgentClientRegistry { + t.Helper() + agentRef := types.NamespacedName{Namespace: namespace, Name: name} + c, err := a2aclient.NewA2AClient( + backendURL+"/"+namespace+"/"+name+"/", + a2aclient.WithHTTPReqHandler(authimpl.A2ARequestHandler(authProvider, agentRef)), + ) + require.NoError(t, err) + + registry := a2a.NewAgentClientRegistry() + registry.Register(namespace, name, c) + return registry +} + // TestInvokeAgent_AuthPropagation exercises the full MCP HTTP stack: // the MCP client sends a request with an Authorization header, the handler // recovers the auth session from RequestExtra, and the A2A backend receives // the token produced by UpstreamAuth. func TestInvokeAgent_AuthPropagation(t *testing.T) { - // Fake A2A backend — records the Authorization header it receives. backend := newA2ABackend(t) - authProvider := &fakeAuthProvider{session: &fakeSession{}} - // Real MCP handler (kubeClient is nil; invoke_agent does not use it). - mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second) + registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider) + mcpHandler, err := NewMCPHandler(nil, registry, authProvider) require.NoError(t, err) mcpServer := httptest.NewServer(mcpHandler) t.Cleanup(mcpServer.Close) - // MCP client whose HTTP transport injects an Authorization header on every request. transport := &mcpsdk.StreamableClientTransport{ Endpoint: mcpServer.URL, HTTPClient: &http.Client{ - Transport: &authRoundTripper{ - base: http.DefaultTransport, - token: "test-token", - }, + Transport: &authRoundTripper{base: http.DefaultTransport, token: "test-token"}, }, DisableStandaloneSSE: true, } @@ -141,10 +154,10 @@ func TestInvokeAgent_AuthPropagation(t *testing.T) { // propagated to the A2A backend. func TestInvokeAgent_NoAuthPropagationWithoutHeader(t *testing.T) { backend := newA2ABackend(t) - authProvider := &fakeAuthProvider{session: &fakeSession{}} - mcpHandler, err := NewMCPHandler(nil, backend.server.URL, authProvider, 5*time.Second) + registry := newTestRegistry(t, "default", "test-agent", backend.server.URL, authProvider) + mcpHandler, err := NewMCPHandler(nil, registry, authProvider) require.NoError(t, err) mcpServer := httptest.NewServer(mcpHandler) diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index d47ab55ad..f14476cf1 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -613,8 +613,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne // Register A2A handlers on all replicas a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, httpserver.APIPathA2ASandboxes, extensionCfg.Authenticator) - - if err := mgr.Add(a2a.NewA2ARegistrar( + a2aRegistrar := a2a.NewA2ARegistrar( mgr.GetCache(), a2aHandler, cfg.A2ABaseUrl+httpserver.APIPathA2A, @@ -623,17 +622,18 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne int(cfg.Streaming.MaxBufSize.Value()), int(cfg.Streaming.InitialBufSize.Value()), cfg.Streaming.Timeout, - )); err != nil { + ) + if err := mgr.Add(a2aRegistrar); err != nil { setupLog.Error(err, "unable to set up a2a registrar") os.Exit(1) } - // Create MCP handler that bridges to A2A + // Create MCP handler that invokes agents directly via their A2A clients, + // bypassing the controller's own HTTP A2A listener. mcpHandler, err := mcp.NewMCPHandler( mgr.GetClient(), - cfg.A2ABaseUrl+httpserver.APIPathA2A, + a2aRegistrar.ClientRegistry(), extensionCfg.Authenticator, - cfg.Streaming.Timeout, ) if err != nil { setupLog.Error(err, "unable to create MCP handler")