Skip to content

Commit 6616445

Browse files
authored
Valkey promise (#16)
* Add Valkey promise * Use checkedContinuation for send
1 parent 59ad88e commit 6616445

File tree

2 files changed

+44
-18
lines changed

2 files changed

+44
-18
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,34 @@ import DequeModule
1616
import Logging
1717
import NIOCore
1818

19+
@usableFromInline
20+
enum ValkeyPromise<T: Sendable>: Sendable {
21+
case nio(EventLoopPromise<T>)
22+
case swift(CheckedContinuation<T, any Error>)
23+
24+
func succeed(_ t: T) {
25+
switch self {
26+
case .nio(let eventLoopPromise):
27+
eventLoopPromise.succeed(t)
28+
case .swift(let checkedContinuation):
29+
checkedContinuation.resume(returning: t)
30+
}
31+
}
32+
33+
func fail(_ e: Error) {
34+
switch self {
35+
case .nio(let eventLoopPromise):
36+
eventLoopPromise.fail(e)
37+
case .swift(let checkedContinuation):
38+
checkedContinuation.resume(throwing: e)
39+
}
40+
}
41+
}
42+
1943
@usableFromInline
2044
enum ValkeyRequest: Sendable {
21-
case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
22-
case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
45+
case single(buffer: ByteBuffer, promise: ValkeyPromise<RESPToken>)
46+
case multiple(buffer: ByteBuffer, promises: [ValkeyPromise<RESPToken>])
2347
}
2448

2549
@usableFromInline
@@ -31,7 +55,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
3155

3256
@usableFromInline
3357
let eventLoop: EventLoop
34-
private var commands: Deque<EventLoopPromise<RESPToken>>
58+
private var commands: Deque<ValkeyPromise<RESPToken>>
3559
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
3660
private var context: ChannelHandlerContext?
3761
private let logger: Logger

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ public final class ValkeyConnection: Sendable {
8080
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
8181
logger: Logger
8282
) async throws -> ValkeyConnection {
83-
let future = if eventLoop.inEventLoop {
84-
self._makeClient(address: address, eventLoop: eventLoop, configuration: configuration, logger: logger)
85-
} else {
86-
eventLoop.flatSubmit {
83+
let future =
84+
if eventLoop.inEventLoop {
8785
self._makeClient(address: address, eventLoop: eventLoop, configuration: configuration, logger: logger)
86+
} else {
87+
eventLoop.flatSubmit {
88+
self._makeClient(address: address, eventLoop: eventLoop, configuration: configuration, logger: logger)
89+
}
8890
}
89-
}
9091
let connection = try await future.get()
9192
if configuration.respVersion == .v3 {
9293
try await connection.resp3Upgrade()
@@ -112,17 +113,18 @@ public final class ValkeyConnection: Sendable {
112113
public func send<Command: RESPCommand>(command: Command) async throws -> Command.Response {
113114
var encoder = RESPCommandEncoder()
114115
command.encode(into: &encoder)
115-
let result = encoder.buffer
116+
let buffer = encoder.buffer
116117

117-
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
118-
if self.channel.eventLoop.inEventLoop {
119-
self.channelHandler.value.write(request: ValkeyRequest.single(buffer: result, promise: promise))
120-
} else {
121-
self.channel.eventLoop.execute {
122-
self.channelHandler.value.write(request: ValkeyRequest.single(buffer: result, promise: promise))
118+
let result = try await withCheckedThrowingContinuation { continuation in
119+
if self.channel.eventLoop.inEventLoop {
120+
self.channelHandler.value.write(request: ValkeyRequest.single(buffer: buffer, promise: .swift(continuation)))
121+
} else {
122+
self.channel.eventLoop.execute {
123+
self.channelHandler.value.write(request: ValkeyRequest.single(buffer: buffer, promise: .swift(continuation)))
124+
}
123125
}
124126
}
125-
return try await .init(from: promise.futureResult.get())
127+
return try .init(from: result)
126128
}
127129

128130
/// Pipeline a series of commands to Valkey connection
@@ -145,10 +147,10 @@ public final class ValkeyConnection: Sendable {
145147
let promises = mpromises
146148
// write directly to channel handler
147149
if self.channel.eventLoop.inEventLoop {
148-
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises))
150+
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }))
149151
} else {
150152
self.channel.eventLoop.execute {
151-
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises))
153+
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }))
152154
}
153155
}
154156

0 commit comments

Comments
 (0)