Skip to content

Commit 7b03bdd

Browse files
authored
Remove NIOAsyncChannel (#8)
* Use ChannelHandler to process requests * resume all the continuations with error when handler removed * Store promises in handler instead of continuations * RESP3 upgrade * Add multiplexing test * Support pipelined commands * Merge decoder into ValkeyChannelHandler * Improve testMultiplexingPipelinedRequests * Close connection after we have used it * Use atomic for isClosed Also make ValkeyConnection a final class * Write directly to ValkeyChannelHandler * Just store eventloop * PR comments
1 parent 6643490 commit 7b03bdd

File tree

6 files changed

+314
-190
lines changed

6 files changed

+314
-190
lines changed

Package.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import PackageDescription
55

66
let package = Package(
77
name: "swift-valkey",
8-
platforms: [.macOS(.v13)],
8+
platforms: [.macOS(.v15)],
99
products: [
1010
.library(name: "Valkey", targets: ["Valkey"])
1111
],
1212
dependencies: [
13+
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"),
1314
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
1415
.package(url: "https://github.com/apple/swift-nio.git", from: "2.79.0"),
1516
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"),
@@ -19,6 +20,7 @@ let package = Package(
1920
.target(
2021
name: "Valkey",
2122
dependencies: [
23+
.product(name: "DequeModule", package: "swift-collections"),
2224
.product(name: "Logging", package: "swift-log"),
2325
.product(name: "NIOCore", package: "swift-nio"),
2426
.product(name: "NIOPosix", package: "swift-nio"),
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-valkey project
4+
//
5+
// Copyright (c) 2025 the swift-valkey authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See swift-valkey/CONTRIBUTORS.txt for the list of swift-valkey authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import DequeModule
16+
import Logging
17+
import NIOCore
18+
19+
@usableFromInline
20+
enum ValkeyRequest: Sendable {
21+
case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
22+
case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
23+
}
24+
25+
@usableFromInline
26+
final class ValkeyChannelHandler: ChannelDuplexHandler {
27+
@usableFromInline
28+
typealias OutboundIn = ValkeyRequest
29+
@usableFromInline
30+
typealias OutboundOut = ByteBuffer
31+
@usableFromInline
32+
typealias InboundIn = ByteBuffer
33+
34+
@usableFromInline
35+
let eventLoop: EventLoop
36+
private var commands: Deque<EventLoopPromise<RESPToken>>
37+
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
38+
private var context: ChannelHandlerContext?
39+
private let logger: Logger
40+
41+
init(channel: Channel, logger: Logger) {
42+
self.eventLoop = channel.eventLoop
43+
self.commands = .init()
44+
self.decoder = NIOSingleStepByteToMessageProcessor(RESPTokenDecoder())
45+
self.context = nil
46+
self.logger = logger
47+
}
48+
49+
/// Write valkey command/commands to channel
50+
/// - Parameters:
51+
/// - request: Valkey command request
52+
/// - promise: Promise to fulfill when command is complete
53+
@inlinable
54+
func write(request: ValkeyRequest) {
55+
if self.eventLoop.inEventLoop {
56+
self._write(request: request)
57+
} else {
58+
eventLoop.execute {
59+
self._write(request: request)
60+
}
61+
}
62+
}
63+
64+
@usableFromInline
65+
func _write(request: ValkeyRequest) {
66+
guard let context = self.context else {
67+
preconditionFailure("Trying to use valkey connection before it is setup")
68+
}
69+
switch request {
70+
case .single(let buffer, let tokenPromise):
71+
self.commands.append(tokenPromise)
72+
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
73+
74+
case .multiple(let buffer, let tokenPromises):
75+
for tokenPromise in tokenPromises {
76+
self.commands.append(tokenPromise)
77+
}
78+
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
79+
}
80+
}
81+
82+
@usableFromInline
83+
func handlerAdded(context: ChannelHandlerContext) {
84+
self.context = context
85+
}
86+
87+
@usableFromInline
88+
func handlerRemoved(context: ChannelHandlerContext) {
89+
self.context = nil
90+
while let promise = commands.popFirst() {
91+
promise.fail(ValkeyClientError.init(.connectionClosed))
92+
}
93+
}
94+
95+
@usableFromInline
96+
func channelInactive(context: ChannelHandlerContext) {
97+
do {
98+
try self.decoder.finishProcessing(seenEOF: true) { token in
99+
self.handleToken(context: context, token: token)
100+
}
101+
} catch let error as RESPParsingError {
102+
self.handleError(context: context, error: error)
103+
} catch {
104+
preconditionFailure("Expected to only get RESPParsingError from the RESPTokenDecoder.")
105+
}
106+
107+
self.logger.trace("Channel inactive.")
108+
}
109+
110+
@usableFromInline
111+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
112+
let buffer = self.unwrapInboundIn(data)
113+
114+
do {
115+
try self.decoder.process(buffer: buffer) { token in
116+
self.handleToken(context: context, token: token)
117+
}
118+
} catch let error as RESPParsingError {
119+
self.handleError(context: context, error: error)
120+
} catch {
121+
preconditionFailure("Expected to only get RESPParsingError from the RESPTokenDecoder.")
122+
}
123+
}
124+
125+
func handleToken(context: ChannelHandlerContext, token: RESPToken) {
126+
guard let promise = commands.popFirst() else {
127+
preconditionFailure("Unexpected response")
128+
}
129+
promise.succeed(token)
130+
}
131+
132+
func handleError(context: ChannelHandlerContext, error: Error) {
133+
self.logger.debug("ValkeyCommandHandler: ERROR \(error)")
134+
guard let promise = commands.popFirst() else {
135+
preconditionFailure("Unexpected response")
136+
}
137+
promise.fail(error)
138+
}
139+
}
140+
141+
// The ValkeyChannelHandler needs to be Sendable so the ValkeyConnection can pass it
142+
// around at initialisation
143+
extension ValkeyChannelHandler: @unchecked Sendable {}

0 commit comments

Comments
 (0)