diff --git a/dev-packages/e2e-tests/test-applications/node-express-v5/package.json b/dev-packages/e2e-tests/test-applications/node-express-v5/package.json index f29feda5eea8..a6dd5fab1fbf 100644 --- a/dev-packages/e2e-tests/test-applications/node-express-v5/package.json +++ b/dev-packages/e2e-tests/test-applications/node-express-v5/package.json @@ -11,7 +11,7 @@ "test:assert": "pnpm test" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.10.2", + "@modelcontextprotocol/sdk": "^1.26.0", "@sentry/node": "latest || *", "@trpc/server": "10.45.4", "@trpc/client": "10.45.4", diff --git a/dev-packages/e2e-tests/test-applications/node-express-v5/src/mcp.ts b/dev-packages/e2e-tests/test-applications/node-express-v5/src/mcp.ts index c5f2c24c61b8..a819858c8f37 100644 --- a/dev-packages/e2e-tests/test-applications/node-express-v5/src/mcp.ts +++ b/dev-packages/e2e-tests/test-applications/node-express-v5/src/mcp.ts @@ -1,9 +1,16 @@ +import { randomUUID } from 'node:crypto'; import express from 'express'; import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { z } from 'zod'; import { wrapMcpServerWithSentry } from '@sentry/node'; +// Helper to check if request is an initialize request (compatible with all MCP SDK versions) +function isInitializeRequest(body: unknown): boolean { + return typeof body === 'object' && body !== null && (body as { method?: string }).method === 'initialize'; +} + const mcpRouter = express.Router(); const server = wrapMcpServerWithSentry( @@ -61,4 +68,134 @@ mcpRouter.post('/messages', async (req, res) => { } }); +// ============================================================================= +// Streamable HTTP Transport Endpoints +// This uses StreamableHTTPServerTransport which wraps WebStandardStreamableHTTPServerTransport +// and exercises the wrapper transport pattern that was fixed in the sessionId-based correlation +// See: https://github.com/getsentry/sentry-mcp/issues/767 +// ============================================================================= + +// Create a separate wrapped server for streamable HTTP (to test independent of SSE) +const streamableServer = wrapMcpServerWithSentry( + new McpServer({ + name: 'Echo-Streamable', + version: '1.0.0', + }), +); + +// Register the same handlers on the streamable server +streamableServer.resource( + 'echo', + new ResourceTemplate('echo://{message}', { list: undefined }), + async (uri, { message }) => ({ + contents: [ + { + uri: uri.href, + text: `Resource echo: ${message}`, + }, + ], + }), +); + +streamableServer.tool('echo', { message: z.string() }, async ({ message }) => { + return { + content: [{ type: 'text', text: `Tool echo: ${message}` }], + }; +}); + +streamableServer.prompt('echo', { message: z.string() }, ({ message }) => ({ + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please process this message: ${message}`, + }, + }, + ], +})); + +// Map to store streamable transports by session ID +const streamableTransports: Record = {}; + +// POST endpoint for streamable HTTP (handles both initialization and subsequent requests) +mcpRouter.post('/mcp', express.json(), async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + try { + let transport: StreamableHTTPServerTransport; + + if (sessionId && streamableTransports[sessionId]) { + // Reuse existing transport for session + transport = streamableTransports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request - create new transport + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: sid => { + // Store transport when session is initialized + streamableTransports[sid] = transport; + }, + }); + + // Clean up on close + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && streamableTransports[sid]) { + delete streamableTransports[sid]; + } + }; + + // Connect to server before handling request + await streamableServer.connect(transport); + await transport.handleRequest(req, res, req.body); + return; + } else { + // Invalid request + res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Bad Request: No valid session ID provided' }, + id: null, + }); + return; + } + + // Handle request with existing transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling streamable HTTP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null, + }); + } + } +}); + +// GET endpoint for SSE streams (server-initiated messages) +mcpRouter.get('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + +// DELETE endpoint for session termination +mcpRouter.delete('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + export { mcpRouter }; diff --git a/dev-packages/e2e-tests/test-applications/node-express-v5/tests/mcp.test.ts b/dev-packages/e2e-tests/test-applications/node-express-v5/tests/mcp.test.ts index 00a69b8554db..73dfc1d69432 100644 --- a/dev-packages/e2e-tests/test-applications/node-express-v5/tests/mcp.test.ts +++ b/dev-packages/e2e-tests/test-applications/node-express-v5/tests/mcp.test.ts @@ -2,6 +2,7 @@ import { expect, test } from '@playwright/test'; import { waitForTransaction } from '@sentry-internal/test-utils'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; test('Should record transactions for mcp handlers', async ({ baseURL }) => { const transport = new SSEClientTransport(new URL(`${baseURL}/sse`)); @@ -120,3 +121,135 @@ test('Should record transactions for mcp handlers', async ({ baseURL }) => { // TODO: When https://github.com/modelcontextprotocol/typescript-sdk/pull/358 is released check for trace id equality between the post transaction and the handler transaction }); }); + +/** + * Tests for StreamableHTTPServerTransport (wrapper transport pattern) + * + * StreamableHTTPServerTransport wraps WebStandardStreamableHTTPServerTransport via getters/setters. + * This causes different `this` values in onmessage vs send, which was breaking span correlation. + * + * The fix uses sessionId as the correlation key instead of transport object reference. + * This test verifies that spans are correctly recorded when using the wrapper transport. + * + * @see https://github.com/getsentry/sentry-mcp/issues/767 + */ +test('Should record transactions for streamable HTTP transport (wrapper transport pattern)', async ({ baseURL }) => { + const transport = new StreamableHTTPClientTransport(new URL(`${baseURL}/mcp`)); + + const client = new Client({ + name: 'test-client-streamable', + version: '1.0.0', + }); + + const initializeTransactionPromise = waitForTransaction('node-express-v5', transactionEvent => { + return ( + transactionEvent.transaction === 'initialize' && + transactionEvent.contexts?.trace?.data?.['mcp.server.name'] === 'Echo-Streamable' + ); + }); + + await client.connect(transport); + + await test.step('initialize handshake', async () => { + const initializeTransaction = await initializeTransactionPromise; + expect(initializeTransaction).toBeDefined(); + expect(initializeTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('initialize'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.client.name']).toEqual('test-client-streamable'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.server.name']).toEqual('Echo-Streamable'); + // Verify it's using a StreamableHTTP transport (may be wrapper or inner depending on environment) + expect(initializeTransaction.contexts?.trace?.data?.['mcp.transport']).toMatch(/StreamableHTTPServerTransport/); + }); + + await test.step('tool handler (tests wrapper transport correlation)', async () => { + // This is the critical test - without the sessionId fix, the span would not be completed + // because onmessage and send see different transport instances (wrapper vs inner) + const toolTransactionPromise = waitForTransaction('node-express-v5', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return transactionEvent.transaction === 'tools/call echo' && transport?.includes('StreamableHTTPServerTransport'); + }); + + const toolResult = await client.callTool({ + name: 'echo', + arguments: { + message: 'wrapper-transport-test', + }, + }); + + expect(toolResult).toMatchObject({ + content: [ + { + text: 'Tool echo: wrapper-transport-test', + type: 'text', + }, + ], + }); + + const toolTransaction = await toolTransactionPromise; + expect(toolTransaction).toBeDefined(); + expect(toolTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('tools/call'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.name']).toEqual('echo'); + // This attribute proves the span was completed with results (sessionId correlation worked) + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.result.content_count']).toEqual(1); + }); + + await test.step('resource handler', async () => { + const resourceTransactionPromise = waitForTransaction('node-express-v5', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'resources/read echo://streamable-test' && + transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const resourceResult = await client.readResource({ + uri: 'echo://streamable-test', + }); + + expect(resourceResult).toMatchObject({ + contents: [{ text: 'Resource echo: streamable-test', uri: 'echo://streamable-test' }], + }); + + const resourceTransaction = await resourceTransactionPromise; + expect(resourceTransaction).toBeDefined(); + expect(resourceTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(resourceTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('resources/read'); + }); + + await test.step('prompt handler', async () => { + const promptTransactionPromise = waitForTransaction('node-express-v5', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'prompts/get echo' && transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const promptResult = await client.getPrompt({ + name: 'echo', + arguments: { + message: 'streamable-prompt', + }, + }); + + expect(promptResult).toMatchObject({ + messages: [ + { + content: { + text: 'Please process this message: streamable-prompt', + type: 'text', + }, + role: 'user', + }, + ], + }); + + const promptTransaction = await promptTransactionPromise; + expect(promptTransaction).toBeDefined(); + expect(promptTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(promptTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('prompts/get'); + }); + + // Clean up - close the client connection + await client.close(); +}); diff --git a/dev-packages/e2e-tests/test-applications/node-express/package.json b/dev-packages/e2e-tests/test-applications/node-express/package.json index 4305e8593a76..6ab9eb2047b9 100644 --- a/dev-packages/e2e-tests/test-applications/node-express/package.json +++ b/dev-packages/e2e-tests/test-applications/node-express/package.json @@ -11,7 +11,7 @@ "test:assert": "pnpm test" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.10.2", + "@modelcontextprotocol/sdk": "^1.26.0", "@sentry/node": "latest || *", "@trpc/server": "10.45.4", "@trpc/client": "10.45.4", diff --git a/dev-packages/e2e-tests/test-applications/node-express/src/mcp.ts b/dev-packages/e2e-tests/test-applications/node-express/src/mcp.ts index d4944ddcfa2d..638462423d11 100644 --- a/dev-packages/e2e-tests/test-applications/node-express/src/mcp.ts +++ b/dev-packages/e2e-tests/test-applications/node-express/src/mcp.ts @@ -1,9 +1,16 @@ +import { randomUUID } from 'node:crypto'; import express from 'express'; import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { z } from 'zod'; import { wrapMcpServerWithSentry } from '@sentry/node'; +// Helper to check if request is an initialize request (compatible with all MCP SDK versions) +function isInitializeRequest(body: unknown): boolean { + return typeof body === 'object' && body !== null && (body as { method?: string }).method === 'initialize'; +} + const mcpRouter = express.Router(); const server = wrapMcpServerWithSentry( @@ -61,4 +68,134 @@ mcpRouter.post('/messages', async (req, res) => { } }); +// ============================================================================= +// Streamable HTTP Transport Endpoints +// This uses StreamableHTTPServerTransport which wraps WebStandardStreamableHTTPServerTransport +// and exercises the wrapper transport pattern that was fixed in the sessionId-based correlation +// See: https://github.com/getsentry/sentry-mcp/issues/767 +// ============================================================================= + +// Create a separate wrapped server for streamable HTTP (to test independent of SSE) +const streamableServer = wrapMcpServerWithSentry( + new McpServer({ + name: 'Echo-Streamable', + version: '1.0.0', + }), +); + +// Register the same handlers on the streamable server +streamableServer.resource( + 'echo', + new ResourceTemplate('echo://{message}', { list: undefined }), + async (uri, { message }) => ({ + contents: [ + { + uri: uri.href, + text: `Resource echo: ${message}`, + }, + ], + }), +); + +streamableServer.tool('echo', { message: z.string() }, async ({ message }) => { + return { + content: [{ type: 'text', text: `Tool echo: ${message}` }], + }; +}); + +streamableServer.prompt('echo', { message: z.string() }, ({ message }) => ({ + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please process this message: ${message}`, + }, + }, + ], +})); + +// Map to store streamable transports by session ID +const streamableTransports: Record = {}; + +// POST endpoint for streamable HTTP (handles both initialization and subsequent requests) +mcpRouter.post('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + try { + let transport: StreamableHTTPServerTransport; + + if (sessionId && streamableTransports[sessionId]) { + // Reuse existing transport for session + transport = streamableTransports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request - create new transport + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: sid => { + // Store transport when session is initialized + streamableTransports[sid] = transport; + }, + }); + + // Clean up on close + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && streamableTransports[sid]) { + delete streamableTransports[sid]; + } + }; + + // Connect to server before handling request + await streamableServer.connect(transport); + await transport.handleRequest(req, res, req.body); + return; + } else { + // Invalid request + res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Bad Request: No valid session ID provided' }, + id: null, + }); + return; + } + + // Handle request with existing transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling streamable HTTP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null, + }); + } + } +}); + +// GET endpoint for SSE streams (server-initiated messages) +mcpRouter.get('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + +// DELETE endpoint for session termination +mcpRouter.delete('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + export { mcpRouter }; diff --git a/dev-packages/e2e-tests/test-applications/node-express/tests/mcp.test.ts b/dev-packages/e2e-tests/test-applications/node-express/tests/mcp.test.ts index ab31d4c215b8..143867c773e6 100644 --- a/dev-packages/e2e-tests/test-applications/node-express/tests/mcp.test.ts +++ b/dev-packages/e2e-tests/test-applications/node-express/tests/mcp.test.ts @@ -2,6 +2,7 @@ import { expect, test } from '@playwright/test'; import { waitForTransaction } from '@sentry-internal/test-utils'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; test('Should record transactions for mcp handlers', async ({ baseURL }) => { const transport = new SSEClientTransport(new URL(`${baseURL}/sse`)); @@ -126,3 +127,135 @@ test('Should record transactions for mcp handlers', async ({ baseURL }) => { // TODO: When https://github.com/modelcontextprotocol/typescript-sdk/pull/358 is released check for trace id equality between the post transaction and the handler transaction }); }); + +/** + * Tests for StreamableHTTPServerTransport (wrapper transport pattern) + * + * StreamableHTTPServerTransport wraps WebStandardStreamableHTTPServerTransport via getters/setters. + * This causes different `this` values in onmessage vs send, which was breaking span correlation. + * + * The fix uses sessionId as the correlation key instead of transport object reference. + * This test verifies that spans are correctly recorded when using the wrapper transport. + * + * @see https://github.com/getsentry/sentry-mcp/issues/767 + */ +test('Should record transactions for streamable HTTP transport (wrapper transport pattern)', async ({ baseURL }) => { + const transport = new StreamableHTTPClientTransport(new URL(`${baseURL}/mcp`)); + + const client = new Client({ + name: 'test-client-streamable', + version: '1.0.0', + }); + + const initializeTransactionPromise = waitForTransaction('node-express', transactionEvent => { + return ( + transactionEvent.transaction === 'initialize' && + transactionEvent.contexts?.trace?.data?.['mcp.server.name'] === 'Echo-Streamable' + ); + }); + + await client.connect(transport); + + await test.step('initialize handshake', async () => { + const initializeTransaction = await initializeTransactionPromise; + expect(initializeTransaction).toBeDefined(); + expect(initializeTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('initialize'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.client.name']).toEqual('test-client-streamable'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.server.name']).toEqual('Echo-Streamable'); + // Verify it's using a StreamableHTTP transport (may be wrapper or inner depending on environment) + expect(initializeTransaction.contexts?.trace?.data?.['mcp.transport']).toMatch(/StreamableHTTPServerTransport/); + }); + + await test.step('tool handler (tests wrapper transport correlation)', async () => { + // This is the critical test - without the sessionId fix, the span would not be completed + // because onmessage and send see different transport instances (wrapper vs inner) + const toolTransactionPromise = waitForTransaction('node-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return transactionEvent.transaction === 'tools/call echo' && transport?.includes('StreamableHTTPServerTransport'); + }); + + const toolResult = await client.callTool({ + name: 'echo', + arguments: { + message: 'wrapper-transport-test', + }, + }); + + expect(toolResult).toMatchObject({ + content: [ + { + text: 'Tool echo: wrapper-transport-test', + type: 'text', + }, + ], + }); + + const toolTransaction = await toolTransactionPromise; + expect(toolTransaction).toBeDefined(); + expect(toolTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('tools/call'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.name']).toEqual('echo'); + // This attribute proves the span was completed with results (sessionId correlation worked) + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.result.content_count']).toEqual(1); + }); + + await test.step('resource handler', async () => { + const resourceTransactionPromise = waitForTransaction('node-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'resources/read echo://streamable-test' && + transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const resourceResult = await client.readResource({ + uri: 'echo://streamable-test', + }); + + expect(resourceResult).toMatchObject({ + contents: [{ text: 'Resource echo: streamable-test', uri: 'echo://streamable-test' }], + }); + + const resourceTransaction = await resourceTransactionPromise; + expect(resourceTransaction).toBeDefined(); + expect(resourceTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(resourceTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('resources/read'); + }); + + await test.step('prompt handler', async () => { + const promptTransactionPromise = waitForTransaction('node-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'prompts/get echo' && transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const promptResult = await client.getPrompt({ + name: 'echo', + arguments: { + message: 'streamable-prompt', + }, + }); + + expect(promptResult).toMatchObject({ + messages: [ + { + content: { + text: 'Please process this message: streamable-prompt', + type: 'text', + }, + role: 'user', + }, + ], + }); + + const promptTransaction = await promptTransactionPromise; + expect(promptTransaction).toBeDefined(); + expect(promptTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(promptTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('prompts/get'); + }); + + // Clean up - close the client connection + await client.close(); +}); diff --git a/dev-packages/e2e-tests/test-applications/tsx-express/package.json b/dev-packages/e2e-tests/test-applications/tsx-express/package.json index cd6eeba3f19b..3c8cedfca04e 100644 --- a/dev-packages/e2e-tests/test-applications/tsx-express/package.json +++ b/dev-packages/e2e-tests/test-applications/tsx-express/package.json @@ -10,7 +10,7 @@ "test:assert": "pnpm test" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.10.2", + "@modelcontextprotocol/sdk": "^1.26.0", "@sentry/core": "latest || *", "@sentry/node": "latest || *", "@trpc/server": "10.45.4", diff --git a/dev-packages/e2e-tests/test-applications/tsx-express/src/mcp.ts b/dev-packages/e2e-tests/test-applications/tsx-express/src/mcp.ts index 7565e08f7c85..b3b401ac294c 100644 --- a/dev-packages/e2e-tests/test-applications/tsx-express/src/mcp.ts +++ b/dev-packages/e2e-tests/test-applications/tsx-express/src/mcp.ts @@ -1,9 +1,16 @@ +import { randomUUID } from 'node:crypto'; import express from 'express'; import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { z } from 'zod'; import { wrapMcpServerWithSentry } from '@sentry/core'; +// Helper to check if request is an initialize request (compatible with all MCP SDK versions) +function isInitializeRequest(body: unknown): boolean { + return typeof body === 'object' && body !== null && (body as { method?: string }).method === 'initialize'; +} + const mcpRouter = express.Router(); const server = wrapMcpServerWithSentry( @@ -55,10 +62,140 @@ mcpRouter.post('/messages', async (req, res) => { const sessionId = req.query.sessionId; const transport = transports[sessionId as string]; if (transport) { - await transport.handlePostMessage(req, res); + await transport.handlePostMessage(req, res, req.body); } else { res.status(400).send('No transport found for sessionId'); } }); +// ============================================================================= +// Streamable HTTP Transport Endpoints +// This uses StreamableHTTPServerTransport which wraps WebStandardStreamableHTTPServerTransport +// and exercises the wrapper transport pattern that was fixed in the sessionId-based correlation +// See: https://github.com/getsentry/sentry-mcp/issues/767 +// ============================================================================= + +// Create a separate wrapped server for streamable HTTP (to test independent of SSE) +const streamableServer = wrapMcpServerWithSentry( + new McpServer({ + name: 'Echo-Streamable', + version: '1.0.0', + }), +); + +// Register the same handlers on the streamable server +streamableServer.resource( + 'echo', + new ResourceTemplate('echo://{message}', { list: undefined }), + async (uri, { message }) => ({ + contents: [ + { + uri: uri.href, + text: `Resource echo: ${message}`, + }, + ], + }), +); + +streamableServer.tool('echo', { message: z.string() }, async ({ message }) => { + return { + content: [{ type: 'text', text: `Tool echo: ${message}` }], + }; +}); + +streamableServer.prompt('echo', { message: z.string() }, ({ message }) => ({ + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please process this message: ${message}`, + }, + }, + ], +})); + +// Map to store streamable transports by session ID +const streamableTransports: Record = {}; + +// POST endpoint for streamable HTTP (handles both initialization and subsequent requests) +mcpRouter.post('/mcp', express.json(), async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + try { + let transport: StreamableHTTPServerTransport; + + if (sessionId && streamableTransports[sessionId]) { + // Reuse existing transport for session + transport = streamableTransports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request - create new transport + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: sid => { + // Store transport when session is initialized + streamableTransports[sid] = transport; + }, + }); + + // Clean up on close + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && streamableTransports[sid]) { + delete streamableTransports[sid]; + } + }; + + // Connect to server before handling request + await streamableServer.connect(transport); + await transport.handleRequest(req, res, req.body); + return; + } else { + // Invalid request + res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Bad Request: No valid session ID provided' }, + id: null, + }); + return; + } + + // Handle request with existing transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling streamable HTTP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null, + }); + } + } +}); + +// GET endpoint for SSE streams (server-initiated messages) +mcpRouter.get('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + +// DELETE endpoint for session termination +mcpRouter.delete('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !streamableTransports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = streamableTransports[sessionId]; + await transport.handleRequest(req, res); +}); + export { mcpRouter }; diff --git a/dev-packages/e2e-tests/test-applications/tsx-express/tests/mcp.test.ts b/dev-packages/e2e-tests/test-applications/tsx-express/tests/mcp.test.ts index 997d8d7dff82..a89cfcaa11c6 100644 --- a/dev-packages/e2e-tests/test-applications/tsx-express/tests/mcp.test.ts +++ b/dev-packages/e2e-tests/test-applications/tsx-express/tests/mcp.test.ts @@ -2,6 +2,7 @@ import { expect, test } from '@playwright/test'; import { waitForTransaction } from '@sentry-internal/test-utils'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; test('Records transactions for mcp handlers', async ({ baseURL }) => { const transport = new SSEClientTransport(new URL(`${baseURL}/sse`)); @@ -123,3 +124,135 @@ test('Records transactions for mcp handlers', async ({ baseURL }) => { // TODO: When https://github.com/modelcontextprotocol/typescript-sdk/pull/358 is released check for trace id equality between the post transaction and the handler transaction }); }); + +/** + * Tests for StreamableHTTPServerTransport (wrapper transport pattern) + * + * StreamableHTTPServerTransport wraps WebStandardStreamableHTTPServerTransport via getters/setters. + * This causes different `this` values in onmessage vs send, which was breaking span correlation. + * + * The fix uses sessionId as the correlation key instead of transport object reference. + * This test verifies that spans are correctly recorded when using the wrapper transport. + * + * @see https://github.com/getsentry/sentry-mcp/issues/767 + */ +test('Should record transactions for streamable HTTP transport (wrapper transport pattern)', async ({ baseURL }) => { + const transport = new StreamableHTTPClientTransport(new URL(`${baseURL}/mcp`)); + + const client = new Client({ + name: 'test-client-streamable', + version: '1.0.0', + }); + + const initializeTransactionPromise = waitForTransaction('tsx-express', transactionEvent => { + return ( + transactionEvent.transaction === 'initialize' && + transactionEvent.contexts?.trace?.data?.['mcp.server.name'] === 'Echo-Streamable' + ); + }); + + await client.connect(transport); + + await test.step('initialize handshake', async () => { + const initializeTransaction = await initializeTransactionPromise; + expect(initializeTransaction).toBeDefined(); + expect(initializeTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('initialize'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.client.name']).toEqual('test-client-streamable'); + expect(initializeTransaction.contexts?.trace?.data?.['mcp.server.name']).toEqual('Echo-Streamable'); + // Verify it's using a StreamableHTTP transport (may be wrapper or inner depending on environment) + expect(initializeTransaction.contexts?.trace?.data?.['mcp.transport']).toMatch(/StreamableHTTPServerTransport/); + }); + + await test.step('tool handler (tests wrapper transport correlation)', async () => { + // This is the critical test - without the sessionId fix, the span would not be completed + // because onmessage and send see different transport instances (wrapper vs inner) + const toolTransactionPromise = waitForTransaction('tsx-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return transactionEvent.transaction === 'tools/call echo' && transport?.includes('StreamableHTTPServerTransport'); + }); + + const toolResult = await client.callTool({ + name: 'echo', + arguments: { + message: 'wrapper-transport-test', + }, + }); + + expect(toolResult).toMatchObject({ + content: [ + { + text: 'Tool echo: wrapper-transport-test', + type: 'text', + }, + ], + }); + + const toolTransaction = await toolTransactionPromise; + expect(toolTransaction).toBeDefined(); + expect(toolTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('tools/call'); + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.name']).toEqual('echo'); + // This attribute proves the span was completed with results (sessionId correlation worked) + expect(toolTransaction.contexts?.trace?.data?.['mcp.tool.result.content_count']).toEqual(1); + }); + + await test.step('resource handler', async () => { + const resourceTransactionPromise = waitForTransaction('tsx-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'resources/read echo://streamable-test' && + transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const resourceResult = await client.readResource({ + uri: 'echo://streamable-test', + }); + + expect(resourceResult).toMatchObject({ + contents: [{ text: 'Resource echo: streamable-test', uri: 'echo://streamable-test' }], + }); + + const resourceTransaction = await resourceTransactionPromise; + expect(resourceTransaction).toBeDefined(); + expect(resourceTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(resourceTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('resources/read'); + }); + + await test.step('prompt handler', async () => { + const promptTransactionPromise = waitForTransaction('tsx-express', transactionEvent => { + const transport = transactionEvent.contexts?.trace?.data?.['mcp.transport'] as string | undefined; + return ( + transactionEvent.transaction === 'prompts/get echo' && transport?.includes('StreamableHTTPServerTransport') + ); + }); + + const promptResult = await client.getPrompt({ + name: 'echo', + arguments: { + message: 'streamable-prompt', + }, + }); + + expect(promptResult).toMatchObject({ + messages: [ + { + content: { + text: 'Please process this message: streamable-prompt', + type: 'text', + }, + role: 'user', + }, + ], + }); + + const promptTransaction = await promptTransactionPromise; + expect(promptTransaction).toBeDefined(); + expect(promptTransaction.contexts?.trace?.op).toEqual('mcp.server'); + expect(promptTransaction.contexts?.trace?.data?.['mcp.method.name']).toEqual('prompts/get'); + }); + + // Clean up - close the client connection + await client.close(); +}); diff --git a/packages/core/src/integrations/mcp-server/correlation.ts b/packages/core/src/integrations/mcp-server/correlation.ts index 3567ec382cdf..068e4d4432d1 100644 --- a/packages/core/src/integrations/mcp-server/correlation.ts +++ b/packages/core/src/integrations/mcp-server/correlation.ts @@ -2,8 +2,12 @@ * Request-span correlation system for MCP server instrumentation * * Handles mapping requestId to span data for correlation with handler execution. - * Uses WeakMap to scope correlation maps per transport instance, preventing - * request ID collisions between different MCP sessions. + * + * Uses sessionId as the primary key for stateful transports. This handles the wrapper + * transport pattern (e.g., NodeStreamableHTTPServerTransport wrapping WebStandardStreamableHTTPServerTransport) + * where onmessage and send may receive different `this` values but share the same sessionId. + * + * Falls back to WeakMap by transport instance for stateless transports (no sessionId). */ import { SPAN_STATUS_ERROR } from '../../tracing'; @@ -14,22 +18,42 @@ import { buildServerAttributesFromInfo, extractSessionDataFromInitializeResponse import type { MCPTransport, RequestId, RequestSpanMapValue, ResolvedMcpOptions } from './types'; /** - * Transport-scoped correlation system that prevents collisions between different MCP sessions - * @internal Each transport instance gets its own correlation map, eliminating request ID conflicts + * Session-scoped correlation for stateful transports (with sessionId) + * @internal Using sessionId as key handles wrapper transport patterns where + * different transport objects share the same logical session */ -const transportToSpanMap = new WeakMap>(); +const sessionToSpanMap = new Map>(); /** - * Gets or creates the span map for a specific transport instance + * Transport-scoped correlation fallback for stateless transports (no sessionId) + * @internal WeakMap allows automatic cleanup when transport is garbage collected + */ +const statelessSpanMap = new WeakMap>(); + +/** + * Gets or creates the span map for a transport, using sessionId when available * @internal * @param transport - MCP transport instance - * @returns Span map for the transport + * @returns Span map for the transport/session */ function getOrCreateSpanMap(transport: MCPTransport): Map { - let spanMap = transportToSpanMap.get(transport); + const sessionId = transport.sessionId; + + if (sessionId) { + // Stateful transport - use sessionId as key (handles wrapper pattern) + let spanMap = sessionToSpanMap.get(sessionId); + if (!spanMap) { + spanMap = new Map(); + sessionToSpanMap.set(sessionId, spanMap); + } + return spanMap; + } + + // Stateless fallback - use transport instance as key + let spanMap = statelessSpanMap.get(transport); if (!spanMap) { spanMap = new Map(); - transportToSpanMap.set(transport, spanMap); + statelessSpanMap.set(transport, spanMap); } return spanMap; } @@ -99,7 +123,26 @@ export function completeSpanWithResults( * @param transport - MCP transport instance */ export function cleanupPendingSpansForTransport(transport: MCPTransport): void { - const spanMap = transportToSpanMap.get(transport); + const sessionId = transport.sessionId; + + // Try sessionId-based cleanup first (for stateful transports) + if (sessionId) { + const spanMap = sessionToSpanMap.get(sessionId); + if (spanMap) { + for (const [, spanData] of spanMap) { + spanData.span.setStatus({ + code: SPAN_STATUS_ERROR, + message: 'cancelled', + }); + spanData.span.end(); + } + sessionToSpanMap.delete(sessionId); + } + return; + } + + // Fallback to transport-based cleanup (for stateless transports) + const spanMap = statelessSpanMap.get(transport); if (spanMap) { for (const [, spanData] of spanMap) { spanData.span.setStatus({ @@ -109,5 +152,6 @@ export function cleanupPendingSpansForTransport(transport: MCPTransport): void { spanData.span.end(); } spanMap.clear(); + // Note: WeakMap entries are automatically cleaned up when transport is GC'd } } diff --git a/packages/core/src/integrations/mcp-server/sessionManagement.ts b/packages/core/src/integrations/mcp-server/sessionManagement.ts index 9d9c8b48f27d..76f875e67739 100644 --- a/packages/core/src/integrations/mcp-server/sessionManagement.ts +++ b/packages/core/src/integrations/mcp-server/sessionManagement.ts @@ -1,36 +1,71 @@ /** * Session data management for MCP server instrumentation + * + * Uses sessionId as the primary key for stateful transports. This handles the wrapper + * transport pattern (e.g., NodeStreamableHTTPServerTransport wrapping WebStandardStreamableHTTPServerTransport) + * where different methods may receive different `this` values but share the same sessionId. + * + * Falls back to WeakMap by transport instance for stateless transports (no sessionId). */ import type { MCPTransport, PartyInfo, SessionData } from './types'; /** - * Transport-scoped session data storage (only for transports with sessionId) - * @internal Maps transport instances to session-level data + * Session-scoped data storage for stateful transports (with sessionId) + * @internal Using sessionId as key handles wrapper transport patterns */ -const transportToSessionData = new WeakMap(); +const sessionToSessionData = new Map(); /** - * Stores session data for a transport with sessionId + * Transport-scoped data storage fallback for stateless transports (no sessionId) + * @internal WeakMap allows automatic cleanup when transport is garbage collected + */ +const statelessSessionData = new WeakMap(); + +/** + * Gets session data for a transport, checking sessionId first then fallback + * @internal + */ +function getSessionData(transport: MCPTransport): SessionData | undefined { + const sessionId = transport.sessionId; + if (sessionId) { + return sessionToSessionData.get(sessionId); + } + return statelessSessionData.get(transport); +} + +/** + * Sets session data for a transport, using sessionId when available + * @internal + */ +function setSessionData(transport: MCPTransport, data: SessionData): void { + const sessionId = transport.sessionId; + if (sessionId) { + sessionToSessionData.set(sessionId, data); + } else { + statelessSessionData.set(transport, data); + } +} + +/** + * Stores session data for a transport * @param transport - MCP transport instance * @param sessionData - Session data to store */ export function storeSessionDataForTransport(transport: MCPTransport, sessionData: SessionData): void { - if (transport.sessionId) { - transportToSessionData.set(transport, sessionData); - } + // For stateful transports, always store (sessionId is the key) + // For stateless transports, also store (transport instance is the key) + setSessionData(transport, sessionData); } /** - * Updates session data for a transport with sessionId (merges with existing data) + * Updates session data for a transport (merges with existing data) * @param transport - MCP transport instance * @param partialSessionData - Partial session data to merge with existing data */ export function updateSessionDataForTransport(transport: MCPTransport, partialSessionData: Partial): void { - if (transport.sessionId) { - const existingData = transportToSessionData.get(transport) || {}; - transportToSessionData.set(transport, { ...existingData, ...partialSessionData }); - } + const existingData = getSessionData(transport) || {}; + setSessionData(transport, { ...existingData, ...partialSessionData }); } /** @@ -39,7 +74,7 @@ export function updateSessionDataForTransport(transport: MCPTransport, partialSe * @returns Client information if available */ export function getClientInfoForTransport(transport: MCPTransport): PartyInfo | undefined { - return transportToSessionData.get(transport)?.clientInfo; + return getSessionData(transport)?.clientInfo; } /** @@ -48,7 +83,7 @@ export function getClientInfoForTransport(transport: MCPTransport): PartyInfo | * @returns Protocol version if available */ export function getProtocolVersionForTransport(transport: MCPTransport): string | undefined { - return transportToSessionData.get(transport)?.protocolVersion; + return getSessionData(transport)?.protocolVersion; } /** @@ -57,7 +92,7 @@ export function getProtocolVersionForTransport(transport: MCPTransport): string * @returns Complete session data if available */ export function getSessionDataForTransport(transport: MCPTransport): SessionData | undefined { - return transportToSessionData.get(transport); + return getSessionData(transport); } /** @@ -65,5 +100,10 @@ export function getSessionDataForTransport(transport: MCPTransport): SessionData * @param transport - MCP transport instance */ export function cleanupSessionDataForTransport(transport: MCPTransport): void { - transportToSessionData.delete(transport); + const sessionId = transport.sessionId; + if (sessionId) { + sessionToSessionData.delete(sessionId); + } + // Note: WeakMap entries are automatically cleaned up when transport is GC'd + // No explicit delete needed for statelessSessionData } diff --git a/packages/core/test/lib/integrations/mcp-server/testUtils.ts b/packages/core/test/lib/integrations/mcp-server/testUtils.ts index 9593391ca856..782f03a78e35 100644 --- a/packages/core/test/lib/integrations/mcp-server/testUtils.ts +++ b/packages/core/test/lib/integrations/mcp-server/testUtils.ts @@ -61,3 +61,75 @@ export function createMockSseTransport() { return new SSEServerTransport(); } + +/** + * Create a mock wrapper transport that simulates the NodeStreamableHTTPServerTransport pattern. + * + * NodeStreamableHTTPServerTransport wraps WebStandardStreamableHTTPServerTransport and proxies + * onmessage, onclose, onerror via getters/setters, while send delegates to the inner transport. + * This causes the Sentry instrumentation to see different `this` values in onmessage vs send, + * which is the bug we're testing the fix for. + * + * @see https://github.com/getsentry/sentry-mcp/issues/767 + */ +export function createMockWrapperTransport(sessionId = 'wrapper-session-123') { + // Inner transport (simulates WebStandardStreamableHTTPServerTransport) + // Note: onmessage/onclose/onerror must be initialized to functions so that + // wrapTransportOnMessage/etc. will wrap them (they check for truthiness) + const innerTransport = { + onmessage: vi.fn() as ((message: unknown, extra?: unknown) => void) | undefined, + onclose: vi.fn() as (() => void) | undefined, + onerror: vi.fn() as ((error: Error) => void) | undefined, + send: vi.fn().mockResolvedValue(undefined), + sessionId: sessionId as string | undefined, + }; + + // Outer wrapper transport (simulates NodeStreamableHTTPServerTransport) + // Uses Object.defineProperty to create getter/setter pairs that proxy to inner transport + const wrapperTransport = { + send: async (message: unknown, _options?: unknown) => innerTransport.send(message, _options), + } as { + sessionId: string | undefined; + onmessage: ((message: unknown, extra?: unknown) => void) | undefined; + onclose: (() => void) | undefined; + onerror: ((error: Error) => void) | undefined; + send: (message: unknown, options?: unknown) => Promise; + }; + + // Define getter/setter pairs that proxy to inner transport + Object.defineProperty(wrapperTransport, 'sessionId', { + get: () => innerTransport.sessionId, + enumerable: true, + }); + + Object.defineProperty(wrapperTransport, 'onmessage', { + get: () => innerTransport.onmessage, + set: (handler: ((message: unknown, extra?: unknown) => void) | undefined) => { + innerTransport.onmessage = handler; + }, + enumerable: true, + }); + + Object.defineProperty(wrapperTransport, 'onclose', { + get: () => innerTransport.onclose, + set: (handler: (() => void) | undefined) => { + innerTransport.onclose = handler; + }, + enumerable: true, + }); + + Object.defineProperty(wrapperTransport, 'onerror', { + get: () => innerTransport.onerror, + set: (handler: ((error: Error) => void) | undefined) => { + innerTransport.onerror = handler; + }, + enumerable: true, + }); + + return { + /** The outer wrapper transport (what users pass to server.connect()) */ + wrapper: wrapperTransport, + /** The inner transport (what onmessage actually runs on due to getter/setter) */ + inner: innerTransport, + }; +} diff --git a/packages/core/test/lib/integrations/mcp-server/transportInstrumentation.test.ts b/packages/core/test/lib/integrations/mcp-server/transportInstrumentation.test.ts index e8ffb31477ad..c720512cb97b 100644 --- a/packages/core/test/lib/integrations/mcp-server/transportInstrumentation.test.ts +++ b/packages/core/test/lib/integrations/mcp-server/transportInstrumentation.test.ts @@ -28,6 +28,7 @@ import { createMockSseTransport, createMockStdioTransport, createMockTransport, + createMockWrapperTransport, } from './testUtils'; describe('MCP Server Transport Instrumentation', () => { @@ -492,19 +493,21 @@ describe('MCP Server Transport Instrumentation', () => { expect(getSessionDataForTransport(mockTransport)).toBeUndefined(); }); - it('should only store data for transports with sessionId', () => { + it('should store data for transports without sessionId using WeakMap fallback', () => { const transportWithoutSession = { onmessage: vi.fn(), onclose: vi.fn(), onerror: vi.fn(), send: vi.fn().mockResolvedValue(undefined), protocolVersion: '2025-06-18', + // No sessionId - uses WeakMap fallback }; const sessionData = { protocolVersion: '2025-06-18' }; storeSessionDataForTransport(transportWithoutSession, sessionData); - expect(getSessionDataForTransport(transportWithoutSession)).toBeUndefined(); + // With the WeakMap fallback, data IS stored even for transports without sessionId + expect(getSessionDataForTransport(transportWithoutSession)).toEqual(sessionData); }); }); @@ -758,4 +761,152 @@ describe('MCP Server Transport Instrumentation', () => { ); }); }); + + describe('Wrapper Transport Pattern (NodeStreamableHTTPServerTransport)', () => { + /** + * Tests for the wrapper transport pattern used by NodeStreamableHTTPServerTransport. + * + * NodeStreamableHTTPServerTransport wraps WebStandardStreamableHTTPServerTransport + * and proxies onmessage/onclose via getters/setters. This causes Sentry's instrumentation + * to see different `this` values in onmessage (inner transport) vs send (outer transport). + * + * The fix uses sessionId as the correlation key instead of transport object reference, + * since both inner and outer transports share the same sessionId. + * + * @see https://github.com/getsentry/sentry-mcp/issues/767 + */ + + it('should correlate spans correctly when using wrapper transport pattern', async () => { + const { wrapper } = createMockWrapperTransport('wrapper-test-session'); + const mockMcpServer = createMockMcpServer(); + const wrappedMcpServer = wrapMcpServerWithSentry(mockMcpServer); + + // Connect using the wrapper transport (what users do) + await wrappedMcpServer.connect(wrapper); + + const mockSpan = { setAttributes: vi.fn(), end: vi.fn() }; + startInactiveSpanSpy.mockReturnValue(mockSpan as any); + + // Simulate incoming request - due to getter/setter, onmessage runs on inner transport + // but we call it via the wrapper's property access + wrapper.onmessage?.( + { + jsonrpc: '2.0', + method: 'tools/call', + id: 'wrapper-req-1', + params: { name: 'test-tool' }, + }, + {}, + ); + + expect(startInactiveSpanSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'tools/call test-tool', + op: 'mcp.server', + }), + ); + + // Simulate outgoing response - send is called on wrapper, but the bug was that + // it couldn't find the span because `this` was different from onmessage's `this` + await wrapper.send({ + jsonrpc: '2.0', + id: 'wrapper-req-1', + result: { content: [{ type: 'text', text: 'success' }] }, + }); + + // The span should be completed (this was broken before the fix) + expect(mockSpan.end).toHaveBeenCalled(); + }); + + it('should handle initialize request/response with wrapper transport', async () => { + const { wrapper } = createMockWrapperTransport('init-wrapper-session'); + const mockMcpServer = createMockMcpServer(); + const wrappedMcpServer = wrapMcpServerWithSentry(mockMcpServer); + + await wrappedMcpServer.connect(wrapper); + + const mockSpan = { setAttributes: vi.fn(), end: vi.fn() }; + startInactiveSpanSpy.mockReturnValue(mockSpan as any); + + // Initialize request + wrapper.onmessage?.( + { + jsonrpc: '2.0', + method: 'initialize', + id: 'init-1', + params: { + protocolVersion: '2025-06-18', + clientInfo: { name: 'test-client', version: '1.0.0' }, + }, + }, + {}, + ); + + // Initialize response + await wrapper.send({ + jsonrpc: '2.0', + id: 'init-1', + result: { + protocolVersion: '2025-06-18', + serverInfo: { name: 'test-server', version: '2.0.0' }, + capabilities: {}, + }, + }); + + // Span should have client and server info attributes + expect(mockSpan.setAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + 'mcp.client.name': 'test-client', + 'mcp.client.version': '1.0.0', + }), + ); + expect(mockSpan.setAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + 'mcp.server.name': 'test-server', + 'mcp.server.version': '2.0.0', + }), + ); + expect(mockSpan.end).toHaveBeenCalled(); + }); + + it('should cleanup spans on close with wrapper transport', async () => { + const { wrapper } = createMockWrapperTransport('cleanup-wrapper-session'); + const mockMcpServer = createMockMcpServer(); + const wrappedMcpServer = wrapMcpServerWithSentry(mockMcpServer); + + await wrappedMcpServer.connect(wrapper); + + const mockSpan = { setAttributes: vi.fn(), end: vi.fn(), setStatus: vi.fn() }; + startInactiveSpanSpy.mockReturnValue(mockSpan as any); + + // Start a request but don't complete it + wrapper.onmessage?.( + { + jsonrpc: '2.0', + method: 'tools/call', + id: 'uncompleted-req', + params: { name: 'slow-tool' }, + }, + {}, + ); + + // Close the transport (should cleanup pending spans) + wrapper.onclose?.(); + + // Span should be ended with cancelled status + expect(mockSpan.setStatus).toHaveBeenCalledWith({ + code: 2, // SPAN_STATUS_ERROR + message: 'cancelled', + }); + expect(mockSpan.end).toHaveBeenCalled(); + }); + + it('should verify inner and outer transports share the same sessionId', () => { + const { wrapper, inner } = createMockWrapperTransport('shared-session-test'); + + // This is the key invariant that makes our fix work + expect(wrapper.sessionId).toBe(inner.sessionId); + expect(wrapper.sessionId).toBe('shared-session-test'); + }); + }); });