diff --git a/package-lock.json b/package-lock.json index d32963a73..f16d8cc4a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "1.24.3", "license": "MIT", "dependencies": { + "@hono/node-server": "^1.19.7", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", @@ -663,6 +664,18 @@ "node": "^18.18.0 || ^20.9.0 || >=21.1.0" } }, + "node_modules/@hono/node-server": { + "version": "1.19.7", + "resolved": "https://registry.npmjs.org/@hono/node-server/-/node-server-1.19.7.tgz", + "integrity": "sha512-vUcD0uauS7EU2caukW8z5lJKtoGMokxNbJtBiwHgpqxEXokaHCBkQUmCHhjFB1VUTWdqj25QoMkMKzgjq+uhrw==", + "license": "MIT", + "engines": { + "node": ">=18.14.1" + }, + "peerDependencies": { + "hono": "^4" + } + }, "node_modules/@humanfs/core": { "version": "0.19.1", "resolved": "https://registry.npmjs.org/@humanfs/core/-/core-0.19.1.tgz", @@ -3027,6 +3040,16 @@ "node": ">= 0.4" } }, + "node_modules/hono": { + "version": "4.10.8", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.10.8.tgz", + "integrity": "sha512-DDT0A0r6wzhe8zCGoYOmMeuGu3dyTAE40HHjwUsWFTEy5WxK1x2WDSsBPlEXgPbRIFY6miDualuUDbasPogIww==", + "license": "MIT", + "peer": true, + "engines": { + "node": ">=16.9.0" + } + }, "node_modules/http-errors": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.1.tgz", diff --git a/package.json b/package.json index bfbc73802..5f1ca8e4d 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "client": "tsx scripts/cli.ts client" }, "dependencies": { + "@hono/node-server": "^1.19.7", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", diff --git a/src/examples/server/honoWebStandardStreamableHttp.ts b/src/examples/server/honoWebStandardStreamableHttp.ts new file mode 100644 index 000000000..ba8805eae --- /dev/null +++ b/src/examples/server/honoWebStandardStreamableHttp.ts @@ -0,0 +1,74 @@ +/** + * Example MCP server using Hono with WebStandardStreamableHTTPServerTransport + * + * This example demonstrates using the Web Standard transport directly with Hono, + * which works on any runtime: Node.js, Cloudflare Workers, Deno, Bun, etc. + * + * Run with: npx tsx src/examples/server/honoWebStandardStreamableHttp.ts + */ + +import { Hono } from 'hono'; +import { cors } from 'hono/cors'; +import { serve } from '@hono/node-server'; +import * as z from 'zod/v4'; +import { McpServer } from '../../server/mcp.js'; +import { WebStandardStreamableHTTPServerTransport } from '../../server/webStandardStreamableHttp.js'; +import { CallToolResult } from '../../types.js'; + +// Create the MCP server +const server = new McpServer({ + name: 'hono-webstandard-mcp-server', + version: '1.0.0' +}); + +// Register a simple greeting tool +server.registerTool( + 'greet', + { + title: 'Greeting Tool', + description: 'A simple greeting tool', + inputSchema: { name: z.string().describe('Name to greet') } + }, + async ({ name }): Promise => { + return { + content: [{ type: 'text', text: `Hello, ${name}! (from Hono + WebStandard transport)` }] + }; + } +); + +// Create a stateless transport (no options = no session management) +const transport = new WebStandardStreamableHTTPServerTransport(); + +// Create the Hono app +const app = new Hono(); + +// Enable CORS for all origins +app.use( + '*', + cors({ + origin: '*', + allowMethods: ['GET', 'POST', 'DELETE', 'OPTIONS'], + allowHeaders: ['Content-Type', 'mcp-session-id', 'Last-Event-ID', 'mcp-protocol-version'], + exposeHeaders: ['mcp-session-id', 'mcp-protocol-version'] + }) +); + +// Health check endpoint +app.get('/health', c => c.json({ status: 'ok' })); + +// MCP endpoint +app.all('/mcp', c => transport.handleRequest(c.req.raw)); + +// Start the server +const PORT = process.env.MCP_PORT ? parseInt(process.env.MCP_PORT, 10) : 3000; + +server.connect(transport).then(() => { + console.log(`Starting Hono MCP server on port ${PORT}`); + console.log(`Health check: http://localhost:${PORT}/health`); + console.log(`MCP endpoint: http://localhost:${PORT}/mcp`); + + serve({ + fetch: app.fetch, + port: PORT + }); +}); diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index ab1131f63..bc310d98e 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -1,142 +1,42 @@ -import { IncomingMessage, ServerResponse } from 'node:http'; -import { Transport } from '../shared/transport.js'; -import { - MessageExtraInfo, - RequestInfo, - isInitializeRequest, - isJSONRPCRequest, - isJSONRPCResultResponse, - JSONRPCMessage, - JSONRPCMessageSchema, - RequestId, - SUPPORTED_PROTOCOL_VERSIONS, - DEFAULT_NEGOTIATED_PROTOCOL_VERSION, - isJSONRPCErrorResponse -} from '../types.js'; -import getRawBody from 'raw-body'; -import contentType from 'content-type'; -import { randomUUID } from 'node:crypto'; -import { AuthInfo } from './auth/types.js'; - -const MAXIMUM_MESSAGE_SIZE = '4mb'; - -export type StreamId = string; -export type EventId = string; - /** - * Interface for resumability support via event storage + * Node.js HTTP Streamable HTTP Server Transport + * + * This is a thin wrapper around `WebStandardStreamableHTTPServerTransport` that provides + * compatibility with Node.js HTTP server (IncomingMessage/ServerResponse). + * + * For web-standard environments (Cloudflare Workers, Deno, Bun), use `WebStandardStreamableHTTPServerTransport` directly. */ -export interface EventStore { - /** - * Stores an event for later retrieval - * @param streamId ID of the stream the event belongs to - * @param message The JSON-RPC message to store - * @returns The generated event ID for the stored event - */ - storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise; - /** - * Get the stream ID associated with a given event ID. - * @param eventId The event ID to look up - * @returns The stream ID, or undefined if not found - * - * Optional: If not provided, the SDK will use the streamId returned by - * replayEventsAfter for stream mapping. - */ - getStreamIdForEventId?(eventId: EventId): Promise; +import { IncomingMessage, ServerResponse } from 'node:http'; +import { getRequestListener } from '@hono/node-server'; +import { Transport } from '../shared/transport.js'; +import { AuthInfo } from './auth/types.js'; +import { MessageExtraInfo, JSONRPCMessage, RequestId } from '../types.js'; +import { + WebStandardStreamableHTTPServerTransport, + WebStandardStreamableHTTPServerTransportOptions, + EventStore, + StreamId, + EventId +} from './webStandardStreamableHttp.js'; - replayEventsAfter( - lastEventId: EventId, - { - send - }: { - send: (eventId: EventId, message: JSONRPCMessage) => Promise; - } - ): Promise; -} +// Re-export types from the core transport for backward compatibility +export type { EventStore, StreamId, EventId }; /** * Configuration options for StreamableHTTPServerTransport + * + * This is an alias for WebStandardStreamableHTTPServerTransportOptions for backward compatibility. */ -export interface StreamableHTTPServerTransportOptions { - /** - * Function that generates a session ID for the transport. - * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) - * - * Return undefined to disable session management. - */ - sessionIdGenerator: (() => string) | undefined; - - /** - * A callback for session initialization events - * This is called when the server initializes a new session. - * Useful in cases when you need to register multiple mcp sessions - * and need to keep track of them. - * @param sessionId The generated session ID - */ - onsessioninitialized?: (sessionId: string) => void | Promise; - - /** - * A callback for session close events - * This is called when the server closes a session due to a DELETE request. - * Useful in cases when you need to clean up resources associated with the session. - * Note that this is different from the transport closing, if you are handling - * HTTP requests from multiple nodes you might want to close each - * StreamableHTTPServerTransport after a request is completed while still keeping the - * session open/running. - * @param sessionId The session ID that was closed - */ - onsessionclosed?: (sessionId: string) => void | Promise; - - /** - * If true, the server will return JSON responses instead of starting an SSE stream. - * This can be useful for simple request/response scenarios without streaming. - * Default is false (SSE streams are preferred). - */ - enableJsonResponse?: boolean; - - /** - * Event store for resumability support - * If provided, resumability will be enabled, allowing clients to reconnect and resume messages - */ - eventStore?: EventStore; - - /** - * List of allowed host header values for DNS rebinding protection. - * If not specified, host validation is disabled. - * @deprecated Use the `hostHeaderValidation` middleware from `@modelcontextprotocol/sdk/server/middleware/hostHeaderValidation.js` instead, - * or use `createMcpExpressApp` from `@modelcontextprotocol/sdk/server/express.js` which includes localhost protection by default. - */ - allowedHosts?: string[]; - - /** - * List of allowed origin header values for DNS rebinding protection. - * If not specified, origin validation is disabled. - * @deprecated Use the `hostHeaderValidation` middleware from `@modelcontextprotocol/sdk/server/middleware/hostHeaderValidation.js` instead, - * or use `createMcpExpressApp` from `@modelcontextprotocol/sdk/server/express.js` which includes localhost protection by default. - */ - allowedOrigins?: string[]; - - /** - * Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured). - * Default is false for backwards compatibility. - * @deprecated Use the `hostHeaderValidation` middleware from `@modelcontextprotocol/sdk/server/middleware/hostHeaderValidation.js` instead, - * or use `createMcpExpressApp` from `@modelcontextprotocol/sdk/server/express.js` which includes localhost protection by default. - */ - enableDnsRebindingProtection?: boolean; - - /** - * Retry interval in milliseconds to suggest to clients in SSE retry field. - * When set, the server will send a retry field in SSE priming events to control - * client reconnection timing for polling behavior. - */ - retryInterval?: number; -} +export type StreamableHTTPServerTransportOptions = WebStandardStreamableHTTPServerTransportOptions; /** * Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. * It supports both SSE streaming and direct HTTP responses. * + * This is a wrapper around `WebStandardStreamableHTTPServerTransport` that provides Node.js HTTP compatibility. + * It uses the `@hono/node-server` library to convert between Node.js HTTP and Web Standard APIs. + * * Usage example: * * ```typescript @@ -168,677 +68,114 @@ export interface StreamableHTTPServerTransportOptions { * - No session validation is performed */ export class StreamableHTTPServerTransport implements Transport { - // when sessionId is not set (undefined), it means the transport is in stateless mode - private sessionIdGenerator: (() => string) | undefined; - private _started: boolean = false; - private _streamMapping: Map = new Map(); - private _requestToStreamMapping: Map = new Map(); - private _requestResponseMap: Map = new Map(); - private _initialized: boolean = false; - private _enableJsonResponse: boolean = false; - private _standaloneSseStreamId: string = '_GET_stream'; - private _eventStore?: EventStore; - private _onsessioninitialized?: (sessionId: string) => void | Promise; - private _onsessionclosed?: (sessionId: string) => void | Promise; - private _allowedHosts?: string[]; - private _allowedOrigins?: string[]; - private _enableDnsRebindingProtection: boolean; - private _retryInterval?: number; - - sessionId?: string; - onclose?: () => void; - onerror?: (error: Error) => void; - onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; - - constructor(options: StreamableHTTPServerTransportOptions) { - this.sessionIdGenerator = options.sessionIdGenerator; - this._enableJsonResponse = options.enableJsonResponse ?? false; - this._eventStore = options.eventStore; - this._onsessioninitialized = options.onsessioninitialized; - this._onsessionclosed = options.onsessionclosed; - this._allowedHosts = options.allowedHosts; - this._allowedOrigins = options.allowedOrigins; - this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; - this._retryInterval = options.retryInterval; - } - - /** - * Starts the transport. This is required by the Transport interface but is a no-op - * for the Streamable HTTP transport as connections are managed per-request. - */ - async start(): Promise { - if (this._started) { - throw new Error('Transport already started'); - } - this._started = true; + private _webStandardTransport: WebStandardStreamableHTTPServerTransport; + private _requestListener: ReturnType; + // Store auth and parsedBody per request for passing through to handleRequest + private _requestContext: WeakMap = new WeakMap(); + + constructor(options: StreamableHTTPServerTransportOptions = {}) { + this._webStandardTransport = new WebStandardStreamableHTTPServerTransport(options); + + // Create a request listener that wraps the web standard transport + // getRequestListener converts Node.js HTTP to Web Standard and properly handles SSE streaming + this._requestListener = getRequestListener(async (webRequest: Request) => { + // Get context if available (set during handleRequest) + const context = this._requestContext.get(webRequest); + return this._webStandardTransport.handleRequest(webRequest, { + authInfo: context?.authInfo, + parsedBody: context?.parsedBody + }); + }); } /** - * Validates request headers for DNS rebinding protection. - * @returns Error message if validation fails, undefined if validation passes. + * Gets the session ID for this transport instance. */ - private validateRequestHeaders(req: IncomingMessage): string | undefined { - // Skip validation if protection is not enabled - if (!this._enableDnsRebindingProtection) { - return undefined; - } - - // Validate Host header if allowedHosts is configured - if (this._allowedHosts && this._allowedHosts.length > 0) { - const hostHeader = req.headers.host; - if (!hostHeader || !this._allowedHosts.includes(hostHeader)) { - return `Invalid Host header: ${hostHeader}`; - } - } - - // Validate Origin header if allowedOrigins is configured - if (this._allowedOrigins && this._allowedOrigins.length > 0) { - const originHeader = req.headers.origin; - if (originHeader && !this._allowedOrigins.includes(originHeader)) { - return `Invalid Origin header: ${originHeader}`; - } - } - - return undefined; + get sessionId(): string | undefined { + return this._webStandardTransport.sessionId; } /** - * Handles an incoming HTTP request, whether GET or POST + * Sets callback for when the transport is closed. */ - async handleRequest(req: IncomingMessage & { auth?: AuthInfo }, res: ServerResponse, parsedBody?: unknown): Promise { - // Validate request headers for DNS rebinding protection - const validationError = this.validateRequestHeaders(req); - if (validationError) { - res.writeHead(403).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: validationError - }, - id: null - }) - ); - this.onerror?.(new Error(validationError)); - return; - } - - if (req.method === 'POST') { - await this.handlePostRequest(req, res, parsedBody); - } else if (req.method === 'GET') { - await this.handleGetRequest(req, res); - } else if (req.method === 'DELETE') { - await this.handleDeleteRequest(req, res); - } else { - await this.handleUnsupportedRequest(res); - } + set onclose(handler: (() => void) | undefined) { + this._webStandardTransport.onclose = handler; } - /** - * Writes a priming event to establish resumption capability. - * Only sends if eventStore is configured (opt-in for resumability) and - * the client's protocol version supports empty SSE data (>= 2025-11-25). - */ - private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string, protocolVersion: string): Promise { - if (!this._eventStore) { - return; - } - - // Priming events have empty data which older clients cannot handle. - // Only send priming events to clients with protocol version >= 2025-11-25 - // which includes the fix for handling empty SSE data. - if (protocolVersion < '2025-11-25') { - return; - } - - const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage); - - let primingEvent = `id: ${primingEventId}\ndata: \n\n`; - if (this._retryInterval !== undefined) { - primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`; - } - res.write(primingEvent); + get onclose(): (() => void) | undefined { + return this._webStandardTransport.onclose; } /** - * Handles GET requests for SSE stream + * Sets callback for transport errors. */ - private async handleGetRequest(req: IncomingMessage, res: ServerResponse): Promise { - // The client MUST include an Accept header, listing text/event-stream as a supported content type. - const acceptHeader = req.headers.accept; - if (!acceptHeader?.includes('text/event-stream')) { - res.writeHead(406).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Not Acceptable: Client must accept text/event-stream' - }, - id: null - }) - ); - return; - } - - // If an Mcp-Session-Id is returned by the server during initialization, - // clients using the Streamable HTTP transport MUST include it - // in the Mcp-Session-Id header on all of their subsequent HTTP requests. - if (!this.validateSession(req, res)) { - return; - } - if (!this.validateProtocolVersion(req, res)) { - return; - } - // Handle resumability: check for Last-Event-ID header - if (this._eventStore) { - const lastEventId = req.headers['last-event-id'] as string | undefined; - if (lastEventId) { - await this.replayEvents(lastEventId, res); - return; - } - } - - // The server MUST either return Content-Type: text/event-stream in response to this HTTP GET, - // or else return HTTP 405 Method Not Allowed - const headers: Record = { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache, no-transform', - Connection: 'keep-alive' - }; - - // After initialization, always include the session ID if we have one - if (this.sessionId !== undefined) { - headers['mcp-session-id'] = this.sessionId; - } - - // Check if there's already an active standalone SSE stream for this session - if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { - // Only one GET SSE stream is allowed per session - res.writeHead(409).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Conflict: Only one SSE stream is allowed per session' - }, - id: null - }) - ); - return; - } - - // We need to send headers immediately as messages will arrive much later, - // otherwise the client will just wait for the first message - res.writeHead(200, headers).flushHeaders(); - - // Assign the response to the standalone SSE stream - this._streamMapping.set(this._standaloneSseStreamId, res); - // Set up close handler for client disconnects - res.on('close', () => { - this._streamMapping.delete(this._standaloneSseStreamId); - }); - - // Add error handler for standalone SSE stream - res.on('error', error => { - this.onerror?.(error as Error); - }); + set onerror(handler: ((error: Error) => void) | undefined) { + this._webStandardTransport.onerror = handler; } - /** - * Replays events that would have been sent after the specified event ID - * Only used when resumability is enabled - */ - private async replayEvents(lastEventId: string, res: ServerResponse): Promise { - if (!this._eventStore) { - return; - } - try { - // If getStreamIdForEventId is available, use it for conflict checking - let streamId: string | undefined; - if (this._eventStore.getStreamIdForEventId) { - streamId = await this._eventStore.getStreamIdForEventId(lastEventId); - - if (!streamId) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Invalid event ID format' - }, - id: null - }) - ); - return; - } - - // Check conflict with the SAME streamId we'll use for mapping - if (this._streamMapping.get(streamId) !== undefined) { - res.writeHead(409).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Conflict: Stream already has an active connection' - }, - id: null - }) - ); - return; - } - } - - const headers: Record = { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache, no-transform', - Connection: 'keep-alive' - }; - - if (this.sessionId !== undefined) { - headers['mcp-session-id'] = this.sessionId; - } - res.writeHead(200, headers).flushHeaders(); - - // Replay events - returns the streamId for backwards compatibility - const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { - send: async (eventId: string, message: JSONRPCMessage) => { - if (!this.writeSSEEvent(res, message, eventId)) { - this.onerror?.(new Error('Failed replay events')); - res.end(); - } - } - }); - - this._streamMapping.set(replayedStreamId, res); - - // Set up close handler for client disconnects - res.on('close', () => { - this._streamMapping.delete(replayedStreamId); - }); - - // Add error handler for replay stream - res.on('error', error => { - this.onerror?.(error as Error); - }); - } catch (error) { - this.onerror?.(error as Error); - } + get onerror(): ((error: Error) => void) | undefined { + return this._webStandardTransport.onerror; } /** - * Writes an event to the SSE stream with proper formatting + * Sets callback for incoming messages. */ - private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean { - let eventData = `event: message\n`; - // Include event ID if provided - this is important for resumability - if (eventId) { - eventData += `id: ${eventId}\n`; - } - eventData += `data: ${JSON.stringify(message)}\n\n`; - - return res.write(eventData); + set onmessage(handler: ((message: JSONRPCMessage, extra?: MessageExtraInfo) => void) | undefined) { + this._webStandardTransport.onmessage = handler; } - /** - * Handles unsupported requests (PUT, PATCH, etc.) - */ - private async handleUnsupportedRequest(res: ServerResponse): Promise { - res.writeHead(405, { - Allow: 'GET, POST, DELETE' - }).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Method not allowed.' - }, - id: null - }) - ); + get onmessage(): ((message: JSONRPCMessage, extra?: MessageExtraInfo) => void) | undefined { + return this._webStandardTransport.onmessage; } /** - * Handles POST requests containing JSON-RPC messages + * Starts the transport. This is required by the Transport interface but is a no-op + * for the Streamable HTTP transport as connections are managed per-request. */ - private async handlePostRequest(req: IncomingMessage & { auth?: AuthInfo }, res: ServerResponse, parsedBody?: unknown): Promise { - try { - // Validate the Accept header - const acceptHeader = req.headers.accept; - // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. - if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { - res.writeHead(406).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Not Acceptable: Client must accept both application/json and text/event-stream' - }, - id: null - }) - ); - return; - } - - const ct = req.headers['content-type']; - if (!ct || !ct.includes('application/json')) { - res.writeHead(415).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Unsupported Media Type: Content-Type must be application/json' - }, - id: null - }) - ); - return; - } - - const authInfo: AuthInfo | undefined = req.auth; - const requestInfo: RequestInfo = { headers: req.headers }; - - let rawMessage; - if (parsedBody !== undefined) { - rawMessage = parsedBody; - } else { - const parsedCt = contentType.parse(ct); - const body = await getRawBody(req, { - limit: MAXIMUM_MESSAGE_SIZE, - encoding: parsedCt.parameters.charset ?? 'utf-8' - }); - rawMessage = JSON.parse(body.toString()); - } - - let messages: JSONRPCMessage[]; - - // handle batch and single messages - if (Array.isArray(rawMessage)) { - messages = rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)); - } else { - messages = [JSONRPCMessageSchema.parse(rawMessage)]; - } - - // Check if this is an initialization request - // https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/ - const isInitializationRequest = messages.some(isInitializeRequest); - if (isInitializationRequest) { - // If it's a server with session management and the session ID is already set we should reject the request - // to avoid re-initialization. - if (this._initialized && this.sessionId !== undefined) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32600, - message: 'Invalid Request: Server already initialized' - }, - id: null - }) - ); - return; - } - if (messages.length > 1) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32600, - message: 'Invalid Request: Only one initialization request is allowed' - }, - id: null - }) - ); - return; - } - this.sessionId = this.sessionIdGenerator?.(); - this._initialized = true; - - // If we have a session ID and an onsessioninitialized handler, call it immediately - // This is needed in cases where the server needs to keep track of multiple sessions - if (this.sessionId && this._onsessioninitialized) { - await Promise.resolve(this._onsessioninitialized(this.sessionId)); - } - } - if (!isInitializationRequest) { - // If an Mcp-Session-Id is returned by the server during initialization, - // clients using the Streamable HTTP transport MUST include it - // in the Mcp-Session-Id header on all of their subsequent HTTP requests. - if (!this.validateSession(req, res)) { - return; - } - // Mcp-Protocol-Version header is required for all requests after initialization. - if (!this.validateProtocolVersion(req, res)) { - return; - } - } - - // check if it contains requests - const hasRequests = messages.some(isJSONRPCRequest); - - if (!hasRequests) { - // if it only contains notifications or responses, return 202 - res.writeHead(202).end(); - - // handle each message - for (const message of messages) { - this.onmessage?.(message, { authInfo, requestInfo }); - } - } else if (hasRequests) { - // The default behavior is to use SSE streaming - // but in some cases server will return JSON responses - const streamId = randomUUID(); - - // Extract protocol version for priming event decision. - // For initialize requests, get from request params. - // For other requests, get from header (already validated). - const initRequest = messages.find(m => isInitializeRequest(m)); - const clientProtocolVersion = initRequest - ? initRequest.params.protocolVersion - : ((req.headers['mcp-protocol-version'] as string) ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION); - - if (!this._enableJsonResponse) { - const headers: Record = { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive' - }; - - // After initialization, always include the session ID if we have one - if (this.sessionId !== undefined) { - headers['mcp-session-id'] = this.sessionId; - } - - res.writeHead(200, headers); - - await this._maybeWritePrimingEvent(res, streamId, clientProtocolVersion); - } - // Store the response for this request to send messages back through this connection - // We need to track by request ID to maintain the connection - for (const message of messages) { - if (isJSONRPCRequest(message)) { - this._streamMapping.set(streamId, res); - this._requestToStreamMapping.set(message.id, streamId); - } - } - // Set up close handler for client disconnects - res.on('close', () => { - this._streamMapping.delete(streamId); - }); - - // Add error handler for stream write errors - res.on('error', error => { - this.onerror?.(error as Error); - }); - - // handle each message - for (const message of messages) { - // Build closeSSEStream callback for requests when eventStore is configured - // AND client supports resumability (protocol version >= 2025-11-25). - // Old clients can't resume if the stream is closed early because they - // didn't receive a priming event with an event ID. - let closeSSEStream: (() => void) | undefined; - let closeStandaloneSSEStream: (() => void) | undefined; - if (isJSONRPCRequest(message) && this._eventStore && clientProtocolVersion >= '2025-11-25') { - closeSSEStream = () => { - this.closeSSEStream(message.id); - }; - closeStandaloneSSEStream = () => { - this.closeStandaloneSSEStream(); - }; - } - - this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream }); - } - // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses - // This will be handled by the send() method when responses are ready - } - } catch (error) { - // return JSON-RPC formatted error - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32700, - message: 'Parse error', - data: String(error) - }, - id: null - }) - ); - this.onerror?.(error as Error); - } + async start(): Promise { + return this._webStandardTransport.start(); } /** - * Handles DELETE requests to terminate sessions + * Closes the transport and all active connections. */ - private async handleDeleteRequest(req: IncomingMessage, res: ServerResponse): Promise { - if (!this.validateSession(req, res)) { - return; - } - if (!this.validateProtocolVersion(req, res)) { - return; - } - await Promise.resolve(this._onsessionclosed?.(this.sessionId!)); - await this.close(); - res.writeHead(200).end(); + async close(): Promise { + return this._webStandardTransport.close(); } /** - * Validates session ID for non-initialization requests - * Returns true if the session is valid, false otherwise + * Sends a JSON-RPC message through the transport. */ - private validateSession(req: IncomingMessage, res: ServerResponse): boolean { - if (this.sessionIdGenerator === undefined) { - // If the sessionIdGenerator ID is not set, the session management is disabled - // and we don't need to validate the session ID - return true; - } - if (!this._initialized) { - // If the server has not been initialized yet, reject all requests - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: Server not initialized' - }, - id: null - }) - ); - return false; - } - - const sessionId = req.headers['mcp-session-id']; - - if (!sessionId) { - // Non-initialization requests without a session ID should return 400 Bad Request - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: Mcp-Session-Id header is required' - }, - id: null - }) - ); - return false; - } else if (Array.isArray(sessionId)) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: 'Bad Request: Mcp-Session-Id header must be a single value' - }, - id: null - }) - ); - return false; - } else if (sessionId !== this.sessionId) { - // Reject requests with invalid session ID with 404 Not Found - res.writeHead(404).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32001, - message: 'Session not found' - }, - id: null - }) - ); - return false; - } - - return true; + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { + return this._webStandardTransport.send(message, options); } /** - * Validates the MCP-Protocol-Version header on incoming requests. - * - * For initialization: Version negotiation handles unknown versions gracefully - * (server responds with its supported version). + * Handles an incoming HTTP request, whether GET or POST. * - * For subsequent requests with MCP-Protocol-Version header: - * - Accept if in supported list - * - 400 if unsupported + * This method converts Node.js HTTP objects to Web Standard Request/Response + * and delegates to the underlying WebStandardStreamableHTTPServerTransport. * - * For HTTP requests without the MCP-Protocol-Version header: - * - Accept and default to the version negotiated at initialization + * @param req - Node.js IncomingMessage, optionally with auth property from middleware + * @param res - Node.js ServerResponse + * @param parsedBody - Optional pre-parsed body from body-parser middleware */ - private validateProtocolVersion(req: IncomingMessage, res: ServerResponse): boolean { - let protocolVersion = req.headers['mcp-protocol-version']; - if (Array.isArray(protocolVersion)) { - protocolVersion = protocolVersion[protocolVersion.length - 1]; - } - - if (protocolVersion !== undefined && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) { - res.writeHead(400).end( - JSON.stringify({ - jsonrpc: '2.0', - error: { - code: -32000, - message: `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(', ')})` - }, - id: null - }) - ); - return false; - } - return true; - } - - async close(): Promise { - // Close all SSE connections - this._streamMapping.forEach(response => { - response.end(); + async handleRequest(req: IncomingMessage & { auth?: AuthInfo }, res: ServerResponse, parsedBody?: unknown): Promise { + // Store context for this request to pass through auth and parsedBody + // We need to intercept the request creation to attach this context + const authInfo = req.auth; + + // Create a custom handler that includes our context + const handler = getRequestListener(async (webRequest: Request) => { + return this._webStandardTransport.handleRequest(webRequest, { + authInfo, + parsedBody + }); }); - this._streamMapping.clear(); - // Clear any pending responses - this._requestResponseMap.clear(); - this.onclose?.(); + // Delegate to the request listener which handles all the Node.js <-> Web Standard conversion + // including proper SSE streaming support + await handler(req, res); } /** @@ -847,14 +184,7 @@ export class StreamableHTTPServerTransport implements Transport { * client will reconnect after the retry interval specified in the priming event. */ closeSSEStream(requestId: RequestId): void { - const streamId = this._requestToStreamMapping.get(requestId); - if (!streamId) return; - - const stream = this._streamMapping.get(streamId); - if (stream) { - stream.end(); - this._streamMapping.delete(streamId); - } + this._webStandardTransport.closeSSEStream(requestId); } /** @@ -862,108 +192,6 @@ export class StreamableHTTPServerTransport implements Transport { * Use this to implement polling behavior for server-initiated notifications. */ closeStandaloneSSEStream(): void { - const stream = this._streamMapping.get(this._standaloneSseStreamId); - if (stream) { - stream.end(); - this._streamMapping.delete(this._standaloneSseStreamId); - } - } - - async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { - let requestId = options?.relatedRequestId; - if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { - // If the message is a response, use the request ID from the message - requestId = message.id; - } - - // Check if this message should be sent on the standalone SSE stream (no request ID) - // Ignore notifications from tools (which have relatedRequestId set) - // Those will be sent via dedicated response SSE streams - if (requestId === undefined) { - // For standalone SSE streams, we can only send requests and notifications - if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { - throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request'); - } - - // Generate and store event ID if event store is provided - // Store even if stream is disconnected so events can be replayed on reconnect - let eventId: string | undefined; - if (this._eventStore) { - // Stores the event and gets the generated event ID - eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); - } - - const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); - if (standaloneSse === undefined) { - // Stream is disconnected - event is stored for replay, nothing more to do - return; - } - - // Send the message to the standalone SSE stream - this.writeSSEEvent(standaloneSse, message, eventId); - return; - } - - // Get the response for this request - const streamId = this._requestToStreamMapping.get(requestId); - const response = this._streamMapping.get(streamId!); - if (!streamId) { - throw new Error(`No connection established for request ID: ${String(requestId)}`); - } - - if (!this._enableJsonResponse) { - // For SSE responses, generate event ID if event store is provided - let eventId: string | undefined; - - if (this._eventStore) { - eventId = await this._eventStore.storeEvent(streamId, message); - } - if (response) { - // Write the event to the response stream - this.writeSSEEvent(response, message, eventId); - } - } - - if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { - this._requestResponseMap.set(requestId, message); - const relatedIds = Array.from(this._requestToStreamMapping.entries()) - .filter(([_, streamId]) => this._streamMapping.get(streamId) === response) - .map(([id]) => id); - - // Check if we have responses for all requests using this connection - const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); - - if (allResponsesReady) { - if (!response) { - throw new Error(`No connection established for request ID: ${String(requestId)}`); - } - if (this._enableJsonResponse) { - // All responses ready, send as JSON - const headers: Record = { - 'Content-Type': 'application/json' - }; - if (this.sessionId !== undefined) { - headers['mcp-session-id'] = this.sessionId; - } - - const responses = relatedIds.map(id => this._requestResponseMap.get(id)!); - - response.writeHead(200, headers); - if (responses.length === 1) { - response.end(JSON.stringify(responses[0])); - } else { - response.end(JSON.stringify(responses)); - } - } else { - // End the SSE stream - response.end(); - } - // Clean up - for (const id of relatedIds) { - this._requestResponseMap.delete(id); - this._requestToStreamMapping.delete(id); - } - } - } + this._webStandardTransport.closeStandaloneSSEStream(); } } diff --git a/src/server/webStandardStreamableHttp.ts b/src/server/webStandardStreamableHttp.ts new file mode 100644 index 000000000..3ae9846c2 --- /dev/null +++ b/src/server/webStandardStreamableHttp.ts @@ -0,0 +1,997 @@ +/** + * Web Standards Streamable HTTP Server Transport + * + * This is the core transport implementation using Web Standard APIs (Request, Response, ReadableStream). + * It can run on any runtime that supports Web Standards: Node.js 18+, Cloudflare Workers, Deno, Bun, etc. + * + * For Node.js Express/HTTP compatibility, use `StreamableHTTPServerTransport` which wraps this transport. + */ + +import { Transport } from '../shared/transport.js'; +import { AuthInfo } from './auth/types.js'; +import { + MessageExtraInfo, + RequestInfo, + isInitializeRequest, + isJSONRPCErrorResponse, + isJSONRPCRequest, + isJSONRPCResultResponse, + JSONRPCMessage, + JSONRPCMessageSchema, + RequestId, + SUPPORTED_PROTOCOL_VERSIONS, + DEFAULT_NEGOTIATED_PROTOCOL_VERSION +} from '../types.js'; + +export type StreamId = string; +export type EventId = string; + +/** + * Interface for resumability support via event storage + */ +export interface EventStore { + /** + * Stores an event for later retrieval + * @param streamId ID of the stream the event belongs to + * @param message The JSON-RPC message to store + * @returns The generated event ID for the stored event + */ + storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise; + + /** + * Get the stream ID associated with a given event ID. + * @param eventId The event ID to look up + * @returns The stream ID, or undefined if not found + * + * Optional: If not provided, the SDK will use the streamId returned by + * replayEventsAfter for stream mapping. + */ + getStreamIdForEventId?(eventId: EventId): Promise; + + replayEventsAfter( + lastEventId: EventId, + { + send + }: { + send: (eventId: EventId, message: JSONRPCMessage) => Promise; + } + ): Promise; +} + +/** + * Internal stream mapping for managing SSE connections + */ +interface StreamMapping { + /** Stream controller for pushing SSE data - only used with ReadableStream approach */ + controller?: ReadableStreamDefaultController; + /** Text encoder for SSE formatting */ + encoder?: TextEncoder; + /** Promise resolver for JSON response mode */ + resolveJson?: (response: Response) => void; + /** Cleanup function to close stream and remove mapping */ + cleanup: () => void; +} + +/** + * Configuration options for WebStandardStreamableHTTPServerTransport + */ +export interface WebStandardStreamableHTTPServerTransportOptions { + /** + * Function that generates a session ID for the transport. + * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) + * + * If not provided, session management is disabled (stateless mode). + */ + sessionIdGenerator?: () => string; + + /** + * A callback for session initialization events + * This is called when the server initializes a new session. + * Useful in cases when you need to register multiple mcp sessions + * and need to keep track of them. + * @param sessionId The generated session ID + */ + onsessioninitialized?: (sessionId: string) => void | Promise; + + /** + * A callback for session close events + * This is called when the server closes a session due to a DELETE request. + * Useful in cases when you need to clean up resources associated with the session. + * Note that this is different from the transport closing, if you are handling + * HTTP requests from multiple nodes you might want to close each + * WebStandardStreamableHTTPServerTransport after a request is completed while still keeping the + * session open/running. + * @param sessionId The session ID that was closed + */ + onsessionclosed?: (sessionId: string) => void | Promise; + + /** + * If true, the server will return JSON responses instead of starting an SSE stream. + * This can be useful for simple request/response scenarios without streaming. + * Default is false (SSE streams are preferred). + */ + enableJsonResponse?: boolean; + + /** + * Event store for resumability support + * If provided, resumability will be enabled, allowing clients to reconnect and resume messages + */ + eventStore?: EventStore; + + /** + * List of allowed host header values for DNS rebinding protection. + * If not specified, host validation is disabled. + * @deprecated Use external middleware for host validation instead. + */ + allowedHosts?: string[]; + + /** + * List of allowed origin header values for DNS rebinding protection. + * If not specified, origin validation is disabled. + * @deprecated Use external middleware for origin validation instead. + */ + allowedOrigins?: string[]; + + /** + * Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured). + * Default is false for backwards compatibility. + * @deprecated Use external middleware for DNS rebinding protection instead. + */ + enableDnsRebindingProtection?: boolean; + + /** + * Retry interval in milliseconds to suggest to clients in SSE retry field. + * When set, the server will send a retry field in SSE priming events to control + * client reconnection timing for polling behavior. + */ + retryInterval?: number; +} + +/** + * Options for handling a request + */ +export interface HandleRequestOptions { + /** + * Pre-parsed request body. If provided, the transport will use this instead of parsing req.json(). + * Useful when using body-parser middleware that has already parsed the body. + */ + parsedBody?: unknown; + + /** + * Authentication info from middleware. If provided, will be passed to message handlers. + */ + authInfo?: AuthInfo; +} + +/** + * Server transport for Web Standards Streamable HTTP: this implements the MCP Streamable HTTP transport specification + * using Web Standard APIs (Request, Response, ReadableStream). + * + * This transport works on any runtime that supports Web Standards: Node.js 18+, Cloudflare Workers, Deno, Bun, etc. + * + * Usage example: + * + * ```typescript + * // Stateful mode - server sets the session ID + * const statefulTransport = new WebStandardStreamableHTTPServerTransport({ + * sessionIdGenerator: () => crypto.randomUUID(), + * }); + * + * // Stateless mode - explicitly set session ID to undefined + * const statelessTransport = new WebStandardStreamableHTTPServerTransport({ + * sessionIdGenerator: undefined, + * }); + * + * // Hono.js usage + * app.all('/mcp', async (c) => { + * return transport.handleRequest(c.req.raw); + * }); + * + * // Cloudflare Workers usage + * export default { + * async fetch(request: Request): Promise { + * return transport.handleRequest(request); + * } + * }; + * ``` + * + * In stateful mode: + * - Session ID is generated and included in response headers + * - Session ID is always included in initialization responses + * - Requests with invalid session IDs are rejected with 404 Not Found + * - Non-initialization requests without a session ID are rejected with 400 Bad Request + * - State is maintained in-memory (connections, message history) + * + * In stateless mode: + * - No Session ID is included in any responses + * - No session validation is performed + */ +export class WebStandardStreamableHTTPServerTransport implements Transport { + // when sessionId is not set (undefined), it means the transport is in stateless mode + private sessionIdGenerator: (() => string) | undefined; + private _started: boolean = false; + private _streamMapping: Map = new Map(); + private _requestToStreamMapping: Map = new Map(); + private _requestResponseMap: Map = new Map(); + private _initialized: boolean = false; + private _enableJsonResponse: boolean = false; + private _standaloneSseStreamId: string = '_GET_stream'; + private _eventStore?: EventStore; + private _onsessioninitialized?: (sessionId: string) => void | Promise; + private _onsessionclosed?: (sessionId: string) => void | Promise; + private _allowedHosts?: string[]; + private _allowedOrigins?: string[]; + private _enableDnsRebindingProtection: boolean; + private _retryInterval?: number; + + sessionId?: string; + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; + + constructor(options: WebStandardStreamableHTTPServerTransportOptions = {}) { + this.sessionIdGenerator = options.sessionIdGenerator; + this._enableJsonResponse = options.enableJsonResponse ?? false; + this._eventStore = options.eventStore; + this._onsessioninitialized = options.onsessioninitialized; + this._onsessionclosed = options.onsessionclosed; + this._allowedHosts = options.allowedHosts; + this._allowedOrigins = options.allowedOrigins; + this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; + this._retryInterval = options.retryInterval; + } + + /** + * Starts the transport. This is required by the Transport interface but is a no-op + * for the Streamable HTTP transport as connections are managed per-request. + */ + async start(): Promise { + if (this._started) { + throw new Error('Transport already started'); + } + this._started = true; + } + + /** + * Helper to create a JSON error response + */ + private createJsonErrorResponse( + status: number, + code: number, + message: string, + options?: { headers?: Record; data?: string } + ): Response { + const error: { code: number; message: string; data?: string } = { code, message }; + if (options?.data !== undefined) { + error.data = options.data; + } + return new Response( + JSON.stringify({ + jsonrpc: '2.0', + error, + id: null + }), + { + status, + headers: { + 'Content-Type': 'application/json', + ...options?.headers + } + } + ); + } + + /** + * Validates request headers for DNS rebinding protection. + * @returns Error response if validation fails, undefined if validation passes. + */ + private validateRequestHeaders(req: Request): Response | undefined { + // Skip validation if protection is not enabled + if (!this._enableDnsRebindingProtection) { + return undefined; + } + + // Validate Host header if allowedHosts is configured + if (this._allowedHosts && this._allowedHosts.length > 0) { + const hostHeader = req.headers.get('host'); + if (!hostHeader || !this._allowedHosts.includes(hostHeader)) { + const error = `Invalid Host header: ${hostHeader}`; + this.onerror?.(new Error(error)); + return this.createJsonErrorResponse(403, -32000, error); + } + } + + // Validate Origin header if allowedOrigins is configured + if (this._allowedOrigins && this._allowedOrigins.length > 0) { + const originHeader = req.headers.get('origin'); + if (originHeader && !this._allowedOrigins.includes(originHeader)) { + const error = `Invalid Origin header: ${originHeader}`; + this.onerror?.(new Error(error)); + return this.createJsonErrorResponse(403, -32000, error); + } + } + + return undefined; + } + + /** + * Handles an incoming HTTP request, whether GET, POST, or DELETE + * Returns a Response object (Web Standard) + */ + async handleRequest(req: Request, options?: HandleRequestOptions): Promise { + // Validate request headers for DNS rebinding protection + const validationError = this.validateRequestHeaders(req); + if (validationError) { + return validationError; + } + + switch (req.method) { + case 'POST': + return this.handlePostRequest(req, options); + case 'GET': + return this.handleGetRequest(req); + case 'DELETE': + return this.handleDeleteRequest(req); + default: + return this.handleUnsupportedRequest(); + } + } + + /** + * Writes a priming event to establish resumption capability. + * Only sends if eventStore is configured (opt-in for resumability) and + * the client's protocol version supports empty SSE data (>= 2025-11-25). + */ + private async writePrimingEvent( + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + streamId: string, + protocolVersion: string + ): Promise { + if (!this._eventStore) { + return; + } + + // Priming events have empty data which older clients cannot handle. + // Only send priming events to clients with protocol version >= 2025-11-25 + // which includes the fix for handling empty SSE data. + if (protocolVersion < '2025-11-25') { + return; + } + + const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage); + + let primingEvent = `id: ${primingEventId}\ndata: \n\n`; + if (this._retryInterval !== undefined) { + primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`; + } + controller.enqueue(encoder.encode(primingEvent)); + } + + /** + * Handles GET requests for SSE stream + */ + private async handleGetRequest(req: Request): Promise { + // The client MUST include an Accept header, listing text/event-stream as a supported content type. + const acceptHeader = req.headers.get('accept'); + if (!acceptHeader?.includes('text/event-stream')) { + return this.createJsonErrorResponse(406, -32000, 'Not Acceptable: Client must accept text/event-stream'); + } + + // If an Mcp-Session-Id is returned by the server during initialization, + // clients using the Streamable HTTP transport MUST include it + // in the Mcp-Session-Id header on all of their subsequent HTTP requests. + const sessionError = this.validateSession(req); + if (sessionError) { + return sessionError; + } + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + + // Handle resumability: check for Last-Event-ID header + if (this._eventStore) { + const lastEventId = req.headers.get('last-event-id'); + if (lastEventId) { + return this.replayEvents(lastEventId); + } + } + + // Check if there's already an active standalone SSE stream for this session + if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { + // Only one GET SSE stream is allowed per session + return this.createJsonErrorResponse(409, -32000, 'Conflict: Only one SSE stream is allowed per session'); + } + + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + // Create a ReadableStream with a controller we can use to push SSE events + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + this._streamMapping.delete(this._standaloneSseStreamId); + } + }); + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive' + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Store the stream mapping with the controller for pushing data + this._streamMapping.set(this._standaloneSseStreamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(this._standaloneSseStreamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + + return new Response(readable, { headers }); + } + + /** + * Replays events that would have been sent after the specified event ID + * Only used when resumability is enabled + */ + private async replayEvents(lastEventId: string): Promise { + if (!this._eventStore) { + return this.createJsonErrorResponse(400, -32000, 'Event store not configured'); + } + + try { + // If getStreamIdForEventId is available, use it for conflict checking + let streamId: string | undefined; + if (this._eventStore.getStreamIdForEventId) { + streamId = await this._eventStore.getStreamIdForEventId(lastEventId); + + if (!streamId) { + return this.createJsonErrorResponse(400, -32000, 'Invalid event ID format'); + } + + // Check conflict with the SAME streamId we'll use for mapping + if (this._streamMapping.get(streamId) !== undefined) { + return this.createJsonErrorResponse(409, -32000, 'Conflict: Stream already has an active connection'); + } + } + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive' + }; + + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Create a ReadableStream with controller for SSE + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + // Cleanup will be handled by the mapping + } + }); + + // Replay events - returns the streamId for backwards compatibility + const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { + send: async (eventId: string, message: JSONRPCMessage) => { + const success = this.writeSSEEvent(streamController!, encoder, message, eventId); + if (!success) { + this.onerror?.(new Error('Failed replay events')); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + } + }); + + this._streamMapping.set(replayedStreamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(replayedStreamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + + return new Response(readable, { headers }); + } catch (error) { + this.onerror?.(error as Error); + return this.createJsonErrorResponse(500, -32000, 'Error replaying events'); + } + } + + /** + * Writes an event to an SSE stream via controller with proper formatting + */ + private writeSSEEvent( + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + message: JSONRPCMessage, + eventId?: string + ): boolean { + try { + let eventData = `event: message\n`; + // Include event ID if provided - this is important for resumability + if (eventId) { + eventData += `id: ${eventId}\n`; + } + eventData += `data: ${JSON.stringify(message)}\n\n`; + controller.enqueue(encoder.encode(eventData)); + return true; + } catch { + return false; + } + } + + /** + * Handles unsupported requests (PUT, PATCH, etc.) + */ + private handleUnsupportedRequest(): Response { + return new Response( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Method not allowed.' + }, + id: null + }), + { + status: 405, + headers: { + Allow: 'GET, POST, DELETE', + 'Content-Type': 'application/json' + } + } + ); + } + + /** + * Handles POST requests containing JSON-RPC messages + */ + private async handlePostRequest(req: Request, options?: HandleRequestOptions): Promise { + try { + // Validate the Accept header + const acceptHeader = req.headers.get('accept'); + // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. + if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { + return this.createJsonErrorResponse( + 406, + -32000, + 'Not Acceptable: Client must accept both application/json and text/event-stream' + ); + } + + const ct = req.headers.get('content-type'); + if (!ct || !ct.includes('application/json')) { + return this.createJsonErrorResponse(415, -32000, 'Unsupported Media Type: Content-Type must be application/json'); + } + + // Build request info from headers + const requestInfo: RequestInfo = { + headers: Object.fromEntries(req.headers.entries()) + }; + + let rawMessage; + if (options?.parsedBody !== undefined) { + rawMessage = options.parsedBody; + } else { + try { + rawMessage = await req.json(); + } catch { + return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON'); + } + } + + let messages: JSONRPCMessage[]; + + // handle batch and single messages + try { + if (Array.isArray(rawMessage)) { + messages = rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)); + } else { + messages = [JSONRPCMessageSchema.parse(rawMessage)]; + } + } catch { + return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON-RPC message'); + } + + // Check if this is an initialization request + // https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/ + const isInitializationRequest = messages.some(isInitializeRequest); + if (isInitializationRequest) { + // If it's a server with session management and the session ID is already set we should reject the request + // to avoid re-initialization. + if (this._initialized && this.sessionId !== undefined) { + return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Server already initialized'); + } + if (messages.length > 1) { + return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Only one initialization request is allowed'); + } + this.sessionId = this.sessionIdGenerator?.(); + this._initialized = true; + + // If we have a session ID and an onsessioninitialized handler, call it immediately + // This is needed in cases where the server needs to keep track of multiple sessions + if (this.sessionId && this._onsessioninitialized) { + await Promise.resolve(this._onsessioninitialized(this.sessionId)); + } + } + if (!isInitializationRequest) { + // If an Mcp-Session-Id is returned by the server during initialization, + // clients using the Streamable HTTP transport MUST include it + // in the Mcp-Session-Id header on all of their subsequent HTTP requests. + const sessionError = this.validateSession(req); + if (sessionError) { + return sessionError; + } + // Mcp-Protocol-Version header is required for all requests after initialization. + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + } + + // check if it contains requests + const hasRequests = messages.some(isJSONRPCRequest); + + if (!hasRequests) { + // if it only contains notifications or responses, return 202 + for (const message of messages) { + this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo }); + } + return new Response(null, { status: 202 }); + } + + // The default behavior is to use SSE streaming + // but in some cases server will return JSON responses + const streamId = crypto.randomUUID(); + + // Extract protocol version for priming event decision. + // For initialize requests, get from request params. + // For other requests, get from header (already validated). + const initRequest = messages.find(m => isInitializeRequest(m)); + const clientProtocolVersion = initRequest + ? initRequest.params.protocolVersion + : (req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION); + + if (this._enableJsonResponse) { + // For JSON response mode, return a Promise that resolves when all responses are ready + return new Promise(resolve => { + this._streamMapping.set(streamId, { + resolveJson: resolve, + cleanup: () => { + this._streamMapping.delete(streamId); + } + }); + + for (const message of messages) { + if (isJSONRPCRequest(message)) { + this._requestToStreamMapping.set(message.id, streamId); + } + } + + for (const message of messages) { + this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo }); + } + }); + } + + // SSE streaming mode - use ReadableStream with controller for more reliable data pushing + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + this._streamMapping.delete(streamId); + } + }); + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive' + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Store the response for this request to send messages back through this connection + // We need to track by request ID to maintain the connection + for (const message of messages) { + if (isJSONRPCRequest(message)) { + this._streamMapping.set(streamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(streamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + this._requestToStreamMapping.set(message.id, streamId); + } + } + + // Write priming event if event store is configured (after mapping is set up) + await this.writePrimingEvent(streamController!, encoder, streamId, clientProtocolVersion); + + // handle each message + for (const message of messages) { + // Build closeSSEStream callback for requests when eventStore is configured + // AND client supports resumability (protocol version >= 2025-11-25). + // Old clients can't resume if the stream is closed early because they + // didn't receive a priming event with an event ID. + let closeSSEStream: (() => void) | undefined; + let closeStandaloneSSEStream: (() => void) | undefined; + if (isJSONRPCRequest(message) && this._eventStore && clientProtocolVersion >= '2025-11-25') { + closeSSEStream = () => { + this.closeSSEStream(message.id); + }; + closeStandaloneSSEStream = () => { + this.closeStandaloneSSEStream(); + }; + } + + this.onmessage?.(message, { authInfo: options?.authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream }); + } + // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses + // This will be handled by the send() method when responses are ready + + return new Response(readable, { status: 200, headers }); + } catch (error) { + // return JSON-RPC formatted error + this.onerror?.(error as Error); + return this.createJsonErrorResponse(400, -32700, 'Parse error', { data: String(error) }); + } + } + + /** + * Handles DELETE requests to terminate sessions + */ + private async handleDeleteRequest(req: Request): Promise { + const sessionError = this.validateSession(req); + if (sessionError) { + return sessionError; + } + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + + await Promise.resolve(this._onsessionclosed?.(this.sessionId!)); + await this.close(); + return new Response(null, { status: 200 }); + } + + /** + * Validates session ID for non-initialization requests. + * Returns Response error if invalid, undefined otherwise + */ + private validateSession(req: Request): Response | undefined { + if (this.sessionIdGenerator === undefined) { + // If the sessionIdGenerator ID is not set, the session management is disabled + // and we don't need to validate the session ID + return undefined; + } + if (!this._initialized) { + // If the server has not been initialized yet, reject all requests + return this.createJsonErrorResponse(400, -32000, 'Bad Request: Server not initialized'); + } + + const sessionId = req.headers.get('mcp-session-id'); + + if (!sessionId) { + // Non-initialization requests without a session ID should return 400 Bad Request + return this.createJsonErrorResponse(400, -32000, 'Bad Request: Mcp-Session-Id header is required'); + } + + if (sessionId !== this.sessionId) { + // Reject requests with invalid session ID with 404 Not Found + return this.createJsonErrorResponse(404, -32001, 'Session not found'); + } + + return undefined; + } + + /** + * Validates the MCP-Protocol-Version header on incoming requests. + * + * For initialization: Version negotiation handles unknown versions gracefully + * (server responds with its supported version). + * + * For subsequent requests with MCP-Protocol-Version header: + * - Accept if in supported list + * - 400 if unsupported + * + * For HTTP requests without the MCP-Protocol-Version header: + * - Accept and default to the version negotiated at initialization + */ + private validateProtocolVersion(req: Request): Response | undefined { + const protocolVersion = req.headers.get('mcp-protocol-version'); + + if (protocolVersion !== null && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) { + return this.createJsonErrorResponse( + 400, + -32000, + `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(', ')})` + ); + } + return undefined; + } + + async close(): Promise { + // Close all SSE connections + this._streamMapping.forEach(({ cleanup }) => { + cleanup(); + }); + this._streamMapping.clear(); + + // Clear any pending responses + this._requestResponseMap.clear(); + this.onclose?.(); + } + + /** + * Close an SSE stream for a specific request, triggering client reconnection. + * Use this to implement polling behavior during long-running operations - + * client will reconnect after the retry interval specified in the priming event. + */ + closeSSEStream(requestId: RequestId): void { + const streamId = this._requestToStreamMapping.get(requestId); + if (!streamId) return; + + const stream = this._streamMapping.get(streamId); + if (stream) { + stream.cleanup(); + } + } + + /** + * Close the standalone GET SSE stream, triggering client reconnection. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream(): void { + const stream = this._streamMapping.get(this._standaloneSseStreamId); + if (stream) { + stream.cleanup(); + } + } + + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { + let requestId = options?.relatedRequestId; + if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { + // If the message is a response, use the request ID from the message + requestId = message.id; + } + + // Check if this message should be sent on the standalone SSE stream (no request ID) + // Ignore notifications from tools (which have relatedRequestId set) + // Those will be sent via dedicated response SSE streams + if (requestId === undefined) { + // For standalone SSE streams, we can only send requests and notifications + if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { + throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request'); + } + + // Generate and store event ID if event store is provided + // Store even if stream is disconnected so events can be replayed on reconnect + let eventId: string | undefined; + if (this._eventStore) { + // Stores the event and gets the generated event ID + eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); + } + + const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); + if (standaloneSse === undefined) { + // Stream is disconnected - event is stored for replay, nothing more to do + return; + } + + // Send the message to the standalone SSE stream + if (standaloneSse.controller && standaloneSse.encoder) { + this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId); + } + return; + } + + // Get the response for this request + const streamId = this._requestToStreamMapping.get(requestId); + if (!streamId) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } + + const stream = this._streamMapping.get(streamId); + + if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { + // For SSE responses, generate event ID if event store is provided + let eventId: string | undefined; + + if (this._eventStore) { + eventId = await this._eventStore.storeEvent(streamId, message); + } + // Write the event to the response stream + this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); + } + + if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { + this._requestResponseMap.set(requestId, message); + const relatedIds = Array.from(this._requestToStreamMapping.entries()) + .filter(([_, sid]) => sid === streamId) + .map(([id]) => id); + + // Check if we have responses for all requests using this connection + const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); + + if (allResponsesReady) { + if (!stream) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } + if (this._enableJsonResponse && stream.resolveJson) { + // All responses ready, send as JSON + const headers: Record = { + 'Content-Type': 'application/json' + }; + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + const responses = relatedIds.map(id => this._requestResponseMap.get(id)!); + + if (responses.length === 1) { + stream.resolveJson(new Response(JSON.stringify(responses[0]), { status: 200, headers })); + } else { + stream.resolveJson(new Response(JSON.stringify(responses), { status: 200, headers })); + } + } else { + // End the SSE stream + stream.cleanup(); + } + // Clean up + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + } + } + } +} diff --git a/test/server/streamableHttp.test.ts b/test/server/streamableHttp.test.ts index 0161d82fb..36a12ca9c 100644 --- a/test/server/streamableHttp.test.ts +++ b/test/server/streamableHttp.test.ts @@ -119,7 +119,12 @@ async function sendPostRequest( }); } -function expectErrorResponse(data: unknown, expectedCode: number, expectedMessagePattern: RegExp): void { +function expectErrorResponse( + data: unknown, + expectedCode: number, + expectedMessagePattern: RegExp, + options?: { expectData?: boolean } +): void { expect(data).toMatchObject({ jsonrpc: '2.0', error: expect.objectContaining({ @@ -127,6 +132,9 @@ function expectErrorResponse(data: unknown, expectedCode: number, expectedMessag message: expect.stringMatching(expectedMessagePattern) }) }); + if (options?.expectData) { + expect((data as { error: { data?: string } }).error.data).toBeDefined(); + } } describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { /** @@ -679,6 +687,28 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { expectErrorResponse(errorData, -32700, /Parse error/); }); + it('should include error data in parse error response for unexpected errors', async () => { + sessionId = await initializeServer(); + + // We can't easily trigger the catch-all error handler, but we can verify + // that the JSON parse error includes useful information + const response = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-session-id': sessionId + }, + body: '{ invalid json }' + }); + + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32700, /Parse error/); + // The error message should contain details about what went wrong + expect(errorData.error.message).toContain('Invalid JSON'); + }); + it('should return 400 error for invalid JSON-RPC messages', async () => { sessionId = await initializeServer(); @@ -1309,9 +1339,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { baseUrl = result.baseUrl; mcpServer = result.mcpServer; - // Verify resumability is enabled on the transport - expect(transport['_eventStore']).toBeDefined(); - // Initialize the server const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); sessionId = initResponse.headers.get('mcp-session-id') as string;