Skip to content

Commit 59446fa

Browse files
committed
Connection as actor.
1 parent c9e28ff commit 59446fa

File tree

5 files changed

+135
-105
lines changed

5 files changed

+135
-105
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,38 @@ import DequeModule
1616
import Logging
1717
import NIOCore
1818

19+
@usableFromInline
20+
enum ValkeyPromise<T: Sendable>: Sendable {
21+
case nio(EventLoopPromise<T>)
22+
case swift(CheckedContinuation<T, any Error>)
23+
24+
func succeed(_ t: T) {
25+
switch self {
26+
case .nio(let eventLoopPromise):
27+
eventLoopPromise.succeed(t)
28+
case .swift(let checkedContinuation):
29+
checkedContinuation.resume(returning: t)
30+
}
31+
}
32+
33+
func fail(_ e: Error) {
34+
switch self {
35+
case .nio(let eventLoopPromise):
36+
eventLoopPromise.fail(e)
37+
case .swift(let checkedContinuation):
38+
checkedContinuation.resume(throwing: e)
39+
}
40+
}
41+
}
42+
1943
@usableFromInline
2044
enum ValkeyRequest: Sendable {
2145
case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
2246
case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
2347
}
2448

2549
@usableFromInline
26-
final class ValkeyChannelHandler: ChannelDuplexHandler {
50+
final class ValkeyChannelHandler: ChannelInboundHandler {
2751
@usableFromInline
2852
typealias OutboundIn = ValkeyRequest
2953
@usableFromInline
@@ -33,50 +57,73 @@ final class ValkeyChannelHandler: ChannelDuplexHandler {
3357

3458
@usableFromInline
3559
let eventLoop: EventLoop
36-
private var commands: Deque<EventLoopPromise<RESPToken>>
60+
@usableFromInline
61+
/* private */ var commands: Deque<ValkeyPromise<RESPToken>>
62+
@usableFromInline
63+
/* private */ var commandEncoder: RESPCommandEncoder
64+
@usableFromInline
65+
/* private */ var context: ChannelHandlerContext?
66+
3767
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
38-
private var context: ChannelHandlerContext?
3968
private let logger: Logger
4069

41-
init(channel: Channel, logger: Logger) {
42-
self.eventLoop = channel.eventLoop
70+
init(eventLoop: EventLoop, logger: Logger) {
71+
self.eventLoop = eventLoop
4372
self.commands = .init()
73+
self.commandEncoder = RESPCommandEncoder()
4474
self.decoder = NIOSingleStepByteToMessageProcessor(RESPTokenDecoder())
4575
self.context = nil
4676
self.logger = logger
4777
}
4878

49-
/// Write valkey command/commands to channel
50-
/// - Parameters:
51-
/// - request: Valkey command request
52-
/// - promise: Promise to fulfill when command is complete
5379
@inlinable
54-
func write(request: ValkeyRequest) {
55-
if self.eventLoop.inEventLoop {
56-
self._write(request: request)
57-
} else {
58-
eventLoop.execute {
59-
self._write(request: request)
60-
}
80+
func send<Command: RESPCommand>(
81+
command: Command,
82+
isolation: isolated (any Actor)? = #isolation
83+
) async throws -> Command.Response {
84+
let token = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<RESPToken, any Error>) in
85+
self._write(command, promise: .swift(checkedContinuation))
6186
}
87+
88+
return try Command.Response.init(from: token)
6289
}
6390

64-
@usableFromInline
65-
func _write(request: ValkeyRequest) {
91+
@inlinable
92+
public func pipeline<each Command: RESPCommand>(
93+
_ commands: repeat each Command,
94+
isolation: isolated (any Actor)? = #isolation
95+
) async throws -> (repeat (each Command).Response) {
96+
var promises = [EventLoopFuture<RESPToken>]()
6697
guard let context = self.context else {
6798
preconditionFailure("Trying to use valkey connection before it is setup")
6899
}
69-
switch request {
70-
case .single(let buffer, let tokenPromise):
71-
self.commands.append(tokenPromise)
72-
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
73-
74-
case .multiple(let buffer, let tokenPromises):
75-
for tokenPromise in tokenPromises {
76-
self.commands.append(tokenPromise)
77-
}
78-
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
100+
101+
self.commandEncoder.reset()
102+
for command in repeat each commands {
103+
command.encode(into: &self.commandEncoder)
104+
let promise = self.eventLoop.makePromise(of: RESPToken.self)
105+
self.commands.append(.nio(promise))
106+
promises.append(promise.futureResult)
107+
}
108+
109+
context.writeAndFlush(self.wrapOutboundOut(self.commandEncoder.buffer), promise: nil)
110+
111+
// get response from channel handler
112+
var index = AutoIncrementingInteger()
113+
return try await (repeat (each Command).Response(from: promises[index.next()].get()))
114+
}
115+
116+
@inlinable
117+
func _write(_ command: some RESPCommand, promise: ValkeyPromise<RESPToken>) {
118+
self.eventLoop.assertInEventLoop()
119+
guard let context = self.context else {
120+
preconditionFailure("Trying to use valkey connection before it is setup")
79121
}
122+
123+
self.commandEncoder.reset()
124+
command.encode(into: &self.commandEncoder)
125+
self.commands.append(promise)
126+
context.writeAndFlush(self.wrapOutboundOut(self.commandEncoder.buffer), promise: nil)
80127
}
81128

82129
@usableFromInline
@@ -137,7 +184,3 @@ final class ValkeyChannelHandler: ChannelDuplexHandler {
137184
promise.fail(error)
138185
}
139186
}
140-
141-
// The ValkeyChannelHandler needs to be Sendable so the ValkeyConnection can pass it
142-
// around at initialisation
143-
extension ValkeyChannelHandler: @unchecked Sendable {}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 53 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ public struct ServerAddress: Sendable, Equatable {
4242
}
4343

4444
/// Single connection to a Valkey database
45-
public final class ValkeyConnection: Sendable {
45+
public final actor ValkeyConnection: Sendable {
46+
nonisolated public let unownedExecutor: UnownedSerialExecutor
47+
4648
/// Logger used by Server
4749
let logger: Logger
4850
@usableFromInline
@@ -59,6 +61,7 @@ public final class ValkeyConnection: Sendable {
5961
configuration: ValkeyClientConfiguration,
6062
logger: Logger
6163
) {
64+
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
6265
self.channel = channel
6366
self.channelHandler = channelHandler
6467
self.configuration = configuration
@@ -77,16 +80,17 @@ public final class ValkeyConnection: Sendable {
7780
public static func connect(
7881
address: ServerAddress,
7982
configuration: ValkeyClientConfiguration,
80-
eventLoopGroup: EventLoopGroup = MultiThreadedEventLoopGroup.singleton,
83+
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
8184
logger: Logger
8285
) async throws -> ValkeyConnection {
83-
let (channel, channelHandler) = try await makeClient(
84-
address: address,
85-
eventLoopGroup: eventLoopGroup,
86-
configuration: configuration,
87-
logger: logger
88-
)
89-
let connection = ValkeyConnection(channel: channel, channelHandler: channelHandler, configuration: configuration, logger: logger)
86+
let future = if eventLoop.inEventLoop {
87+
self._makeClient(address: address, eventLoop: eventLoop, configuration: configuration, logger: logger)
88+
} else {
89+
eventLoop.flatSubmit {
90+
self._makeClient(address: address, eventLoop: eventLoop, configuration: configuration, logger: logger)
91+
}
92+
}
93+
let connection = try await future.get()
9094
if configuration.respVersion == .v3 {
9195
try await connection.resp3Upgrade()
9296
}
@@ -109,12 +113,7 @@ public final class ValkeyConnection: Sendable {
109113

110114
@inlinable
111115
public func send<Command: RESPCommand>(command: Command) async throws -> Command.Response {
112-
var encoder = RESPCommandEncoder()
113-
command.encode(into: &encoder)
114-
115-
let promise = channel.eventLoop.makePromise(of: RESPToken.self)
116-
channelHandler.write(request: ValkeyRequest.single(buffer: encoder.buffer, promise: promise))
117-
return try await .init(from: promise.futureResult.get())
116+
try await self.channelHandler.send(command: command)
118117
}
119118

120119
/// Pipeline a series of commands to Valkey connection
@@ -126,90 +125,73 @@ public final class ValkeyConnection: Sendable {
126125
public func pipeline<each Command: RESPCommand>(
127126
_ commands: repeat each Command
128127
) async throws -> (repeat (each Command).Response) {
129-
// this currently allocates a promise for every command. We could collpase this down to one promise
130-
var promises: [EventLoopPromise<RESPToken>] = []
131-
var encoder = RESPCommandEncoder()
132-
for command in repeat each commands {
133-
command.encode(into: &encoder)
134-
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
135-
}
136-
// write directly to channel handler
137-
channelHandler.write(request: ValkeyRequest.multiple(buffer: encoder.buffer, promises: promises))
138-
// get response from channel handler
139-
var index = AutoIncrementingInteger()
140-
return try await (repeat (each Command).Response(from: promises[index.next()].futureResult.get()))
128+
try await self.channelHandler.pipeline(repeat each commands)
141129
}
142130

143131
/// Try to upgrade to RESP3
144132
private func resp3Upgrade() async throws {
145133
_ = try await send(command: HELLO(arguments: .init(protover: 3, auth: nil, clientname: nil)))
146134
}
147135

148-
/// Create Valkey connection and return channel connection is running on and the Valkey channel handler
149-
private static func makeClient(
136+
private static func _makeClient(
150137
address: ServerAddress,
151-
eventLoopGroup: EventLoopGroup,
138+
eventLoop: EventLoop,
152139
configuration: ValkeyClientConfiguration,
153140
logger: Logger
154-
) async throws -> (Channel, ValkeyChannelHandler) {
141+
) -> EventLoopFuture<ValkeyConnection> {
155142
// get bootstrap
156-
let bootstrap: ClientBootstrapProtocol
143+
eventLoop.assertInEventLoop()
144+
145+
let bootstrap: NIOClientTCPBootstrapProtocol
157146
#if canImport(Network)
158-
if let tsBootstrap = createTSBootstrap(eventLoopGroup: eventLoopGroup, tlsOptions: nil) {
147+
if let tsBootstrap = createTSBootstrap(eventLoopGroup: eventLoop, tlsOptions: nil) {
159148
bootstrap = tsBootstrap
160149
} else {
161150
#if os(iOS) || os(tvOS)
162151
self.logger.warning(
163152
"Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework"
164153
)
165154
#endif
166-
bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoopGroup)
155+
bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoop)
167156
}
168157
#else
169-
bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoopGroup)
158+
bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoop)
170159
#endif
171160

172-
// connect
173-
let channel: Channel
174-
let channelHandler: ValkeyChannelHandler
175-
do {
176-
switch address.value {
177-
case .hostname(let host, let port):
178-
(channel, channelHandler) =
179-
try await bootstrap
180-
.connect(host: host, port: port) { channel in
181-
setupChannel(channel, configuration: configuration, logger: logger)
182-
}
161+
let connect = bootstrap.channelInitializer { channel in
162+
do {
163+
let sync = channel.pipeline.syncOperations
164+
if case .enable(let sslContext, let tlsServerName) = configuration.tls.base {
165+
try sync.addHandler(NIOSSLClientHandler(context: sslContext, serverHostname: tlsServerName))
166+
}
167+
let valkeyChannelHandler = ValkeyChannelHandler(
168+
eventLoop: channel.eventLoop,
169+
logger: logger
170+
)
171+
try sync.addHandler(valkeyChannelHandler)
172+
return eventLoop.makeSucceededVoidFuture()
173+
} catch {
174+
return eventLoop.makeFailedFuture(error)
175+
}
176+
}
177+
178+
let future: EventLoopFuture<Channel>
179+
switch address.value {
180+
case .hostname(let host, let port):
181+
future = connect.connect(host: host, port: port)
182+
future.whenSuccess { _ in
183183
logger.debug("Client connnected to \(host):\(port)")
184-
case .unixDomainSocket(let path):
185-
(channel, channelHandler) =
186-
try await bootstrap
187-
.connect(unixDomainSocketPath: path) { channel in
188-
setupChannel(channel, configuration: configuration, logger: logger)
189-
}
184+
}
185+
case .unixDomainSocket(let path):
186+
future = connect.connect(unixDomainSocketPath: path)
187+
future.whenSuccess { _ in
190188
logger.debug("Client connnected to socket path \(path)")
191189
}
192-
return (channel, channelHandler)
193-
} catch {
194-
throw error
195190
}
196-
}
197191

198-
private static func setupChannel(
199-
_ channel: Channel,
200-
configuration: ValkeyClientConfiguration,
201-
logger: Logger
202-
) -> EventLoopFuture<(Channel, ValkeyChannelHandler)> {
203-
channel.eventLoop.makeCompletedFuture {
204-
if case .enable(let sslContext, let tlsServerName) = configuration.tls.base {
205-
try channel.pipeline.syncOperations.addHandler(NIOSSLClientHandler(context: sslContext, serverHostname: tlsServerName))
206-
}
207-
let valkeyChannelHandler = ValkeyChannelHandler(
208-
channel: channel,
209-
logger: logger
210-
)
211-
try channel.pipeline.syncOperations.addHandler(valkeyChannelHandler)
212-
return (channel, valkeyChannelHandler)
192+
return future.flatMapThrowing { channel in
193+
let handler = try channel.pipeline.syncOperations.handler(type: ValkeyChannelHandler.self)
194+
return ValkeyConnection(channel: channel, channelHandler: handler, configuration: configuration, logger: logger)
213195
}
214196
}
215197

Sources/Valkey/RESP/RESPCommand.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import NIOCore
1616

1717
/// A RESP command that can be executed on a connection.
1818
public protocol RESPCommand: Sendable {
19-
associatedtype Response: RESPTokenRepresentable = RESPToken
19+
associatedtype Response: RESPTokenRepresentable & Sendable = RESPToken
2020

2121
func encode(into commandEncoder: inout RESPCommandEncoder)
2222
}

Sources/Valkey/RESP/RESPCommandEncoder.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,9 @@ public struct RESPCommandEncoder {
7575
mutating func moveWriterIndex(to index: Int) {
7676
buffer.moveWriterIndex(to: index)
7777
}
78+
79+
@inlinable
80+
mutating func reset() {
81+
self.buffer.clear()
82+
}
7883
}

Sources/Valkey/ValkeyClient.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ extension ValkeyClient {
6767
let valkeyConnection = try await ValkeyConnection.connect(
6868
address: self.serverAddress,
6969
configuration: self.configuration,
70-
eventLoopGroup: self.eventLoopGroup,
70+
eventLoop: self.eventLoopGroup.any(),
7171
logger: logger
7272
)
7373
let value: Value

0 commit comments

Comments
 (0)