@@ -106,7 +106,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
106
106
private final AtomicReference <Consumer <Throwable >> exceptionHandler = new AtomicReference <>();
107
107
108
108
private WebClientStreamableHttpTransport (McpJsonMapper jsonMapper , WebClient .Builder webClientBuilder ,
109
- String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
109
+ String endpoint , boolean resumableStreams , boolean openConnectionOnStartup ) {
110
110
this .jsonMapper = jsonMapper ;
111
111
this .webClient = webClientBuilder .build ();
112
112
this .endpoint = endpoint ;
@@ -146,16 +146,16 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
146
146
private DefaultMcpTransportSession createTransportSession () {
147
147
Function <String , Publisher <Void >> onClose = sessionId -> sessionId == null ? Mono .empty ()
148
148
: webClient .delete ()
149
- .uri (this .endpoint )
150
- .header (HttpHeaders .MCP_SESSION_ID , sessionId )
151
- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
152
- .retrieve ()
153
- .toBodilessEntity ()
154
- .onErrorComplete (e -> {
155
- logger .warn ("Got error when closing transport" , e );
156
- return true ;
157
- })
158
- .then ();
149
+ .uri (this .endpoint )
150
+ .header (HttpHeaders .MCP_SESSION_ID , sessionId )
151
+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
152
+ .retrieve ()
153
+ .toBodilessEntity ()
154
+ .onErrorComplete (e -> {
155
+ logger .warn ("Got error when closing transport" , e );
156
+ return true ;
157
+ })
158
+ .then ();
159
159
return new DefaultMcpTransportSession (onClose );
160
160
}
161
161
@@ -206,52 +206,52 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
206
206
final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
207
207
208
208
Disposable connection = webClient .get ()
209
- .uri (this .endpoint )
210
- .accept (MediaType .TEXT_EVENT_STREAM )
211
- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
212
- .headers (httpHeaders -> {
213
- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
214
- if (stream != null ) {
215
- stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
216
- }
217
- })
218
- .exchangeToFlux (response -> {
219
- if (isEventStream (response )) {
220
- logger .debug ("Established SSE stream via GET" );
221
- return eventStream (stream , response );
222
- }
223
- else if (isNotAllowed (response )) {
224
- logger .debug ("The server does not support SSE streams, using request-response mode." );
225
- return Flux .empty ();
226
- }
227
- else if (isNotFound (response )) {
228
- if (transportSession .sessionId ().isPresent ()) {
229
- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
230
- return mcpSessionNotFoundError (sessionIdRepresentation );
231
- }
232
- else {
233
- return this .extractError (response , MISSING_SESSION_ID );
234
- }
209
+ .uri (this .endpoint )
210
+ .accept (MediaType .TEXT_EVENT_STREAM )
211
+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
212
+ .headers (httpHeaders -> {
213
+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
214
+ if (stream != null ) {
215
+ stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders .LAST_EVENT_ID , id ));
216
+ }
217
+ })
218
+ .exchangeToFlux (response -> {
219
+ if (isEventStream (response )) {
220
+ logger .debug ("Established SSE stream via GET" );
221
+ return eventStream (stream , response );
222
+ }
223
+ else if (isNotAllowed (response )) {
224
+ logger .debug ("The server does not support SSE streams, using request-response mode." );
225
+ return Flux .empty ();
226
+ }
227
+ else if (isNotFound (response )) {
228
+ if (transportSession .sessionId ().isPresent ()) {
229
+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
230
+ return mcpSessionNotFoundError (sessionIdRepresentation );
235
231
}
236
232
else {
237
- return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
238
- logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
239
- }).flux ();
233
+ return this .extractError (response , MISSING_SESSION_ID );
240
234
}
241
- })
242
- .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
243
- .onErrorComplete (t -> {
244
- this .handleException (t );
245
- return true ;
246
- })
247
- .doFinally (s -> {
248
- Disposable ref = disposableRef .getAndSet (null );
249
- if (ref != null ) {
250
- transportSession .removeConnection (ref );
251
- }
252
- })
253
- .contextWrite (ctx )
254
- .subscribe ();
235
+ }
236
+ else {
237
+ return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
238
+ logger .info ("Opening an SSE stream failed. This can be safely ignored." , e );
239
+ }).flux ();
240
+ }
241
+ })
242
+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
243
+ .onErrorComplete (t -> {
244
+ this .handleException (t );
245
+ return true ;
246
+ })
247
+ .doFinally (s -> {
248
+ Disposable ref = disposableRef .getAndSet (null );
249
+ if (ref != null ) {
250
+ transportSession .removeConnection (ref );
251
+ }
252
+ })
253
+ .contextWrite (ctx )
254
+ .subscribe ();
255
255
256
256
disposableRef .set (connection );
257
257
transportSession .addConnection (connection );
@@ -272,83 +272,83 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
272
272
final McpTransportSession <Disposable > transportSession = this .activeSession .get ();
273
273
274
274
Disposable connection = webClient .post ()
275
- .uri (this .endpoint )
276
- .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
277
- .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
278
- .headers (httpHeaders -> {
279
- transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
280
- })
281
- .bodyValue (message )
282
- .exchangeToFlux (response -> {
283
- String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
284
- if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
285
- // Once we have a session, we try to open an async stream for
286
- // the server to send notifications and requests out-of-band.
287
- reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
275
+ .uri (this .endpoint )
276
+ .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
277
+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
278
+ .headers (httpHeaders -> {
279
+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders .MCP_SESSION_ID , id ));
280
+ })
281
+ .bodyValue (message )
282
+ .exchangeToFlux (response -> {
283
+ String mcpSessionId = response .headers ().asHttpHeaders ().getFirst (HttpHeaders .MCP_SESSION_ID );
284
+ if (StringUtils .hasText (mcpSessionId ) && transportSession .markInitialized (mcpSessionId )) {
285
+ // Once we have a session, we try to open an async stream for
286
+ // the server to send notifications and requests out-of-band.
287
+ reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
288
+ }
289
+
290
+ String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
291
+
292
+ // The spec mentions only ACCEPTED, but the existing SDKs can return
293
+ // 200 OK for notifications
294
+ if (response .statusCode ().is2xxSuccessful ()) {
295
+ Optional <MediaType > contentType = response .headers ().contentType ();
296
+ // Existing SDKs consume notifications with no response body nor
297
+ // content type
298
+ if (contentType .isEmpty ()) {
299
+ logger .trace ("Message was successfully sent via POST for session {}" ,
300
+ sessionRepresentation );
301
+ // signal the caller that the message was successfully
302
+ // delivered
303
+ sink .success ();
304
+ // communicate to downstream there is no streamed data coming
305
+ return Flux .empty ();
288
306
}
289
-
290
- String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
291
-
292
- // The spec mentions only ACCEPTED, but the existing SDKs can return
293
- // 200 OK for notifications
294
- if (response .statusCode ().is2xxSuccessful ()) {
295
- Optional <MediaType > contentType = response .headers ().contentType ();
296
- // Existing SDKs consume notifications with no response body nor
297
- // content type
298
- if (contentType .isEmpty ()) {
299
- logger .trace ("Message was successfully sent via POST for session {}" ,
300
- sessionRepresentation );
301
- // signal the caller that the message was successfully
302
- // delivered
307
+ else {
308
+ MediaType mediaType = contentType .get ();
309
+ if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
310
+ logger .debug ("Established SSE stream via POST" );
311
+ // communicate to caller that the message was delivered
303
312
sink .success ();
304
- // communicate to downstream there is no streamed data coming
305
- return Flux . empty ( );
313
+ // starting a stream
314
+ return newEventStream ( response , sessionRepresentation );
306
315
}
307
- else {
308
- MediaType mediaType = contentType .get ();
309
- if (mediaType .isCompatibleWith (MediaType .TEXT_EVENT_STREAM )) {
310
- logger .debug ("Established SSE stream via POST" );
311
- // communicate to caller that the message was delivered
312
- sink .success ();
313
- // starting a stream
314
- return newEventStream (response , sessionRepresentation );
315
- }
316
- else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
317
- logger .trace ("Received response to POST for session {}" , sessionRepresentation );
318
- // communicate to caller the message was delivered
319
- sink .success ();
320
- return directResponseFlux (message , response );
321
- }
322
- else {
323
- logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
324
- sessionRepresentation );
325
- return Flux .error (new RuntimeException ("Unknown media type returned: " + contentType ));
326
- }
316
+ else if (mediaType .isCompatibleWith (MediaType .APPLICATION_JSON )) {
317
+ logger .trace ("Received response to POST for session {}" , sessionRepresentation );
318
+ // communicate to caller the message was delivered
319
+ sink .success ();
320
+ return directResponseFlux (message , response );
327
321
}
328
- }
329
- else {
330
- if ( isNotFound ( response ) && ! sessionRepresentation . equals ( MISSING_SESSION_ID )) {
331
- return mcpSessionNotFoundError ( sessionRepresentation );
322
+ else {
323
+ logger . warn ( "Unknown media type {} returned for POST in session {}" , contentType ,
324
+ sessionRepresentation );
325
+ return Flux . error ( new RuntimeException ( "Unknown media type returned: " + contentType ) );
332
326
}
333
- return this .extractError (response , sessionRepresentation );
334
327
}
335
- })
336
- .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
337
- .onErrorComplete (t -> {
338
- // handle the error first
339
- this .handleException (t );
340
- // inform the caller of sendMessage
341
- sink .error (t );
342
- return true ;
343
- })
344
- .doFinally (s -> {
345
- Disposable ref = disposableRef .getAndSet (null );
346
- if (ref != null ) {
347
- transportSession .removeConnection (ref );
328
+ }
329
+ else {
330
+ if (isNotFound (response ) && !sessionRepresentation .equals (MISSING_SESSION_ID )) {
331
+ return mcpSessionNotFoundError (sessionRepresentation );
348
332
}
349
- })
350
- .contextWrite (sink .contextView ())
351
- .subscribe ();
333
+ return this .extractError (response , sessionRepresentation );
334
+ }
335
+ })
336
+ .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
337
+ .onErrorComplete (t -> {
338
+ // handle the error first
339
+ this .handleException (t );
340
+ // inform the caller of sendMessage
341
+ sink .error (t );
342
+ return true ;
343
+ })
344
+ .doFinally (s -> {
345
+ Disposable ref = disposableRef .getAndSet (null );
346
+ if (ref != null ) {
347
+ transportSession .removeConnection (ref );
348
+ }
349
+ })
350
+ .contextWrite (sink .contextView ())
351
+ .subscribe ();
352
352
disposableRef .set (connection );
353
353
transportSession .addConnection (connection );
354
354
});
@@ -419,7 +419,7 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
419
419
}
420
420
421
421
private Flux <McpSchema .JSONRPCMessage > directResponseFlux (McpSchema .JSONRPCMessage sentMessage ,
422
- ClientResponse response ) {
422
+ ClientResponse response ) {
423
423
return response .bodyToMono (String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
424
424
try {
425
425
if (sentMessage instanceof McpSchema .JSONRPCNotification && Utils .hasText (responseMessage )) {
0 commit comments