@@ -80,6 +80,7 @@ struct SubscriptionTests {
8080 }
8181 }
8282
83+ /// Test a single subscription can subscribe to multiple channels
8384 @Test
8485 func testSubscribeMultipleChannels( ) async throws {
8586 let channel = NIOAsyncTestingChannel ( )
@@ -111,6 +112,45 @@ struct SubscriptionTests {
111112 }
112113 }
113114
115+ /// Test when unsubscribing from one channel, that you will still receive messages from other channels you are
116+ /// still subscribed to
117+ @Test
118+ func testUnsubscribeFromOneChannel( ) async throws {
119+ let channel = NIOAsyncTestingChannel ( )
120+ let logger = Logger ( label: " test " )
121+ let connection = try await ValkeyConnection . setupChannel ( channel, configuration: . init( ) , logger: logger)
122+
123+ try await withThrowingTaskGroup ( of: Void . self) { group in
124+ group. addTask {
125+ let subscription = try await connection. subscribe ( to: " test1 " , " test2 " , " test3 " )
126+ _ = try await connection. unsubscribe ( channel: [ " test2 " ] )
127+ var iterator = subscription. makeAsyncIterator ( )
128+ #expect( try await iterator. next ( ) == . init( channel: " test1 " , message: " 1 " ) )
129+ #expect( try await iterator. next ( ) == . init( channel: " test3 " , message: " 3 " ) )
130+ }
131+ group. addTask {
132+ var outbound = try await channel. waitForOutboundWrite ( as: ByteBuffer . self)
133+ // expect SUBSCRIBE command
134+ #expect( String ( buffer: outbound) == " *4 \r \n $9 \r \n SUBSCRIBE \r \n $5 \r \n test1 \r \n $5 \r \n test2 \r \n $5 \r \n test3 \r \n " )
135+ // push 3 subscribes (one for each channel)
136+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $9 \r \n subscribe \r \n $5 \r \n test1 \r \n :1 \r \n " ) )
137+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $9 \r \n subscribe \r \n $5 \r \n test2 \r \n :2 \r \n " ) )
138+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $9 \r \n subscribe \r \n $5 \r \n test3 \r \n :3 \r \n " ) )
139+ outbound = try await channel. waitForOutboundWrite ( as: ByteBuffer . self)
140+ // expect UNSUBSCRIBE command
141+ #expect( String ( buffer: outbound) == " *2 \r \n $11 \r \n UNSUBSCRIBE \r \n $5 \r \n test2 \r \n " )
142+ // push unsubscribe
143+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $11 \r \n unsubscribe \r \n $5 \r \n test2 \r \n :0 \r \n " ) )
144+ // push 3 messages
145+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $7 \r \n message \r \n $5 \r \n test1 \r \n $1 \r \n 1 \r \n " ) )
146+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $7 \r \n message \r \n $5 \r \n test2 \r \n $1 \r \n 2 \r \n " ) )
147+ try await channel. writeInbound ( ByteBuffer ( string: " >3 \r \n $7 \r \n message \r \n $5 \r \n test3 \r \n $1 \r \n 3 \r \n " ) )
148+ }
149+ try await group. waitForAll ( )
150+ }
151+ }
152+
153+ /// Test you can have multiple subscriptions running on one connection
114154 @Test
115155 func testMultipleSubscriptions( ) async throws {
116156 let channel = NIOAsyncTestingChannel ( )
0 commit comments