@@ -44,12 +44,12 @@ public struct ServerAddress: Sendable, Equatable {
4444@_documentation ( visibility: internal)
4545public struct RedisConnection : Sendable {
4646 enum Request {
47- case command( RESPCommand )
48- case pipelinedCommands( [ RESPCommand ] )
47+ case command( ByteBuffer )
48+ case pipelinedCommands( ByteBuffer , Int )
4949 }
5050 enum Response {
5151 case token( RESPToken )
52- case pipelinedResponse( [ RESPToken ] )
52+ case pipelinedResponse( [ Result < RESPToken , Error > ] )
5353 }
5454 typealias RequestStreamElement = ( Request , CheckedContinuation < Response , Error > )
5555 /// Logger used by Server
@@ -96,7 +96,7 @@ public struct RedisConnection: Sendable {
9696 do {
9797 switch request {
9898 case . command( let command) :
99- try await outbound. write ( command. buffer )
99+ try await outbound. write ( command)
100100 let response = try await inboundIterator. next ( )
101101 if let response {
102102 continuation. resume ( returning: . token( response) )
@@ -109,22 +109,26 @@ public struct RedisConnection: Sendable {
109109 )
110110 )
111111 }
112- case . pipelinedCommands( let commands) :
113- try await outbound. write ( contentsOf: commands. map { $0. buffer } )
114- var responses : [ RESPToken ] = . init( )
115- for _ in 0 ..< commands. count {
116- let response = try await inboundIterator. next ( )
117- if let response {
118- responses. append ( response)
119- } else {
120- requestContinuation. finish ( )
121- continuation. resume (
122- throwing: RedisClientError (
123- . connectionClosed,
124- message: " The connection to the Redis database was unexpectedly closed. "
112+ case . pipelinedCommands( let commands, let count) :
113+ try await outbound. write ( commands)
114+ var responses : [ Result < RESPToken , Error > ] = . init( )
115+ for _ in 0 ..< count {
116+ do {
117+ let response = try await inboundIterator. next ( )
118+ if let response {
119+ responses. append ( . success( response) )
120+ } else {
121+ requestContinuation. finish ( )
122+ continuation. resume (
123+ throwing: RedisClientError (
124+ . connectionClosed,
125+ message: " The connection to the Redis database was unexpectedly closed. "
126+ )
125127 )
126- )
127- return
128+ return
129+ }
130+ } catch {
131+ responses. append ( . failure( error) )
128132 }
129133 }
130134 continuation. resume ( returning: . pipelinedResponse( responses) )
@@ -153,14 +157,11 @@ public struct RedisConnection: Sendable {
153157 }
154158 }
155159
156- @discardableResult public func send( command: RESPCommand ) async throws -> RESPToken {
157- if logger. logLevel <= . debug {
158- var buffer = command. buffer
159- let sending = try [ String] ( from: RESPToken ( consuming: & buffer) !) . joined ( separator: " " )
160- self . logger. debug ( " send: \( sending) " )
161- }
160+ @discardableResult public func send< Command: RedisCommand > ( command: Command ) async throws -> Command . Response {
161+ var encoder = RedisCommandEncoder ( )
162+ command. encode ( into: & encoder)
162163 let response : Response = try await withCheckedThrowingContinuation { continuation in
163- switch requestContinuation. yield ( ( . command( command ) , continuation) ) {
164+ switch requestContinuation. yield ( ( . command( encoder . buffer ) , continuation) ) {
164165 case . enqueued:
165166 break
166167 case . dropped, . terminated:
@@ -175,12 +176,21 @@ public struct RedisConnection: Sendable {
175176 }
176177 }
177178 guard case . token( let token) = response else { preconditionFailure ( " Expected a single response " ) }
178- return token
179+ return try . init ( from : token)
179180 }
180181
181- @discardableResult public func pipeline( _ commands: [ RESPCommand ] ) async throws -> [ RESPToken ] {
182+ @discardableResult public func pipeline< each Command : RedisCommand > (
183+ _ commands: repeat each Command
184+ ) async throws -> ( repeat ( each Command ) . Response) {
185+ var count = 0
186+ var encoder = RedisCommandEncoder ( )
187+ for command in repeat each commands {
188+ command. encode ( into: & encoder)
189+ count += 1
190+ }
191+
182192 let response : Response = try await withCheckedThrowingContinuation { continuation in
183- switch requestContinuation. yield ( ( . pipelinedCommands( commands ) , continuation) ) {
193+ switch requestContinuation. yield ( ( . pipelinedCommands( encoder . buffer , count ) , continuation) ) {
184194 case . enqueued:
185195 break
186196 case . dropped, . terminated:
@@ -195,21 +205,19 @@ public struct RedisConnection: Sendable {
195205 }
196206 }
197207 guard case . pipelinedResponse( let tokens) = response else { preconditionFailure ( " Expected a single response " ) }
198- return tokens
199- }
200208
201- @discardableResult public func send< each Arg : RESPRenderable > ( _ command: repeat each Arg ) async throws -> RESPToken {
202- let command = RESPCommand ( repeat each command)
203- return try await self . send ( command: command)
209+ var index = AutoIncrementingInteger ( )
210+ return try ( repeat ( each Command) . Response ( from: tokens [ index. next ( ) ] . get ( ) ) )
204211 }
205212
206213 /// Try to upgrade to RESP3
207214 private func resp3Upgrade(
208215 outbound: NIOAsyncChannelOutboundWriter < ByteBuffer > ,
209216 inboundIterator: inout NIOAsyncChannelInboundStream < RESPToken > . AsyncIterator
210217 ) async throws {
211- let helloCommand = RESPCommand ( " HELLO " , " 3 " )
212- try await outbound. write ( helloCommand. buffer)
218+ var encoder = RedisCommandEncoder ( )
219+ encoder. encodeArray ( " HELLO " , 3 )
220+ try await outbound. write ( encoder. buffer)
213221 let response = try await inboundIterator. next ( )
214222 guard let response else {
215223 throw RedisClientError ( . connectionClosed, message: " The connection to the Redis database was unexpectedly closed. " )
@@ -317,3 +325,11 @@ extension ClientBootstrap: ClientBootstrapProtocol {}
317325#if canImport(Network)
318326extension NIOTSConnectionBootstrap : ClientBootstrapProtocol { }
319327#endif
328+
329+ private struct AutoIncrementingInteger {
330+ var value : Int = 0
331+ mutating func next( ) -> Int {
332+ value += 1
333+ return value - 1
334+ }
335+ }
0 commit comments