From 64da7390705d235aef8b052300b410f53c51721a Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Thu, 14 Aug 2025 16:03:48 -0700 Subject: [PATCH 1/7] Convert EverythingServer to use Streamable HTTP --- .../EverythingServer/EverythingServer.csproj | 5 +-- .../LoggingUpdateMessageSender.cs | 37 +++++++++++++------ samples/EverythingServer/Program.cs | 21 +++++------ .../Properties/launchSettings.json | 21 +++++++++++ .../SubscriptionMessageSender.cs | 31 ++++++++++++---- 5 files changed, 82 insertions(+), 33 deletions(-) create mode 100644 samples/EverythingServer/Properties/launchSettings.json diff --git a/samples/EverythingServer/EverythingServer.csproj b/samples/EverythingServer/EverythingServer.csproj index d5046f7eb..eadf720ca 100644 --- a/samples/EverythingServer/EverythingServer.csproj +++ b/samples/EverythingServer/EverythingServer.csproj @@ -1,4 +1,4 @@ - + net9.0 @@ -8,14 +8,13 @@ - - + diff --git a/samples/EverythingServer/LoggingUpdateMessageSender.cs b/samples/EverythingServer/LoggingUpdateMessageSender.cs index 844aa70d8..6870a653c 100644 --- a/samples/EverythingServer/LoggingUpdateMessageSender.cs +++ b/samples/EverythingServer/LoggingUpdateMessageSender.cs @@ -1,11 +1,10 @@ -using Microsoft.Extensions.Hosting; -using ModelContextProtocol; +using ModelContextProtocol; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; namespace EverythingServer; -public class LoggingUpdateMessageSender(IMcpServer server, Func getMinLevel) : BackgroundService +public class LoggingUpdateMessageSender(IServiceProvider serviceProvider, Func getMinLevel) : BackgroundService { readonly Dictionary _loggingLevelMap = new() { @@ -21,19 +20,35 @@ public class LoggingUpdateMessageSender(IMcpServer server, Func ge protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + // Wait for the application to fully start before trying to access the MCP server + await Task.Delay(2000, stoppingToken); + while (!stoppingToken.IsCancellationRequested) { - var newLevel = (LoggingLevel)Random.Shared.Next(_loggingLevelMap.Count); - - var message = new + try + { + // Try to get the server from the service provider + var server = serviceProvider.GetService(); + if (server != null) { - Level = newLevel.ToString().ToLower(), - Data = _loggingLevelMap[newLevel], - }; + var newLevel = (LoggingLevel)Random.Shared.Next(_loggingLevelMap.Count); - if (newLevel > getMinLevel()) + var message = new + { + Level = newLevel.ToString().ToLower(), + Data = _loggingLevelMap[newLevel], + }; + + if (newLevel > getMinLevel()) + { + await server.SendNotificationAsync("notifications/message", message, cancellationToken: stoppingToken); + } + } + } + catch (Exception ex) { - await server.SendNotificationAsync("notifications/message", message, cancellationToken: stoppingToken); + // Log the exception but don't crash the service + Console.WriteLine($"Error in LoggingUpdateMessageSender: {ex.Message}"); } await Task.Delay(15000, stoppingToken); diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index b976bcc0a..26b583af1 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -3,10 +3,8 @@ using EverythingServer.Resources; using EverythingServer.Tools; using Microsoft.Extensions.AI; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; using ModelContextProtocol; +using ModelContextProtocol.AspNetCore; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; using OpenTelemetry; @@ -15,19 +13,14 @@ using OpenTelemetry.Resources; using OpenTelemetry.Trace; -var builder = Host.CreateApplicationBuilder(args); -builder.Logging.AddConsole(consoleLogOptions => -{ - // Configure all logs to go to stderr - consoleLogOptions.LogToStandardErrorThreshold = LogLevel.Trace; -}); +var builder = WebApplication.CreateBuilder(args); HashSet subscriptions = []; var _minimumLoggingLevel = LoggingLevel.Debug; builder.Services .AddMcpServer() - .WithStdioServerTransport() + .WithHttpTransport() .WithTools() .WithTools() .WithTools() @@ -151,4 +144,10 @@ await ctx.Server.SampleAsync([ builder.Services.AddSingleton>(_ => () => _minimumLoggingLevel); -await builder.Build().RunAsync(); +var app = builder.Build(); + +app.UseHttpsRedirection(); + +app.MapMcp(); + +app.Run(); diff --git a/samples/EverythingServer/Properties/launchSettings.json b/samples/EverythingServer/Properties/launchSettings.json new file mode 100644 index 000000000..74cf457ef --- /dev/null +++ b/samples/EverythingServer/Properties/launchSettings.json @@ -0,0 +1,21 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "applicationUrl": "http://localhost:3001", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "applicationUrl": "https://localhost:7133;http://localhost:3001", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + } + } + } +} \ No newline at end of file diff --git a/samples/EverythingServer/SubscriptionMessageSender.cs b/samples/EverythingServer/SubscriptionMessageSender.cs index 774d98523..b287c3bf7 100644 --- a/samples/EverythingServer/SubscriptionMessageSender.cs +++ b/samples/EverythingServer/SubscriptionMessageSender.cs @@ -1,20 +1,35 @@ -using Microsoft.Extensions.Hosting; -using ModelContextProtocol; +using ModelContextProtocol; using ModelContextProtocol.Server; -internal class SubscriptionMessageSender(IMcpServer server, HashSet subscriptions) : BackgroundService +internal class SubscriptionMessageSender(IServiceProvider serviceProvider, HashSet subscriptions) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + // Wait for the application to fully start before trying to access the MCP server + await Task.Delay(2000, stoppingToken); + while (!stoppingToken.IsCancellationRequested) { - foreach (var uri in subscriptions) + try { - await server.SendNotificationAsync("notifications/resource/updated", - new + // Try to get the server from the service provider + var server = serviceProvider.GetService(); + if (server != null) + { + foreach (var uri in subscriptions) { - Uri = uri, - }, cancellationToken: stoppingToken); + await server.SendNotificationAsync("notifications/resource/updated", + new + { + Uri = uri, + }, cancellationToken: stoppingToken); + } + } + } + catch (Exception ex) + { + // Log the exception but don't crash the service + Console.WriteLine($"Error in SubscriptionMessageSender: {ex.Message}"); } await Task.Delay(5000, stoppingToken); From 10c96f201a911802166ec67eb2d7279835c510b5 Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Thu, 14 Aug 2025 17:08:24 -0700 Subject: [PATCH 2/7] Fix resource subscriptions for HTTP --- samples/EverythingServer/Program.cs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index 26b583af1..92e5f3711 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -4,7 +4,6 @@ using EverythingServer.Tools; using Microsoft.Extensions.AI; using ModelContextProtocol; -using ModelContextProtocol.AspNetCore; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; using OpenTelemetry; @@ -15,7 +14,8 @@ var builder = WebApplication.CreateBuilder(args); -HashSet subscriptions = []; +// Subscriptions tracks resource URIs to McpServer instances +Dictionary> subscriptions = new(); var _minimumLoggingLevel = LoggingLevel.Debug; builder.Services @@ -37,7 +37,11 @@ if (uri is not null) { - subscriptions.Add(uri); + if (!subscriptions.ContainsKey(uri)) + { + subscriptions[uri] = new List(); + } + subscriptions[uri].Add(ctx.Server); await ctx.Server.SampleAsync([ new ChatMessage(ChatRole.System, "You are a helpful test server"), @@ -58,7 +62,11 @@ await ctx.Server.SampleAsync([ var uri = ctx.Params?.Uri; if (uri is not null) { - subscriptions.Remove(uri); + if (subscriptions.ContainsKey(uri)) + { + // Remove ctx.Server from the subscription list + subscriptions[uri].Remove(ctx.Server); + } } return new EmptyResult(); }) From e96c833f9d630e6145de9d7f30c8ab88d20c7a58 Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Thu, 14 Aug 2025 17:14:41 -0700 Subject: [PATCH 3/7] Fix SetLoggingLevel handler / log message generator --- samples/EverythingServer/LoggingUpdateMessageSender.cs | 4 ++-- samples/EverythingServer/Program.cs | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/samples/EverythingServer/LoggingUpdateMessageSender.cs b/samples/EverythingServer/LoggingUpdateMessageSender.cs index 6870a653c..969ae1057 100644 --- a/samples/EverythingServer/LoggingUpdateMessageSender.cs +++ b/samples/EverythingServer/LoggingUpdateMessageSender.cs @@ -4,7 +4,7 @@ namespace EverythingServer; -public class LoggingUpdateMessageSender(IServiceProvider serviceProvider, Func getMinLevel) : BackgroundService +public class LoggingUpdateMessageSender(IServiceProvider serviceProvider) : BackgroundService { readonly Dictionary _loggingLevelMap = new() { @@ -39,7 +39,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) Data = _loggingLevelMap[newLevel], }; - if (newLevel > getMinLevel()) + if (newLevel > server.LoggingLevel) { await server.SendNotificationAsync("notifications/message", message, cancellationToken: stoppingToken); } diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index 92e5f3711..29d4a2527 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -16,7 +16,6 @@ // Subscriptions tracks resource URIs to McpServer instances Dictionary> subscriptions = new(); -var _minimumLoggingLevel = LoggingLevel.Debug; builder.Services .AddMcpServer() @@ -127,13 +126,13 @@ await ctx.Server.SampleAsync([ throw new McpException("Missing required argument 'level'", McpErrorCode.InvalidParams); } - _minimumLoggingLevel = ctx.Params.Level; + // The SDK updates the LoggingLevel field of the IMcpServer await ctx.Server.SendNotificationAsync("notifications/message", new { Level = "debug", Logger = "test-server", - Data = $"Logging level set to {_minimumLoggingLevel}", + Data = $"Logging level set to {ctx.Params.Level}", }, cancellationToken: ct); return new EmptyResult(); @@ -150,8 +149,6 @@ await ctx.Server.SampleAsync([ builder.Services.AddHostedService(); builder.Services.AddHostedService(); -builder.Services.AddSingleton>(_ => () => _minimumLoggingLevel); - var app = builder.Build(); app.UseHttpsRedirection(); From 5159f94d68cb9b2e3537bc32dbdb8512d9852c2d Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Thu, 14 Aug 2025 17:51:06 -0700 Subject: [PATCH 4/7] Fix resource notification generator --- samples/EverythingServer/Program.cs | 2 +- samples/EverythingServer/SubscriptionMessageSender.cs | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index 29d4a2527..c370b6581 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -145,7 +145,7 @@ await ctx.Server.SampleAsync([ .WithLogging(b => b.SetResourceBuilder(resource)) .UseOtlpExporter(); -builder.Services.AddSingleton(subscriptions); +builder.Services.AddSingleton>>(subscriptions); builder.Services.AddHostedService(); builder.Services.AddHostedService(); diff --git a/samples/EverythingServer/SubscriptionMessageSender.cs b/samples/EverythingServer/SubscriptionMessageSender.cs index b287c3bf7..04456f27e 100644 --- a/samples/EverythingServer/SubscriptionMessageSender.cs +++ b/samples/EverythingServer/SubscriptionMessageSender.cs @@ -1,7 +1,7 @@ using ModelContextProtocol; using ModelContextProtocol.Server; -internal class SubscriptionMessageSender(IServiceProvider serviceProvider, HashSet subscriptions) : BackgroundService +internal class SubscriptionMessageSender(IDictionary> subscriptions) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -12,11 +12,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - // Try to get the server from the service provider - var server = serviceProvider.GetService(); - if (server != null) + foreach (var (uri, servers) in subscriptions) { - foreach (var uri in subscriptions) + foreach (var server in servers) { await server.SendNotificationAsync("notifications/resource/updated", new From 240ccc857c009ffaabbf3df07871ca73aea3e303 Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Mon, 8 Sep 2025 06:30:22 -0500 Subject: [PATCH 5/7] Use thread-safe data structures to track resource subscriptions --- samples/EverythingServer/Program.cs | 28 +++++++++---------- .../SubscriptionMessageSender.cs | 3 +- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index c370b6581..273769137 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -11,11 +11,14 @@ using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; +using System.Collections.Concurrent; var builder = WebApplication.CreateBuilder(args); // Subscriptions tracks resource URIs to McpServer instances -Dictionary> subscriptions = new(); +// Use thread-safe data structures since handlers can run in parallel +// even in the context of a single session. +ConcurrentDictionary> subscriptions = new(); builder.Services .AddMcpServer() @@ -32,15 +35,10 @@ .WithResources() .WithSubscribeToResourcesHandler(async (ctx, ct) => { - var uri = ctx.Params?.Uri; - - if (uri is not null) + if (ctx.Params?.Uri is { } uri) { - if (!subscriptions.ContainsKey(uri)) - { - subscriptions[uri] = new List(); - } - subscriptions[uri].Add(ctx.Server); + var bag = subscriptions.GetOrAdd(uri, _ => new ConcurrentBag()); + bag.Add(ctx.Server); await ctx.Server.SampleAsync([ new ChatMessage(ChatRole.System, "You are a helpful test server"), @@ -58,13 +56,13 @@ await ctx.Server.SampleAsync([ }) .WithUnsubscribeFromResourcesHandler(async (ctx, ct) => { - var uri = ctx.Params?.Uri; - if (uri is not null) + if (ctx.Params?.Uri is { } uri) { - if (subscriptions.ContainsKey(uri)) + if (subscriptions.TryGetValue(uri, out var bag)) { - // Remove ctx.Server from the subscription list - subscriptions[uri].Remove(ctx.Server); + // Remove ctx.Server from the subscription bag (ConcurrentBag does not support removal, so recreate) + var newBag = new ConcurrentBag(bag.Where(s => s != ctx.Server)); + subscriptions[uri] = newBag; } } return new EmptyResult(); @@ -145,7 +143,7 @@ await ctx.Server.SampleAsync([ .WithLogging(b => b.SetResourceBuilder(resource)) .UseOtlpExporter(); -builder.Services.AddSingleton>>(subscriptions); +builder.Services.AddSingleton(subscriptions); builder.Services.AddHostedService(); builder.Services.AddHostedService(); diff --git a/samples/EverythingServer/SubscriptionMessageSender.cs b/samples/EverythingServer/SubscriptionMessageSender.cs index 04456f27e..a619caeff 100644 --- a/samples/EverythingServer/SubscriptionMessageSender.cs +++ b/samples/EverythingServer/SubscriptionMessageSender.cs @@ -1,7 +1,8 @@ using ModelContextProtocol; using ModelContextProtocol.Server; -internal class SubscriptionMessageSender(IDictionary> subscriptions) : BackgroundService +using System.Collections.Concurrent; +internal class SubscriptionMessageSender(ConcurrentDictionary> subscriptions) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { From be4d5f0bbf9e4715876c8700d023e4f15a28009d Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Mon, 8 Sep 2025 08:07:16 -0500 Subject: [PATCH 6/7] Add http and appsettings files --- .../EverythingServer/EverythingServer.http | 77 +++++++++++++++++++ .../appsettings.Development.json | 8 ++ samples/EverythingServer/appsettings.json | 9 +++ 3 files changed, 94 insertions(+) create mode 100644 samples/EverythingServer/EverythingServer.http create mode 100644 samples/EverythingServer/appsettings.Development.json create mode 100644 samples/EverythingServer/appsettings.json diff --git a/samples/EverythingServer/EverythingServer.http b/samples/EverythingServer/EverythingServer.http new file mode 100644 index 000000000..4903f9407 --- /dev/null +++ b/samples/EverythingServer/EverythingServer.http @@ -0,0 +1,77 @@ +@HostAddress = http://localhost:3001 + +POST {{HostAddress}}/ +Accept: application/json, text/event-stream +Content-Type: application/json + +{ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "clientInfo": { + "name": "RestClient", + "version": "0.1.0" + }, + "capabilities": {}, + "protocolVersion": "2025-06-18" + } +} + +### + +@SessionId = ZwwM0VFEtKNOMBsP8D2VzQ + +POST {{HostAddress}}/ +Accept: application/json, text/event-stream +Content-Type: application/json +MCP-Protocol-Version: 2025-06-18 +Mcp-Session-Id: {{SessionId}} + +{ + "jsonrpc": "2.0", + "id": 2, + "method": "resources/list" +} + +### + +@resource_uri = test://direct/text/resource + +POST {{HostAddress}}/ +Accept: application/json, text/event-stream +Content-Type: application/json +MCP-Protocol-Version: 2025-06-18 +Mcp-Session-Id: {{SessionId}} + +{ + "jsonrpc": "2.0", + "id": 3, + "method": "resources/subscribe", + "params": { + "uri": "{{resource_uri}}" + } +} + +### + +POST {{HostAddress}}/ +Accept: application/json, text/event-stream +Content-Type: application/json +MCP-Protocol-Version: 2025-06-18 +Mcp-Session-Id: {{SessionId}} + +{ + "jsonrpc": "2.0", + "id": 4, + "method": "resources/unsubscribe", + "params": { + "uri": "{{resource_uri}}" + } +} + +### + +DELETE {{HostAddress}}/ +MCP-Protocol-Version: 2025-06-18 +Mcp-Session-Id: {{SessionId}} diff --git a/samples/EverythingServer/appsettings.Development.json b/samples/EverythingServer/appsettings.Development.json new file mode 100644 index 000000000..0c208ae91 --- /dev/null +++ b/samples/EverythingServer/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/EverythingServer/appsettings.json b/samples/EverythingServer/appsettings.json new file mode 100644 index 000000000..10f68b8c8 --- /dev/null +++ b/samples/EverythingServer/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} From cf79126b2d83717f5bd69962c66ec2b2117f1f2d Mon Sep 17 00:00:00 2001 From: Mike Kistler Date: Fri, 12 Sep 2025 10:46:45 -0500 Subject: [PATCH 7/7] Move subscription management to a dedicated class and add subscription cleanup on session end --- samples/EverythingServer/Program.cs | 41 ++++++--- .../EverythingServer/SubscriptionManager.cs | 91 +++++++++++++++++++ .../SubscriptionMessageSender.cs | 4 +- 3 files changed, 119 insertions(+), 17 deletions(-) create mode 100644 samples/EverythingServer/SubscriptionManager.cs diff --git a/samples/EverythingServer/Program.cs b/samples/EverythingServer/Program.cs index 273769137..71dc3e1ea 100644 --- a/samples/EverythingServer/Program.cs +++ b/samples/EverythingServer/Program.cs @@ -15,14 +15,24 @@ var builder = WebApplication.CreateBuilder(args); -// Subscriptions tracks resource URIs to McpServer instances -// Use thread-safe data structures since handlers can run in parallel -// even in the context of a single session. -ConcurrentDictionary> subscriptions = new(); - builder.Services .AddMcpServer() - .WithHttpTransport() + .WithHttpTransport(options => + { + // Add a RunSessionHandler to remove all subscriptions for the session when it ends + options.RunSessionHandler = async (httpContext, mcpServer, token) => + { + try + { + await mcpServer.RunAsync(token); + } + finally + { + // This code runs when the session ends + SubscriptionManager.RemoveAllSubscriptions(mcpServer); + } + }; + }) .WithTools() .WithTools() .WithTools() @@ -35,10 +45,13 @@ .WithResources() .WithSubscribeToResourcesHandler(async (ctx, ct) => { + if (ctx.Server.SessionId == null) + { + throw new McpException("Cannot add subscription for server with null SessionId"); + } if (ctx.Params?.Uri is { } uri) { - var bag = subscriptions.GetOrAdd(uri, _ => new ConcurrentBag()); - bag.Add(ctx.Server); + SubscriptionManager.AddSubscription(uri, ctx.Server); await ctx.Server.SampleAsync([ new ChatMessage(ChatRole.System, "You are a helpful test server"), @@ -56,14 +69,13 @@ await ctx.Server.SampleAsync([ }) .WithUnsubscribeFromResourcesHandler(async (ctx, ct) => { + if (ctx.Server.SessionId == null) + { + throw new McpException("Cannot remove subscription for server with null SessionId"); + } if (ctx.Params?.Uri is { } uri) { - if (subscriptions.TryGetValue(uri, out var bag)) - { - // Remove ctx.Server from the subscription bag (ConcurrentBag does not support removal, so recreate) - var newBag = new ConcurrentBag(bag.Where(s => s != ctx.Server)); - subscriptions[uri] = newBag; - } + SubscriptionManager.RemoveSubscription(uri, ctx.Server); } return new EmptyResult(); }) @@ -143,7 +155,6 @@ await ctx.Server.SampleAsync([ .WithLogging(b => b.SetResourceBuilder(resource)) .UseOtlpExporter(); -builder.Services.AddSingleton(subscriptions); builder.Services.AddHostedService(); builder.Services.AddHostedService(); diff --git a/samples/EverythingServer/SubscriptionManager.cs b/samples/EverythingServer/SubscriptionManager.cs new file mode 100644 index 000000000..65eab0d8d --- /dev/null +++ b/samples/EverythingServer/SubscriptionManager.cs @@ -0,0 +1,91 @@ +using ModelContextProtocol; +using ModelContextProtocol.Server; + +// This class manages subscriptions to resources by McpServer instances. +// The subscription information must be accessed in a thread-safe manner since handlers +// can run in parallel even in the context of a single session. +static class SubscriptionManager +{ + // Subscriptions tracks resource URIs to bags of McpServer instances (thread-safe via locking) + private static Dictionary> subscriptions = new(); + + // SessionSubscriptions is a secondary index to subscriptions to allow efficient removal of all + // subscriptions for a given session when it ends. (thread-safe via locking) + private static Dictionary /* uris */> sessionSubscriptions = new(); + + private static readonly object _subscriptionsLock = new(); + + public static void AddSubscription(string uri, IMcpServer server) + { + if (server.SessionId == null) + { + throw new McpException("Cannot add subscription for server with null SessionId"); + } + lock (_subscriptionsLock) + { + subscriptions[uri] ??= new List(); + subscriptions[uri].Add(server); + sessionSubscriptions[server.SessionId] ??= new List(); + sessionSubscriptions[server.SessionId].Add(uri); + } + } + + public static void RemoveSubscription(string uri, IMcpServer server) + { + if (server.SessionId == null) + { + throw new McpException("Cannot remove subscription for server with null SessionId"); + } + lock (_subscriptionsLock) + { + if (subscriptions.ContainsKey(uri)) + { + // Remove the server from the list of subscriptions for the URI + subscriptions[uri] = subscriptions[uri].Where(s => s.SessionId != server.SessionId).ToList(); + if (subscriptions[uri]?.Count == 0) + { + subscriptions.Remove(uri); + } + } + // Remove the URI from the list of subscriptions for the session + sessionSubscriptions[server.SessionId]?.Remove(uri); + if (sessionSubscriptions[server.SessionId]?.Count == 0) + { + sessionSubscriptions.Remove(server.SessionId); + } + } + } + + public static IDictionary> GetSubscriptions() + { + lock (_subscriptionsLock) + { + // Return a copy of the subscriptions dictionary to avoid external modification + return subscriptions.ToDictionary(entry => entry.Key, + entry => entry.Value.ToList()); + } + } + + public static void RemoveAllSubscriptions(IMcpServer server) + { + if (server.SessionId is { } sessionId) + { + lock (_subscriptionsLock) + { + // Remove all subscriptions for the session + if (sessionSubscriptions.TryGetValue(sessionId, out var uris)) + { + foreach (var uri in uris) + { + subscriptions[uri] = subscriptions[uri].Where(s => s.SessionId != sessionId).ToList(); + if (subscriptions[uri]?.Count == 0) + { + subscriptions.Remove(uri); + } + } + sessionSubscriptions.Remove(sessionId); + } + } + } + } +} \ No newline at end of file diff --git a/samples/EverythingServer/SubscriptionMessageSender.cs b/samples/EverythingServer/SubscriptionMessageSender.cs index a619caeff..80deb6ab0 100644 --- a/samples/EverythingServer/SubscriptionMessageSender.cs +++ b/samples/EverythingServer/SubscriptionMessageSender.cs @@ -2,7 +2,7 @@ using ModelContextProtocol.Server; using System.Collections.Concurrent; -internal class SubscriptionMessageSender(ConcurrentDictionary> subscriptions) : BackgroundService +internal class SubscriptionMessageSender() : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { @@ -13,7 +13,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - foreach (var (uri, servers) in subscriptions) + foreach (var (uri, servers) in SubscriptionManager.GetSubscriptions()) { foreach (var server in servers) {