Skip to content

Commit b67ba32

Browse files
authored
Transactions (#39)
* Transactions * Add wrapping TransactionCommand to avoid Response conversion * Add RESPToken.convertingWithErrors * transaction returns array of Results * Add tests for pipeline/transaction errors * Redo transactions, to iterate result array * Edits after rebasing renaming of symbols * ValkeyRawResponseCommand
1 parent 0a3614b commit b67ba32

File tree

8 files changed

+545
-1
lines changed

8 files changed

+545
-1
lines changed
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/// NOTE: THIS FILE IS AUTO-GENERATED BY dev/generate-transaction-commands.sh
2+
3+
import NIOCore
4+
5+
extension ValkeyConnection {
6+
7+
@inlinable
8+
public func transaction<C0: ValkeyCommand>(_ c0: C0) async throws -> (Result<C0.Response, Error>) {
9+
guard let responses = try await self.pipeline(MULTI(), ValkeyRawResponseCommand(c0), EXEC()).2.get() else {
10+
throw ValkeyClientError(.transactionAborted)
11+
}
12+
return responses.decodeElementResults()
13+
}
14+
15+
@inlinable
16+
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand>(
17+
_ c0: C0,
18+
_ c1: C1
19+
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>) {
20+
guard let responses = try await self.pipeline(MULTI(), ValkeyRawResponseCommand(c0), ValkeyRawResponseCommand(c1), EXEC()).3.get() else {
21+
throw ValkeyClientError(.transactionAborted)
22+
}
23+
return responses.decodeElementResults()
24+
}
25+
26+
@inlinable
27+
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand>(
28+
_ c0: C0,
29+
_ c1: C1,
30+
_ c2: C2
31+
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>) {
32+
guard
33+
let responses = try await self.pipeline(
34+
MULTI(),
35+
ValkeyRawResponseCommand(c0),
36+
ValkeyRawResponseCommand(c1),
37+
ValkeyRawResponseCommand(c2),
38+
EXEC()
39+
).4.get()
40+
else { throw ValkeyClientError(.transactionAborted) }
41+
return responses.decodeElementResults()
42+
}
43+
44+
@inlinable
45+
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand>(
46+
_ c0: C0,
47+
_ c1: C1,
48+
_ c2: C2,
49+
_ c3: C3
50+
) async throws -> (Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>) {
51+
guard
52+
let responses = try await self.pipeline(
53+
MULTI(),
54+
ValkeyRawResponseCommand(c0),
55+
ValkeyRawResponseCommand(c1),
56+
ValkeyRawResponseCommand(c2),
57+
ValkeyRawResponseCommand(c3),
58+
EXEC()
59+
).5.get()
60+
else { throw ValkeyClientError(.transactionAborted) }
61+
return responses.decodeElementResults()
62+
}
63+
64+
@inlinable
65+
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand, C4: ValkeyCommand>(
66+
_ c0: C0,
67+
_ c1: C1,
68+
_ c2: C2,
69+
_ c3: C3,
70+
_ c4: C4
71+
) async throws -> (
72+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>
73+
) {
74+
guard
75+
let responses = try await self.pipeline(
76+
MULTI(),
77+
ValkeyRawResponseCommand(c0),
78+
ValkeyRawResponseCommand(c1),
79+
ValkeyRawResponseCommand(c2),
80+
ValkeyRawResponseCommand(c3),
81+
ValkeyRawResponseCommand(c4),
82+
EXEC()
83+
).6.get()
84+
else { throw ValkeyClientError(.transactionAborted) }
85+
return responses.decodeElementResults()
86+
}
87+
88+
@inlinable
89+
public func transaction<C0: ValkeyCommand, C1: ValkeyCommand, C2: ValkeyCommand, C3: ValkeyCommand, C4: ValkeyCommand, C5: ValkeyCommand>(
90+
_ c0: C0,
91+
_ c1: C1,
92+
_ c2: C2,
93+
_ c3: C3,
94+
_ c4: C4,
95+
_ c5: C5
96+
) async throws -> (
97+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
98+
Result<C5.Response, Error>
99+
) {
100+
guard
101+
let responses = try await self.pipeline(
102+
MULTI(),
103+
ValkeyRawResponseCommand(c0),
104+
ValkeyRawResponseCommand(c1),
105+
ValkeyRawResponseCommand(c2),
106+
ValkeyRawResponseCommand(c3),
107+
ValkeyRawResponseCommand(c4),
108+
ValkeyRawResponseCommand(c5),
109+
EXEC()
110+
).7.get()
111+
else { throw ValkeyClientError(.transactionAborted) }
112+
return responses.decodeElementResults()
113+
}
114+
115+
@inlinable
116+
public func transaction<
117+
C0: ValkeyCommand,
118+
C1: ValkeyCommand,
119+
C2: ValkeyCommand,
120+
C3: ValkeyCommand,
121+
C4: ValkeyCommand,
122+
C5: ValkeyCommand,
123+
C6: ValkeyCommand
124+
>(
125+
_ c0: C0,
126+
_ c1: C1,
127+
_ c2: C2,
128+
_ c3: C3,
129+
_ c4: C4,
130+
_ c5: C5,
131+
_ c6: C6
132+
) async throws -> (
133+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
134+
Result<C5.Response, Error>, Result<C6.Response, Error>
135+
) {
136+
guard
137+
let responses = try await self.pipeline(
138+
MULTI(),
139+
ValkeyRawResponseCommand(c0),
140+
ValkeyRawResponseCommand(c1),
141+
ValkeyRawResponseCommand(c2),
142+
ValkeyRawResponseCommand(c3),
143+
ValkeyRawResponseCommand(c4),
144+
ValkeyRawResponseCommand(c5),
145+
ValkeyRawResponseCommand(c6),
146+
EXEC()
147+
).8.get()
148+
else { throw ValkeyClientError(.transactionAborted) }
149+
return responses.decodeElementResults()
150+
}
151+
152+
@inlinable
153+
public func transaction<
154+
C0: ValkeyCommand,
155+
C1: ValkeyCommand,
156+
C2: ValkeyCommand,
157+
C3: ValkeyCommand,
158+
C4: ValkeyCommand,
159+
C5: ValkeyCommand,
160+
C6: ValkeyCommand,
161+
C7: ValkeyCommand
162+
>(
163+
_ c0: C0,
164+
_ c1: C1,
165+
_ c2: C2,
166+
_ c3: C3,
167+
_ c4: C4,
168+
_ c5: C5,
169+
_ c6: C6,
170+
_ c7: C7
171+
) async throws -> (
172+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
173+
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>
174+
) {
175+
guard
176+
let responses = try await self.pipeline(
177+
MULTI(),
178+
ValkeyRawResponseCommand(c0),
179+
ValkeyRawResponseCommand(c1),
180+
ValkeyRawResponseCommand(c2),
181+
ValkeyRawResponseCommand(c3),
182+
ValkeyRawResponseCommand(c4),
183+
ValkeyRawResponseCommand(c5),
184+
ValkeyRawResponseCommand(c6),
185+
ValkeyRawResponseCommand(c7),
186+
EXEC()
187+
).9.get()
188+
else { throw ValkeyClientError(.transactionAborted) }
189+
return responses.decodeElementResults()
190+
}
191+
192+
@inlinable
193+
public func transaction<
194+
C0: ValkeyCommand,
195+
C1: ValkeyCommand,
196+
C2: ValkeyCommand,
197+
C3: ValkeyCommand,
198+
C4: ValkeyCommand,
199+
C5: ValkeyCommand,
200+
C6: ValkeyCommand,
201+
C7: ValkeyCommand,
202+
C8: ValkeyCommand
203+
>(
204+
_ c0: C0,
205+
_ c1: C1,
206+
_ c2: C2,
207+
_ c3: C3,
208+
_ c4: C4,
209+
_ c5: C5,
210+
_ c6: C6,
211+
_ c7: C7,
212+
_ c8: C8
213+
) async throws -> (
214+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
215+
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>, Result<C8.Response, Error>
216+
) {
217+
guard
218+
let responses = try await self.pipeline(
219+
MULTI(),
220+
ValkeyRawResponseCommand(c0),
221+
ValkeyRawResponseCommand(c1),
222+
ValkeyRawResponseCommand(c2),
223+
ValkeyRawResponseCommand(c3),
224+
ValkeyRawResponseCommand(c4),
225+
ValkeyRawResponseCommand(c5),
226+
ValkeyRawResponseCommand(c6),
227+
ValkeyRawResponseCommand(c7),
228+
ValkeyRawResponseCommand(c8),
229+
EXEC()
230+
).10.get()
231+
else { throw ValkeyClientError(.transactionAborted) }
232+
return responses.decodeElementResults()
233+
}
234+
235+
@inlinable
236+
public func transaction<
237+
C0: ValkeyCommand,
238+
C1: ValkeyCommand,
239+
C2: ValkeyCommand,
240+
C3: ValkeyCommand,
241+
C4: ValkeyCommand,
242+
C5: ValkeyCommand,
243+
C6: ValkeyCommand,
244+
C7: ValkeyCommand,
245+
C8: ValkeyCommand,
246+
C9: ValkeyCommand
247+
>(
248+
_ c0: C0,
249+
_ c1: C1,
250+
_ c2: C2,
251+
_ c3: C3,
252+
_ c4: C4,
253+
_ c5: C5,
254+
_ c6: C6,
255+
_ c7: C7,
256+
_ c8: C8,
257+
_ c9: C9
258+
) async throws -> (
259+
Result<C0.Response, Error>, Result<C1.Response, Error>, Result<C2.Response, Error>, Result<C3.Response, Error>, Result<C4.Response, Error>,
260+
Result<C5.Response, Error>, Result<C6.Response, Error>, Result<C7.Response, Error>, Result<C8.Response, Error>, Result<C9.Response, Error>
261+
) {
262+
guard
263+
let responses = try await self.pipeline(
264+
MULTI(),
265+
ValkeyRawResponseCommand(c0),
266+
ValkeyRawResponseCommand(c1),
267+
ValkeyRawResponseCommand(c2),
268+
ValkeyRawResponseCommand(c3),
269+
ValkeyRawResponseCommand(c4),
270+
ValkeyRawResponseCommand(c5),
271+
ValkeyRawResponseCommand(c6),
272+
ValkeyRawResponseCommand(c7),
273+
ValkeyRawResponseCommand(c8),
274+
ValkeyRawResponseCommand(c9),
275+
EXEC()
276+
).11.get()
277+
else { throw ValkeyClientError(.transactionAborted) }
278+
return responses.decodeElementResults()
279+
}
280+
}

Sources/Valkey/RESP/RESPTokenDecodable.swift

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ extension RESPToken: RESPTokenDecodable {
2929
try Value(fromRESP: self)
3030
}
3131

32+
/// Convert RESP3Token to a Result containing the type to convert to or any error found while converting
33+
///
34+
/// This function also checks for RESP error types and returns them if found
35+
///
36+
/// - Parameter type: Type to convert to
37+
/// - Returns: Result contaoining either the Value or an error
38+
@usableFromInline
39+
func decodeResult<Value: RESPTokenDecodable>(as type: Value.Type = Value.self) -> Result<Value, Error> {
40+
switch self.identifier {
41+
case .simpleError, .bulkError:
42+
return .failure(ValkeyClientError(.commandError, message: self.errorString.map { String(buffer: $0) }))
43+
default:
44+
do {
45+
return try .success(Value(fromRESP: self))
46+
} catch {
47+
return .failure(error)
48+
}
49+
}
50+
}
51+
3252
@inlinable
3353
public init(fromRESP token: RESPToken) throws {
3454
self = token
@@ -310,6 +330,27 @@ extension RESPToken.Array: RESPTokenDecodable {
310330
var iterator = self.makeIterator()
311331
return try (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self))
312332
}
333+
334+
/// Convert RESP3Token Array to a tuple of values
335+
/// - Parameter as: Tuple of types to convert to
336+
/// - Throws: RESPDecodeError
337+
/// - Returns: Tuple of decoded values
338+
@inlinable
339+
public func decodeElementResults<each Value: RESPTokenDecodable>(
340+
as: (repeat (each Value)).Type = (repeat (each Value)).self
341+
) -> (repeat Result<(each Value), Error>) {
342+
func decodeOptionalRESPToken<T: RESPTokenDecodable>(_ token: RESPToken?, as: T.Type) -> Result<T, Error> {
343+
switch token {
344+
case .some(let value):
345+
return value.decodeResult(as: T.self)
346+
case .none:
347+
// TODO: Fixup error when we have a decoding error
348+
return .failure(RESPParsingError(code: .unexpectedType, buffer: token?.base ?? .init()))
349+
}
350+
}
351+
var iterator = self.makeIterator()
352+
return (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self))
353+
}
313354
}
314355

315356
extension RESPToken.Map: RESPTokenDecodable {

Sources/Valkey/ValkeyClientError.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
2020
case commandError
2121
case subscriptionError
2222
case unsolicitedToken
23+
case transactionAborted
24+
case tokenDoesNotExist
2325
}
2426

2527
fileprivate let value: _Internal
@@ -35,6 +37,10 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
3537
public static var subscriptionError: Self { .init(.subscriptionError) }
3638
/// Received an unsolicited token from the server
3739
public static var unsolicitedToken: Self { .init(.unsolicitedToken) }
40+
/// Transaction was aborted because a watched key was touched
41+
public static var transactionAborted: Self { .init(.transactionAborted) }
42+
/// Expected token to exist. Throw when iterating an array of tokens that is too short
43+
public static var tokenDoesNotExist: Self { .init(.tokenDoesNotExist) }
3844
}
3945

4046
public let errorCode: ErrorCode
@@ -50,6 +56,8 @@ public struct ValkeyClientError: Error, CustomStringConvertible, Equatable {
5056
case .commandError: self.message ?? "Valkey command returned an error"
5157
case .subscriptionError: self.message ?? "Received invalid subscription push event"
5258
case .unsolicitedToken: self.message ?? "Received unsolicited token from Valkey server"
59+
case .transactionAborted: self.message ?? "Transaction was aborted because a watched key was touched"
60+
case .tokenDoesNotExist: self.message ?? "Expected token does not exist."
5361
}
5462
}
5563
}

Sources/Valkey/ValkeyCommand.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,23 @@ extension ValkeyCommand {
3333
/// Default to no keys affected
3434
public var keysAffected: [ValkeyKey] { [] }
3535
}
36+
37+
/// Wrapper for Valkey command that returns the response as a `RESPToken`
38+
@usableFromInline
39+
struct ValkeyRawResponseCommand<Command: ValkeyCommand>: ValkeyCommand {
40+
@usableFromInline
41+
let command: Command
42+
43+
@inlinable
44+
init(_ command: Command) {
45+
self.command = command
46+
}
47+
48+
@usableFromInline
49+
var keysAffected: [ValkeyKey] { command.keysAffected }
50+
51+
@inlinable
52+
func encode(into commandEncoder: inout ValkeyCommandEncoder) {
53+
self.command.encode(into: &commandEncoder)
54+
}
55+
}

0 commit comments

Comments
 (0)