From a4f5935d669c882dc722ed6ff978e19239ab77fd Mon Sep 17 00:00:00 2001 From: irwin Date: Thu, 5 Feb 2026 18:34:38 +0900 Subject: [PATCH] Fix: Move markInitialized() and reconnect() after status code check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When connecting to an MCP server that doesn't support Streamable HTTP transport (returns 405 Method Not Allowed), the transports still called markInitialized() and reconnect() before checking the status code, which triggered unnecessary GET requests. This caused issues when using transport fallback (Streamable HTTP → SSE), as duplicate SSE sessions were created on the upstream server. This fix moves markInitialized() and reconnect() inside the success status code check (2xx), so they are only called for successful responses. Fixes #773 Related to #362 Co-Authored-By: Claude Opus 4.5 --- .../HttpClientStreamableHttpTransport.java | 18 ++++++++++-------- .../WebClientStreamableHttpTransport.java | 15 ++++++++------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 0a8dff363..3b411fdfe 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -466,19 +466,21 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); })).flatMap(responseEvent -> { - if (transportSession.markInitialized( - responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - - reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); - } - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); int statusCode = responseEvent.responseInfo().statusCode(); if (statusCode >= 200 && statusCode < 300) { + // Only initialize session and open async stream for successful + // responses + if (transportSession.markInitialized(responseEvent.responseInfo() + .headers() + .firstValue("mcp-session-id") + .orElseGet(() -> null))) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); + } String contentType = responseEvent.responseInfo() .headers() diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 5af98985d..6f90bde5b 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -310,18 +310,19 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { }) .bodyValue(jsonText) .exchangeToFlux(response -> { - if (transportSession - .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - reconnect(null).contextWrite(sink.contextView()).subscribe(); - } - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); // The spec mentions only ACCEPTED, but the existing SDKs can return // 200 OK for notifications if (response.statusCode().is2xxSuccessful()) { + // Only initialize session and open async stream for successful + // responses + if (transportSession + .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(sink.contextView()).subscribe(); + } Optional contentType = response.headers().contentType(); long contentLength = response.headers().contentLength().orElse(-1); // Existing SDKs consume notifications with no response body nor