Skip to content

Conversation

@adam-fowler
Copy link
Collaborator

Added ValkeyCommandHandler
This also allows for pipelining all commands
Add test running 100 requests concurrently

@adam-fowler adam-fowler force-pushed the remove-nio-async-channel branch from b20def2 to 70c0d48 Compare April 3, 2025 09:34
Comment on lines 34 to 38
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let message = unwrapOutboundIn(data)
commands.append(message.promise)
context.writeAndFlush(wrapOutboundOut(message.buffer), promise: promise)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an ideal case the ValkeyConnection holds a reference to the channel and the ValkeyCommandHandler. This way in order to send a command, the ValkeyConnection doesn't need to write to the ChannelPipeline instead it can hand it directly to the ValkeyCommandHandler. This allows even passing on the original command.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need a ChannelHandlerContext to write to the ValkeyCommandHandler.

Comment on lines +77 to +85
do {
try self.decoder.process(buffer: buffer) { token in
self.handleToken(context: context, token: token)
}
} catch let error as RESPParsingError {
self.handleError(context: context, error: error)
} catch {
preconditionFailure("Expected to only get RESPParsingError from the RESPTokenDecoder.")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use this as an example why the protocol should maybe support typed throws and open an issue on NIO?

Comment on lines +19 to +22
enum ValkeyRequest: Sendable {
case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about abstracting it this way?

Suggested change
enum ValkeyRequest: Sendable {
case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
}
enum ValkeyPromise<Result> {
case nio(EventLoopPromise<Result>)
case async(CheckedContinuation<Result, any Error>)
func succeed(_ result: Result) {
// ...
}
}
enum ValkeyRequest: Sendable {
case single(buffer: ByteBuffer, promise: ValkeyPromise <RESPToken>)
case multiple(buffer: ByteBuffer, promises: [ValkeyPromise<RESPToken>])
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way we can save an allocation because for a/a we don't need to go promise -> continuation. We can go straight to continuation. But we can also theoretically offer an ELF api for super perf critical apps.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we happy to forego cancellation? Pushing continuations around makes it a lot harder to support cancellation. Avoiding resuming continuations would be a lot harder.

I guess you could close the connection to cancel a command (well as long that is the only command running on that connection).

}

public func close() -> EventLoopFuture<Void> {
if self.isClosed.withLockedValue({ $0 }) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need a compAndExchange here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point missed that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could even use an Atomic for this ;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an atomic now. I did look at that initially, but just realised why that wasn't working ValkeyConnection was a struct. I changed it to be a final class which I think is more correct.

Also make ValkeyConnection a final class

public func close() -> EventLoopFuture<Void> {
guard self.isClosed.compareExchange(expected: false, desired: true, successOrdering: .relaxed, failureOrdering: .relaxed).exchanged else {
return channel.eventLoop.makeSucceededFuture(())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save an allocation ;)

Suggested change
return channel.eventLoop.makeSucceededFuture(())
return channel.eventLoop.makeSucceededVoidFuture()

return self.channel.closeFuture
}

@discardableResult public func send<Command: RESPCommand>(command: Command) async throws -> Command.Response {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use @discardableResult here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will need an @inlinable

Comment on lines 85 to 86
var encoder = RESPCommandEncoder()
command.encode(into: &encoder)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in an ideal case we keep the CommandEncoder in the channel handler and reuse the same one over and over there, so that we can reuse the internal buffer again and again, which should lead to zero allocations in the long term :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works if we can go for handler.write(command: command) here.

private let channel: Channel
private var commands: Deque<EventLoopPromise<RESPToken>>
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
private var context: ChannelHandlerContext?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can consider making this !. If it isn't available when you need it, you want to crash anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want something more informative than just a crash though

@adam-fowler adam-fowler merged commit ceb0464 into main Apr 3, 2025
2 checks passed
@adam-fowler adam-fowler deleted the remove-nio-async-channel branch April 3, 2025 18:35
adam-fowler added a commit that referenced this pull request Jul 11, 2025
* Use ChannelHandler to process requests

* resume all the continuations with error when handler removed

* Store promises in handler instead of continuations

* RESP3 upgrade

* Add multiplexing test

* Support pipelined commands

* Merge decoder into ValkeyChannelHandler

* Improve testMultiplexingPipelinedRequests

* Close connection after we have used it

* Use atomic for isClosed

Also make ValkeyConnection a final class

* Write directly to ValkeyChannelHandler

* Just store eventloop

* PR comments
adam-fowler added a commit that referenced this pull request Jul 14, 2025
* Use ChannelHandler to process requests

* resume all the continuations with error when handler removed

* Store promises in handler instead of continuations

* RESP3 upgrade

* Add multiplexing test

* Support pipelined commands

* Merge decoder into ValkeyChannelHandler

* Improve testMultiplexingPipelinedRequests

* Close connection after we have used it

* Use atomic for isClosed

Also make ValkeyConnection a final class

* Write directly to ValkeyChannelHandler

* Just store eventloop

* PR comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants