Remove NIOAsyncChannel #8
```swift
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
    let message = unwrapOutboundIn(data)
    commands.append(message.promise)
    context.writeAndFlush(wrapOutboundOut(message.buffer), promise: promise)
}
```
In an ideal case the ValkeyConnection holds a reference to the channel and the ValkeyCommandHandler. This way, to send a command, the ValkeyConnection doesn't need to write to the ChannelPipeline; instead it can hand it directly to the ValkeyCommandHandler. This even allows passing on the original command.
You need a ChannelHandlerContext to write to the ValkeyCommandHandler.
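A minimal sketch of the shape being discussed, with toy stand-ins for the NIO types so it compiles on its own (names and signatures here are illustrative, not the real API): the handler captures its ChannelHandlerContext in handlerAdded, and the connection can then call handler.write(command:) directly instead of going through the ChannelPipeline.

```swift
// Toy stand-in for NIO's ChannelHandlerContext, only for this sketch.
final class ChannelHandlerContext {
    private(set) var written: [String] = []
    func writeAndFlush(_ message: String) { written.append(message) }
}

final class ValkeyCommandHandler {
    // Set in handlerAdded(context:); valid for as long as the handler is in the pipeline.
    private var context: ChannelHandlerContext!

    func handlerAdded(context: ChannelHandlerContext) {
        self.context = context
    }

    // ValkeyConnection calls this directly, bypassing the ChannelPipeline.
    func write(command: String) {
        self.context.writeAndFlush(command)
    }
}
```

In the real pipeline the context is guaranteed to exist between handlerAdded and handlerRemoved, which is what makes the direct call safe.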
```swift
do {
    try self.decoder.process(buffer: buffer) { token in
        self.handleToken(context: context, token: token)
    }
} catch let error as RESPParsingError {
    self.handleError(context: context, error: error)
} catch {
    preconditionFailure("Expected to only get RESPParsingError from the RESPTokenDecoder.")
}
```
Can you use this as an example of why the protocol should maybe support typed throws, and open an issue on NIO?
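For illustration, a sketch of what a typed-throws variant could look like using Swift 6 typed throws (all names here are assumptions): both the callback and `process` declare `throws(RESPParsingError)`, so the catch-all arm with its preconditionFailure becomes unnecessary.

```swift
struct RESPParsingError: Error {}

// Hypothetical typed-throws variant of the decoder's process method (Swift 6).
struct RESPTokenDecoder {
    func process(buffer: [UInt8], _ onToken: (UInt8) throws(RESPParsingError) -> Void) throws(RESPParsingError) {
        for byte in buffer { try onToken(byte) }
    }
}

// The caller no longer needs a catch-all arm: only RESPParsingError can be thrown.
func handle(buffer: [UInt8]) -> Int {
    var tokens = 0
    do {
        try RESPTokenDecoder().process(buffer: buffer) { _ in tokens += 1 }
    } catch {
        return -1
    }
    return tokens
}
```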
```swift
enum ValkeyRequest: Sendable {
    case single(buffer: ByteBuffer, promise: EventLoopPromise<RESPToken>)
    case multiple(buffer: ByteBuffer, promises: [EventLoopPromise<RESPToken>])
}
```
What do you think about abstracting it this way?
Suggested change:

```swift
enum ValkeyPromise<Result> {
    case nio(EventLoopPromise<Result>)
    case async(CheckedContinuation<Result, any Error>)

    func succeed(_ result: Result) {
        // ...
    }
}

enum ValkeyRequest: Sendable {
    case single(buffer: ByteBuffer, promise: ValkeyPromise<RESPToken>)
    case multiple(buffer: ByteBuffer, promises: [ValkeyPromise<RESPToken>])
}
```
This way we can save an allocation, because for async/await we don't need to go promise -> continuation; we can go straight to the continuation. But we could also theoretically offer an EventLoopFuture API for super perf-critical apps.
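The `// ...` in the succeed body above could plausibly look like this (sketch with a toy stand-in for EventLoopPromise so it compiles without swift-nio): both arms complete with the same result, only the delivery mechanism differs.

```swift
// Toy stand-in for NIO's EventLoopPromise, only for this sketch.
final class EventLoopPromise<Result> {
    private(set) var result: Result?
    func succeed(_ result: Result) { self.result = result }
}

enum ValkeyPromise<Result> {
    case nio(EventLoopPromise<Result>)
    case async(CheckedContinuation<Result, any Error>)

    func succeed(_ result: Result) {
        switch self {
        case .nio(let promise):
            promise.succeed(result)          // EventLoopFuture path
        case .async(let continuation):
            continuation.resume(returning: result)  // async/await path
        }
    }
}
```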
Are we happy to forego cancellation? Pushing continuations around makes it a lot harder to support cancellation, and avoiding resuming continuations would be a lot harder.
I guess you could close the connection to cancel a command (well, as long as that is the only command running on that connection).
```swift
public func close() -> EventLoopFuture<Void> {
    if self.isClosed.withLockedValue({ $0 }) {
```
You need a compareAndExchange here.
Good point, missed that.
You could even use an Atomic for this ;)
Using an atomic now. I did look at that initially, but just realised why it wasn't working: ValkeyConnection was a struct. I changed it to be a final class, which I think is more correct.
Also make ValkeyConnection a final class
```swift
public func close() -> EventLoopFuture<Void> {
    guard self.isClosed.compareExchange(expected: false, desired: true, successOrdering: .relaxed, failureOrdering: .relaxed).exchanged else {
        return channel.eventLoop.makeSucceededFuture(())
```
Save an allocation ;)
Suggested change:

```swift
return channel.eventLoop.makeSucceededVoidFuture()
```
```swift
    return self.channel.closeFuture
}

@discardableResult public func send<Command: RESPCommand>(command: Command) async throws -> Command.Response {
```
Why do we use @discardableResult here?
This will need an @inlinable
```swift
var encoder = RESPCommandEncoder()
command.encode(into: &encoder)
```
In an ideal case we keep the CommandEncoder in the channel handler and reuse the same one over and over, so that we can reuse the internal buffer again and again, which should lead to zero allocations in the long term :)
This only works if we can go for handler.write(command: command) here.
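A toy sketch of the buffer-reuse idea (all names assumed, not the real package API): the handler owns one encoder whose internal buffer is cleared but kept around between commands, so steady-state encoding avoids fresh allocations.

```swift
// Toy encoder that mimics the buffer-reuse idea: encode into one buffer,
// hand the bytes out, and reset for the next command.
struct RESPCommandEncoder {
    private var buffer: [UInt8] = []

    mutating func encode(_ command: String) {
        buffer.append(contentsOf: command.utf8)
    }

    mutating func flush() -> [UInt8] {
        let bytes = buffer
        buffer.removeAll(keepingCapacity: true)  // reset, retaining capacity where possible
        return bytes
    }
}

final class ValkeyChannelHandler {
    private var encoder = RESPCommandEncoder()
    private(set) var outbound: [[UInt8]] = []

    // handler.write(command:) encodes with the handler-owned encoder, then writes.
    func write(command: String) {
        encoder.encode(command)
        outbound.append(encoder.flush())
    }
}
```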
```swift
private let channel: Channel
private var commands: Deque<EventLoopPromise<RESPToken>>
private var decoder: NIOSingleStepByteToMessageProcessor<RESPTokenDecoder>
private var context: ChannelHandlerContext?
```
I think you can consider making this a ChannelHandlerContext! (implicitly unwrapped). If it isn't available when you need it, you want to crash anyway.
Might want something more informative than just a crash though
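One way to keep the crash but make it informative (a sketch, with a stand-in type so it compiles standalone): keep the stored property optional and unwrap through a computed property that fails with a descriptive message instead of a bare force-unwrap trap.

```swift
// Stand-in so the sketch compiles without swift-nio.
final class ChannelHandlerContext {}

final class ValkeyChannelHandler {
    private var _context: ChannelHandlerContext?

    // Crashes with a descriptive message rather than a plain unwrap trap.
    private var context: ChannelHandlerContext {
        guard let context = self._context else {
            preconditionFailure("ValkeyChannelHandler used before being added to a ChannelPipeline")
        }
        return context
    }

    func handlerAdded(context: ChannelHandlerContext) {
        self._context = context
    }

    func isInPipeline() -> Bool { self._context != nil }
}
```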
* Use ChannelHandler to process requests
* Resume all the continuations with error when handler removed
* Store promises in handler instead of continuations
* RESP3 upgrade
* Add multiplexing test
* Support pipelined commands
* Merge decoder into ValkeyChannelHandler
* Improve testMultiplexingPipelinedRequests
* Close connection after we have used it
* Use atomic for isClosed (also make ValkeyConnection a final class)
* Write directly to ValkeyChannelHandler
* Just store eventloop
* PR comments
Added ValkeyCommandHandler
This also allows for pipelining all commands
Add test running 100 requests concurrently