Skip to content
Merged
11 changes: 11 additions & 0 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
import NIOSSL
Expand Down Expand Up @@ -46,6 +47,7 @@ public struct ValkeyConnection: Sendable {
let logger: Logger
let channel: Channel
let configuration: ValkeyClientConfiguration
let isClosed: NIOLockedValueBox<Bool>

/// Initialize Client
private init(
Expand All @@ -56,6 +58,7 @@ public struct ValkeyConnection: Sendable {
self.channel = channel
self.configuration = configuration
self.logger = logger
self.isClosed = .init(false)
}

public static func connect(
Expand All @@ -70,6 +73,14 @@ public struct ValkeyConnection: Sendable {
return connection
}

public func close() -> EventLoopFuture<Void> {
if self.isClosed.withLockedValue({ $0 }) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need a compAndExchange here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point missed that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could even use an Atomic for this ;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an atomic now. I did look at that initially, but just realised why that wasn't working ValkeyConnection was a struct. I changed it to be a final class which I think is more correct.

return channel.eventLoop.makeSucceededFuture(())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save an allocation ;)

Suggested change
return channel.eventLoop.makeSucceededFuture(())
return channel.eventLoop.makeSucceededVoidFuture()

}
self.channel.close(mode: .all, promise: nil)
return self.channel.closeFuture
}

@discardableResult public func send<Command: RESPCommand>(command: Command) async throws -> Command.Response {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use @discardableResult here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will need an @inlinable

var encoder = RESPCommandEncoder()
command.encode(into: &encoder)
Comment on lines 110 to 111
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in an ideal case we keep the CommandEncoder in the channel handler and reuse the same one over and over there, so that we can reuse the internal buffer again and again, which should lead to zero allocations in the long term :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works if we can go for handler.write(command: command) here.

Expand Down
10 changes: 9 additions & 1 deletion Sources/Valkey/ValkeyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ extension ValkeyClient {
eventLoopGroup: self.eventLoopGroup,
logger: logger
)
return try await operation(valkeyConnection)
let value: Value
do {
value = try await operation(valkeyConnection)
} catch {
try? await valkeyConnection.close().get()
throw error
}
try await valkeyConnection.close().get()
return value
}
}