Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 280 additions & 0 deletions Sources/Valkey/Connection/ValkeyConnection+transactions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/// NOTE: THIS FILE IS AUTO-GENERATED BY dev/generate-transaction-commands.sh

import NIOCore

extension ValkeyConnection {

@inlinable
public func transaction<C0: ValkeyCommand>(_ c0: C0) async throws -> (Result<C0.Response, Error>) {
guard let responses = try await self.pipeline(MULTI(), ValkeyRawResponseCommand(c0), EXEC()).2.get() else {
throw ValkeyClientError(.transactionAborted)
}
return responses.decodeElementResults()
}

@inlinable
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand>(
_ c0: C0,
_ c1: C1
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>) {
guard let responses = try await self.pipeline(MULTI(), ValkeyRawResponseCommand(c0), ValkeyRawResponseCommand(c1), EXEC()).3.get() else {
throw ValkeyClientError(.transactionAborted)
}
return responses.decodeElementResults()
}

@inlinable
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand>(
_ c0: C0,
_ c1: C1,
_ c2: C2
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
EXEC()
).4.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
EXEC()
).5.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand, C4: ValkeyCommand>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
EXEC()
).6.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand, C4: ValkeyCommand, C5: ValkeyCommand>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4,
_ c5: C5
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
Result<C5.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
ValkeyRawResponseCommand(c5),
EXEC()
).7.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<
C0: ValkeyCommand,
C1: ValkeyCommand,
C2: ValkeyCommand,
C3: ValkeyCommand,
C4: ValkeyCommand,
C5: ValkeyCommand,
C6: ValkeyCommand
>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4,
_ c5: C5,
_ c6: C6
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
Result<C5.Response, Error>, Result<C6.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
ValkeyRawResponseCommand(c5),
ValkeyRawResponseCommand(c6),
EXEC()
).8.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<
C0: ValkeyCommand,
C1: ValkeyCommand,
C2: ValkeyCommand,
C3: ValkeyCommand,
C4: ValkeyCommand,
C5: ValkeyCommand,
C6: ValkeyCommand,
C7: ValkeyCommand
>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4,
_ c5: C5,
_ c6: C6,
_ c7: C7
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
ValkeyRawResponseCommand(c5),
ValkeyRawResponseCommand(c6),
ValkeyRawResponseCommand(c7),
EXEC()
).9.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<
C0: ValkeyCommand,
C1: ValkeyCommand,
C2: ValkeyCommand,
C3: ValkeyCommand,
C4: ValkeyCommand,
C5: ValkeyCommand,
C6: ValkeyCommand,
C7: ValkeyCommand,
C8: ValkeyCommand
>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4,
_ c5: C5,
_ c6: C6,
_ c7: C7,
_ c8: C8
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>, Result<C8.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
ValkeyRawResponseCommand(c5),
ValkeyRawResponseCommand(c6),
ValkeyRawResponseCommand(c7),
ValkeyRawResponseCommand(c8),
EXEC()
).10.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}

@inlinable
public func transaction<
C0: ValkeyCommand,
C1: ValkeyCommand,
C2: ValkeyCommand,
C3: ValkeyCommand,
C4: ValkeyCommand,
C5: ValkeyCommand,
C6: ValkeyCommand,
C7: ValkeyCommand,
C8: ValkeyCommand,
C9: ValkeyCommand
>(
_ c0: C0,
_ c1: C1,
_ c2: C2,
_ c3: C3,
_ c4: C4,
_ c5: C5,
_ c6: C6,
_ c7: C7,
_ c8: C8,
_ c9: C9
) async throws -> (
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>, Result<C8.Response, Error>, Result<C9.Response, Error>
) {
guard
let responses = try await self.pipeline(
MULTI(),
ValkeyRawResponseCommand(c0),
ValkeyRawResponseCommand(c1),
ValkeyRawResponseCommand(c2),
ValkeyRawResponseCommand(c3),
ValkeyRawResponseCommand(c4),
ValkeyRawResponseCommand(c5),
ValkeyRawResponseCommand(c6),
ValkeyRawResponseCommand(c7),
ValkeyRawResponseCommand(c8),
ValkeyRawResponseCommand(c9),
EXEC()
).11.get()
else { throw ValkeyClientError(.transactionAborted) }
return responses.decodeElementResults()
}
}
41 changes: 41 additions & 0 deletions Sources/Valkey/RESP/RESPTokenDecodable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ extension RESPToken: RESPTokenDecodable {
try Value(fromRESP: self)
}

/// Convert RESP3Token to a Result containing the type to convert to or any error found while converting
///
/// This function also checks for RESP error types and returns them if found
///
/// - Parameter type: Type to convert to
/// - Returns: Result contaoining either the Value or an error
@usableFromInline
func decodeResult<Value: RESPTokenDecodable>(as type: Value.Type = Value.self) -> Result<Value, Error> {
switch self.identifier {
case .simpleError, .bulkError:
return .failure(ValkeyClientError(.commandError, message: self.errorString.map { String(buffer: $0) }))
default:
do {
return try .success(Value(fromRESP: self))
} catch {
return .failure(error)
}
}
}

@inlinable
public init(fromRESP token: RESPToken) throws {
self = token
Expand Down Expand Up @@ -310,6 +330,27 @@ extension RESPToken.Array: RESPTokenDecodable {
var iterator = self.makeIterator()
return try (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self))
}

/// Convert RESP3Token Array to a tuple of values
/// - Parameter as: Tuple of types to convert to
/// - Throws: RESPDecodeError
/// - Returns: Tuple of decoded values
@inlinable
public func decodeElementResults<each Value: RESPTokenDecodable>(
as: (repeat (each Value)).Type = (repeat (each Value)).self
) -> (repeat Result<(each Value), Error>) {
func decodeOptionalRESPToken<T: RESPTokenDecodable>(_ token: RESPToken?, as: T.Type) -> Result<T, Error> {
switch token {
case .some(let value):
return value.decodeResult(as: T.self)
case .none:
// TODO: Fixup error when we have a decoding error
return .failure(RESPParsingError(code: .unexpectedType, buffer: token?.base ?? .init()))
}
}
var iterator = self.makeIterator()
return (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self))
}
}

extension RESPToken.Map: RESPTokenDecodable {
Expand Down
8 changes: 8 additions & 0 deletions Sources/Valkey/ValkeyClientError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
case commandError
case subscriptionError
case unsolicitedToken
case transactionAborted
case tokenDoesNotExist
}

fileprivate let value: _Internal
Expand All @@ -35,6 +37,10 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
public static var subscriptionError: Self { .init(.subscriptionError) }
/// Received an unsolicited token from the server
public static var unsolicitedToken: Self { .init(.unsolicitedToken) }
/// Transaction was aborted because a watched key was touched
public static var transactionAborted: Self { .init(.transactionAborted) }
/// Expected token to exist. Throw when iterating an array of tokens that is too short
public static var tokenDoesNotExist: Self { .init(.tokenDoesNotExist) }
}

public let errorCode: ErrorCode
Expand All @@ -50,6 +56,8 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
case .commandError: self.message ?? "Valkey command returned an error"
case .subscriptionError: self.message ?? "Received invalid subscription push event"
case .unsolicitedToken: self.message ?? "Received unsolicited token from Valkey server"
case .transactionAborted: self.message ?? "Transaction was aborted because a watched key was touched"
case .tokenDoesNotExist: self.message ?? "Expected token does not exist."
}
}
}
20 changes: 20 additions & 0 deletions Sources/Valkey/ValkeyCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,23 @@ extension ValkeyCommand {
/// Default to no keys affected
public var keysAffected: [ValkeyKey] { [] }
}

/// Wrapper for Valkey command that returns the response as a `RESPToken`
@usableFromInline
struct ValkeyRawResponseCommand<Command: ValkeyCommand>: ValkeyCommand {
@usableFromInline
let command: Command

@inlinable
init(_ command: Command) {
self.command = command
}

@usableFromInline
var keysAffected: [ValkeyKey] { command.keysAffected }

@inlinable
func encode(into commandEncoder: inout ValkeyCommandEncoder) {
self.command.encode(into: &commandEncoder)
}
}
Loading