Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions go/core/internal/a2a/a2a_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type A2ARegistrar struct {
cache crcache.Cache
handlerMux A2AHandlerMux
clientRegistry *AgentClientRegistry
a2aBaseURL string
sandboxA2AURL string
authenticator auth.AuthProvider
Expand All @@ -45,11 +46,12 @@ func NewA2ARegistrar(
streamingTimeout time.Duration,
) *A2ARegistrar {
reg := &A2ARegistrar{
cache: cache,
handlerMux: mux,
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
cache: cache,
handlerMux: mux,
clientRegistry: NewAgentClientRegistry(),
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
a2aBaseOptions: []a2aclient.Option{
a2aclient.WithTimeout(streamingTimeout),
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
Expand All @@ -60,6 +62,12 @@ func NewA2ARegistrar(
return reg
}

// ClientRegistry returns the registry of A2A clients for direct agent
// invocation, populated as agents are registered and deregistered.
func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry {
return a.clientRegistry
}

func (a *A2ARegistrar) NeedLeaderElection() bool {
return false
}
Expand Down Expand Up @@ -117,6 +125,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
}
ref := a2aRouteKey(agent)
a.handlerMux.RemoveAgentHandler(ref)
a.clientRegistry.delete(ref)
log.V(1).Info("removed A2A handler", "agent", ref)
},
}); err != nil {
Expand Down Expand Up @@ -182,10 +191,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
cardCopy := *card
cardCopy.URL = a.a2aRouteURL(agent)

if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
routeRef := a2aRouteKey(agent)
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
return fmt.Errorf("set handler for %s: %w", agentRef, err)
}

a.clientRegistry.set(routeRef, client)

log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
return nil
}
Expand Down
58 changes: 58 additions & 0 deletions go/core/internal/a2a/agent_client_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package a2a

import (
"context"
"fmt"
"sync"

a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
"trpc.group/trpc-go/trpc-a2a-go/protocol"
)

// AgentClientRegistry maps agent route keys to their A2A clients.
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
// agents without an HTTP round trip through the controller's own A2A listener.
type AgentClientRegistry struct {
mu sync.RWMutex
clients map[string]*a2aclient.A2AClient
}

func NewAgentClientRegistry() *AgentClientRegistry {
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
}

// set stores the client under the agent's route key (e.g. "namespace/name" or
// "sandboxes/namespace/name").
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[agentRef] = c
}

// delete removes the client for the given agent route key.
func (r *AgentClientRegistry) delete(agentRef string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.clients, agentRef)
}

// Register adds or replaces the A2A client for the given agent. It is the
// exported counterpart of set, intended for use in tests and explicit
// registrations outside the A2ARegistrar lifecycle.
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
r.set(namespace+"/"+name, c)
}

// SendMessage invokes an agent directly via its cached A2A client.
// namespace and name must identify a non-sandbox agent; sandbox agents use a
// different route key and are not yet reachable via this method.
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
key := namespace + "/" + name
r.mu.RLock()
c, ok := r.clients[key]
r.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name)
}
return c.SendMessage(ctx, params)
}
80 changes: 18 additions & 62 deletions go/core/internal/mcp/mcp_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,24 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/core/internal/a2a"
authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth"
"github.com/kagent-dev/kagent/go/core/internal/version"
"github.com/kagent-dev/kagent/go/core/pkg/auth"
mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
"trpc.group/trpc-go/trpc-a2a-go/protocol"
)

// MCPHandler handles MCP requests and bridges them to A2A endpoints
type MCPHandler struct {
kubeClient client.Client
a2aBaseURL string
a2aTimeout time.Duration
agentClients *a2a.AgentClientRegistry
authenticator auth.AuthProvider
httpHandler *mcpsdk.StreamableHTTPHandler
server *mcpsdk.Server
a2aClients sync.Map
}

// Input types for MCP tools
Expand All @@ -56,20 +49,12 @@ type InvokeAgentOutput struct {
ContextID string `json:"context_id,omitempty"`
}

// defaultA2ATimeout is the fallback timeout for A2A client calls and should match
// the configured default streaming timeout.
const defaultA2ATimeout = 10 * time.Minute

// NewMCPHandler creates a new MCP handler
// Wraps the StreamableHTTPHandler and adds A2A bridging and context management.
func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) {
if a2aTimeout <= 0 {
a2aTimeout = defaultA2ATimeout
}
// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly
// to agent A2A clients, bypassing the controller's own HTTP A2A listener.
func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) {
handler := &MCPHandler{
kubeClient: kubeClient,
a2aBaseURL: a2aBaseURL,
a2aTimeout: a2aTimeout,
agentClients: agentClients,
authenticator: authenticator,
}
Comment on lines +52 to 59

Expand Down Expand Up @@ -183,6 +168,16 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR
func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) {
log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent")

// The Go MCP SDK detaches the HTTP request context when dispatching to
// tool handlers, so auth.AuthSessionFrom(ctx) returns nothing. Recover
// the auth session from the HTTP headers preserved in RequestExtra so
// that the A2A client's outbound request to the agent carries the user's JWT.
if extra := req.GetExtra(); extra != nil {
if session, err := h.authenticator.Authenticate(ctx, extra.Header, nil); err == nil {
ctx = auth.AuthSessionTo(ctx, session)
Comment on lines +175 to +177
}
}

// Parse agent reference (namespace/name or just name)
agentNS, agentName, ok := strings.Cut(input.Agent, "/")
if !ok {
Expand All @@ -194,7 +189,6 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
}, InvokeAgentOutput{}, nil
}
agentRef := agentNS + "/" + agentName
Comment on lines 181 to 191
agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName}

// Get context ID from client request (stateless mode)
// If not provided, contextIDPtr will be nil and a new conversation will start
Expand All @@ -204,47 +198,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool
log.V(1).Info("Using context_id from client request", "context_id", input.ContextID)
}

// Get or create cached A2A client for this agent
a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef)
var a2aClient *a2aclient.A2AClient

if cached, ok := h.a2aClients.Load(agentRef); ok {
if client, ok := cached.(*a2aclient.A2AClient); ok {
a2aClient = client
}
}

// Create new client if not cached
if a2aClient == nil {
// Build A2A client options with authentication propagation
a2aOpts := []a2aclient.Option{
a2aclient.WithTimeout(h.a2aTimeout),
a2aclient.WithHTTPReqHandler(
authimpl.A2ARequestHandler(
h.authenticator,
agentNns,
),
),
}

newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...)
if err != nil {
log.Error(err, "Failed to create A2A client", "agent", agentRef)
return &mcpsdk.CallToolResult{
Content: []mcpsdk.Content{
&mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)},
},
IsError: true,
}, InvokeAgentOutput{}, nil
}

// Cache the client
h.a2aClients.Store(agentRef, newClient)
a2aClient = newClient
}

// Send message via A2A
result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{
// Send message directly via the agent's A2A client, bypassing the
// controller's own HTTP A2A listener.
result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{
Message: protocol.Message{
Kind: protocol.KindMessage,
Role: protocol.MessageRoleUser,
Expand Down
Loading
Loading