Skip to content

Commit 02b8c4b

Browse files
authored
test: Add additional MCP transport context integration tests (#529)
- Add integration tests for transport context propagation between MCP clients and servers - Test both sync and async server implementations across all transport types (stateless, streamable, SSE) - Cover Spring WebFlux and WebMVC environments with dedicated test suites - Validate context flow through HTTP headers for authentication, correlation IDs, and metadata - Rename existing McpTransportContextIntegrationTests to SyncServerMcpTransportContextIntegrationTests for clarity - Add proper resource cleanup for async clients in teardown methods Signed-off-by: Christian Tzolov <[email protected]>
1 parent a5fd1eb commit 02b8c4b

File tree

6 files changed

+1144
-2
lines changed

6 files changed

+1144
-2
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.common;
6+
7+
import java.util.Map;
8+
import java.util.function.BiFunction;
9+
10+
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import io.modelcontextprotocol.client.McpAsyncClient;
12+
import io.modelcontextprotocol.client.McpClient;
13+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
14+
import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;
15+
import io.modelcontextprotocol.server.McpAsyncServerExchange;
16+
import io.modelcontextprotocol.server.McpServer;
17+
import io.modelcontextprotocol.server.McpServerFeatures;
18+
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
19+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
20+
import io.modelcontextprotocol.server.TestUtil;
21+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
22+
import io.modelcontextprotocol.server.transport.WebFluxStatelessServerTransport;
23+
import io.modelcontextprotocol.server.transport.WebFluxStreamableServerTransportProvider;
24+
import io.modelcontextprotocol.spec.McpSchema;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.Timeout;
28+
import reactor.core.publisher.Mono;
29+
import reactor.netty.DisposableServer;
30+
import reactor.netty.http.server.HttpServer;
31+
import reactor.test.StepVerifier;
32+
33+
import org.springframework.http.server.reactive.HttpHandler;
34+
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
35+
import org.springframework.web.reactive.function.client.ClientRequest;
36+
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
37+
import org.springframework.web.reactive.function.client.WebClient;
38+
import org.springframework.web.reactive.function.server.RouterFunction;
39+
import org.springframework.web.reactive.function.server.RouterFunctions;
40+
import org.springframework.web.reactive.function.server.ServerRequest;
41+
42+
import static org.assertj.core.api.Assertions.assertThat;
43+
44+
/**
45+
* Integration tests for {@link McpTransportContext} propagation between MCP clients and
46+
* async servers using Spring WebFlux infrastructure.
47+
*
48+
* <p>
49+
* This test class validates the end-to-end flow of transport context propagation in MCP
50+
* communication for asynchronous client and server implementations. It tests various
51+
* combinations of client types and server transport mechanisms (stateless, streamable,
52+
* SSE) to ensure proper context handling across different configurations.
53+
*
54+
* <h2>Context Propagation Flow</h2>
55+
* <ol>
56+
* <li>Client sets a value in its transport context via thread-local Reactor context</li>
57+
* <li>Client-side context provider extracts the value and adds it as an HTTP header to
58+
* the request</li>
59+
* <li>Server-side context extractor reads the header from the incoming request</li>
60+
* <li>Server handler receives the extracted context and returns the value as the tool
61+
* call result</li>
62+
* <li>Test verifies the round-trip context propagation was successful</li>
63+
* </ol>
64+
*
65+
* @author Daniel Garnier-Moiroux
66+
* @author Christian Tzolov
67+
*/
68+
@Timeout(15)
69+
public class AsyncServerMcpTransportContextIntegrationTests {
70+
71+
private static final int PORT = TestUtil.findAvailablePort();
72+
73+
private static final String HEADER_NAME = "x-test";
74+
75+
// Async client context provider
76+
ExchangeFilterFunction asyncClientContextProvider = (request, next) -> Mono.deferContextual(ctx -> {
77+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
78+
// // do stuff with the context
79+
var headerValue = transportContext.get("client-side-header-value");
80+
if (headerValue == null) {
81+
return next.exchange(request);
82+
}
83+
var reqWithHeader = ClientRequest.from(request).header(HEADER_NAME, headerValue.toString()).build();
84+
return next.exchange(reqWithHeader);
85+
});
86+
87+
// Tools
88+
private final McpSchema.Tool tool = McpSchema.Tool.builder()
89+
.name("test-tool")
90+
.description("return the value of the x-test header from call tool request")
91+
.build();
92+
93+
private final BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> asyncStatelessHandler = (
94+
transportContext, request) -> {
95+
return Mono
96+
.just(new McpSchema.CallToolResult(transportContext.get("server-side-header-value").toString(), null));
97+
};
98+
99+
private final BiFunction<McpAsyncServerExchange, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> asyncStatefulHandler = (
100+
exchange, request) -> {
101+
return asyncStatelessHandler.apply(exchange.transportContext(), request);
102+
};
103+
104+
// Server context extractor
105+
private final McpTransportContextExtractor<ServerRequest> serverContextExtractor = (ServerRequest r) -> {
106+
var headerValue = r.headers().firstHeader(HEADER_NAME);
107+
return headerValue != null ? McpTransportContext.create(Map.of("server-side-header-value", headerValue))
108+
: McpTransportContext.EMPTY;
109+
};
110+
111+
// Server transports
112+
private final WebFluxStatelessServerTransport statelessServerTransport = WebFluxStatelessServerTransport.builder()
113+
.objectMapper(new ObjectMapper())
114+
.contextExtractor(serverContextExtractor)
115+
.build();
116+
117+
private final WebFluxStreamableServerTransportProvider streamableServerTransport = WebFluxStreamableServerTransportProvider
118+
.builder()
119+
.objectMapper(new ObjectMapper())
120+
.contextExtractor(serverContextExtractor)
121+
.build();
122+
123+
private final WebFluxSseServerTransportProvider sseServerTransport = WebFluxSseServerTransportProvider.builder()
124+
.objectMapper(new ObjectMapper())
125+
.contextExtractor(serverContextExtractor)
126+
.messageEndpoint("/mcp/message")
127+
.build();
128+
129+
// Async clients
130+
private final McpAsyncClient asyncStreamableClient = McpClient
131+
.async(WebClientStreamableHttpTransport
132+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
133+
.build())
134+
.build();
135+
136+
private final McpAsyncClient asyncSseClient = McpClient
137+
.async(WebFluxSseClientTransport
138+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
139+
.build())
140+
.build();
141+
142+
private DisposableServer httpServer;
143+
144+
@AfterEach
145+
public void after() {
146+
if (statelessServerTransport != null) {
147+
statelessServerTransport.closeGracefully().block();
148+
}
149+
if (streamableServerTransport != null) {
150+
streamableServerTransport.closeGracefully().block();
151+
}
152+
if (sseServerTransport != null) {
153+
sseServerTransport.closeGracefully().block();
154+
}
155+
if (asyncStreamableClient != null) {
156+
asyncStreamableClient.closeGracefully().block();
157+
}
158+
if (asyncSseClient != null) {
159+
asyncSseClient.closeGracefully().block();
160+
}
161+
stopHttpServer();
162+
}
163+
164+
@Test
165+
void asyncClientStatelessServer() {
166+
167+
startHttpServer(statelessServerTransport.getRouterFunction());
168+
169+
var mcpServer = McpServer.async(statelessServerTransport)
170+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
171+
.tools(new McpStatelessServerFeatures.AsyncToolSpecification(tool, asyncStatelessHandler))
172+
.build();
173+
174+
StepVerifier.create(asyncStreamableClient.initialize()).assertNext(initResult -> {
175+
assertThat(initResult).isNotNull();
176+
}).verifyComplete();
177+
178+
// Test tool call with context
179+
StepVerifier
180+
.create(asyncStreamableClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
181+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
182+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
183+
.assertNext(response -> {
184+
assertThat(response).isNotNull();
185+
assertThat(response.content()).hasSize(1)
186+
.first()
187+
.extracting(McpSchema.TextContent.class::cast)
188+
.extracting(McpSchema.TextContent::text)
189+
.isEqualTo("some important value");
190+
})
191+
.verifyComplete();
192+
193+
mcpServer.close();
194+
}
195+
196+
@Test
197+
void asyncClientStreamableServer() {
198+
199+
startHttpServer(streamableServerTransport.getRouterFunction());
200+
201+
var mcpServer = McpServer.async(streamableServerTransport)
202+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
203+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
204+
.build();
205+
206+
StepVerifier.create(asyncStreamableClient.initialize()).assertNext(initResult -> {
207+
assertThat(initResult).isNotNull();
208+
}).verifyComplete();
209+
210+
// Test tool call with context
211+
StepVerifier
212+
.create(asyncStreamableClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
213+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
214+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
215+
.assertNext(response -> {
216+
assertThat(response).isNotNull();
217+
assertThat(response.content()).hasSize(1)
218+
.first()
219+
.extracting(McpSchema.TextContent.class::cast)
220+
.extracting(McpSchema.TextContent::text)
221+
.isEqualTo("some important value");
222+
})
223+
.verifyComplete();
224+
225+
mcpServer.close();
226+
}
227+
228+
@Test
229+
void asyncClientSseServer() {
230+
231+
startHttpServer(sseServerTransport.getRouterFunction());
232+
233+
var mcpServer = McpServer.async(sseServerTransport)
234+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
235+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
236+
.build();
237+
238+
StepVerifier.create(asyncSseClient.initialize()).assertNext(initResult -> {
239+
assertThat(initResult).isNotNull();
240+
}).verifyComplete();
241+
242+
// Test tool call with context
243+
StepVerifier
244+
.create(asyncSseClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
245+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
246+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
247+
.assertNext(response -> {
248+
assertThat(response).isNotNull();
249+
assertThat(response.content()).hasSize(1)
250+
.first()
251+
.extracting(McpSchema.TextContent.class::cast)
252+
.extracting(McpSchema.TextContent::text)
253+
.isEqualTo("some important value");
254+
})
255+
.verifyComplete();
256+
257+
mcpServer.close();
258+
}
259+
260+
private void startHttpServer(RouterFunction<?> routerFunction) {
261+
262+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
263+
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
264+
this.httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
265+
}
266+
267+
private void stopHttpServer() {
268+
if (httpServer != null) {
269+
httpServer.disposeNow();
270+
}
271+
}
272+
273+
}

0 commit comments

Comments
 (0)