Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 45 additions & 65 deletions Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,6 @@ extension CLUSTER.SLOTS {
public typealias Response = [ValkeyClusterSlotRange]
}

package struct ValkeyClusterParseError: Error, Equatable {
package enum Reason: Error {
case clusterDescriptionTokenIsNotAnArray
case shardTokenIsNotAnArrayOrMap
case nodesTokenIsNotAnArray
case nodeTokenIsNotAnArrayOrMap
case slotsTokenIsNotAnArray
case invalidNodeRole
case invalidNodeHealth
case missingRequiredValueForNode
case shardIsMissingHashSlots
case shardIsMissingNode
}

package var reason: Reason
package var token: RESPToken

package init(reason: Reason, token: RESPToken) {
self.reason = reason
self.token = token
}
}

/// A description of a Valkey cluster.
///
/// A description is return when you call ``ValkeyClientProtocol/clusterShards()``.
Expand Down Expand Up @@ -204,11 +181,7 @@ public struct ValkeyClusterDescription: Hashable, Sendable, RESPTokenDecodable {
/// Creates a cluster description from the response token you provide.
/// - Parameter respToken: The response token.
public init(fromRESP respToken: RESPToken) throws {
do {
self = try Self.makeClusterDescription(respToken: respToken)
} catch {
throw ValkeyClusterParseError(reason: error, token: respToken)
}
self = try Self.makeClusterDescription(respToken: respToken)
}

/// Creates a cluster description from a list of shards you provide.
Expand Down Expand Up @@ -611,28 +584,21 @@ public struct ValkeyClusterSlotRange: Hashable, Sendable, RESPTokenDecodable {
}

extension ValkeyClusterDescription {
fileprivate static func makeClusterDescription(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterDescription {
fileprivate static func makeClusterDescription(respToken: RESPToken) throws(RESPDecodeError) -> ValkeyClusterDescription {
guard case .array(let shardsToken) = respToken.value else {
throw .clusterDescriptionTokenIsNotAnArray
throw RESPDecodeError.tokenMismatch(expected: [.array], token: respToken)
}
let shards = try shardsToken.map { shardToken throws(ValkeyClusterParseError.Reason) in
let shards = try shardsToken.map { shardToken throws(RESPDecodeError) in
try ValkeyClusterDescription.Shard(shardToken)
}
return ValkeyClusterDescription(shards)
}
}

extension HashSlots {
fileprivate init(_ iterator: inout RESPToken.Array.Iterator) throws(ValkeyClusterParseError.Reason) {
guard let token = iterator.next() else {
throw .slotsTokenIsNotAnArray
}
self = try HashSlots(token)
}

fileprivate init(_ token: RESPToken) throws(ValkeyClusterParseError.Reason) {
fileprivate init(_ token: RESPToken) throws(RESPDecodeError) {
guard case .array(let array) = token.value else {
throw .slotsTokenIsNotAnArray
throw RESPDecodeError.tokenMismatch(expected: [.array], token: token)
}

var slotRanges = [ClosedRange<HashSlot>]()
Expand All @@ -648,31 +614,25 @@ extension HashSlots {
slotRanges.append(ClosedRange<HashSlot>(uncheckedBounds: (start, end)))
}

if slotRanges.isEmpty { throw RESPDecodeError.invalidArraySize(array, minExpectedSize: 1) }
self = slotRanges
}
}

extension [ValkeyClusterDescription.Node] {
fileprivate init(_ iterator: inout RESPToken.Array.Iterator) throws(ValkeyClusterParseError.Reason) {
guard let token = iterator.next() else {
throw .nodesTokenIsNotAnArray
}
self = try Self(token)
}

fileprivate init(_ token: RESPToken) throws(ValkeyClusterParseError.Reason) {
fileprivate init(_ token: RESPToken) throws(RESPDecodeError) {
guard case .array(let array) = token.value else {
throw .nodesTokenIsNotAnArray
throw RESPDecodeError.tokenMismatch(expected: [.array], token: token)
}

self = try array.map { token throws(ValkeyClusterParseError.Reason) in
self = try array.map { token throws(RESPDecodeError) in
try ValkeyClusterDescription.Node(token)
}
}
}

extension ValkeyClusterDescription.Shard {
fileprivate init(_ token: RESPToken) throws(ValkeyClusterParseError.Reason) {
fileprivate init(_ token: RESPToken) throws(RESPDecodeError) {
switch token.value {
case .array(let array):
self = try Self.makeFromTokenSequence(MapStyleArray(underlying: array))
Expand All @@ -688,13 +648,13 @@ extension ValkeyClusterDescription.Shard {
self = try Self.makeFromTokenSequence(mapped)

default:
throw ValkeyClusterParseError.Reason.shardTokenIsNotAnArrayOrMap
throw RESPDecodeError.tokenMismatch(expected: [.array, .map], token: token)
}
}

fileprivate static func makeFromTokenSequence<TokenSequence: Sequence>(
_ sequence: TokenSequence
) throws(ValkeyClusterParseError.Reason) -> Self where TokenSequence.Element == (String, RESPToken) {
) throws(RESPDecodeError) -> Self where TokenSequence.Element == (String, RESPToken) {
var slotRanges = HashSlots()
var nodes: [ValkeyClusterDescription.Node] = []

Expand All @@ -711,18 +671,24 @@ extension ValkeyClusterDescription.Shard {
}
}

if nodes.isEmpty { throw .shardIsMissingNode }
if slotRanges.isEmpty { throw .shardIsMissingHashSlots }

return .init(slots: slotRanges, nodes: nodes)
}
}

extension ValkeyClusterDescription.Node {
fileprivate init(_ token: RESPToken) throws(ValkeyClusterParseError.Reason) {
fileprivate init(_ token: RESPToken) throws(RESPDecodeError) {
switch token.value {
case .array(let array):
self = try Self.makeFromTokenSequence(MapStyleArray(underlying: array))
do {
self = try Self.makeFromTokenSequence(MapStyleArray(underlying: array))
} catch {
switch error {
case .decodeError(let error):
throw error
case .missingRequiredValue:
throw RESPDecodeError(.missingToken, token: token, message: "Missing required token for Node")
}
}

case .map(let map):
let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in
Expand All @@ -732,16 +698,30 @@ extension ValkeyClusterDescription.Node {
return nil
}
}
self = try Self.makeFromTokenSequence(mapped)
do {
self = try Self.makeFromTokenSequence(mapped)
} catch {
switch error {
case .decodeError(let error):
throw error
case .missingRequiredValue:
throw RESPDecodeError(.missingToken, token: token, message: "Missing required token for Node")
}
}

default:
throw .nodeTokenIsNotAnArrayOrMap
throw RESPDecodeError.tokenMismatch(expected: [.array, .map], token: token)
}
}

fileprivate enum TokenSequenceError: Error {
case decodeError(RESPDecodeError)
case missingRequiredValue
}

fileprivate static func makeFromTokenSequence<TokenSequence: Sequence>(
_ sequence: TokenSequence
) throws(ValkeyClusterParseError.Reason) -> Self where TokenSequence.Element == (String, RESPToken) {
) throws(TokenSequenceError) -> Self where TokenSequence.Element == (String, RESPToken) {
var id: String?
var port: Int64?
var tlsPort: Int64?
Expand Down Expand Up @@ -769,7 +749,7 @@ extension ValkeyClusterDescription.Node {
endpoint = try? String(fromRESP: nodeVal)
case "role":
guard let roleString = try? String(fromRESP: nodeVal), let roleValue = ValkeyClusterDescription.Node.Role(rawValue: roleString) else {
throw .invalidNodeRole
throw .decodeError(RESPDecodeError(.unexpectedToken, token: nodeVal, message: "Invalid Role String"))
}
role = roleValue

Expand All @@ -779,7 +759,7 @@ extension ValkeyClusterDescription.Node {
guard let healthString = try? String(fromRESP: nodeVal),
let healthValue = ValkeyClusterDescription.Node.Health(rawValue: healthString)
else {
throw .invalidNodeHealth
throw .decodeError(RESPDecodeError(.unexpectedToken, token: nodeVal, message: "Invalid Node Health String"))
}
health = healthValue

Expand All @@ -791,12 +771,12 @@ extension ValkeyClusterDescription.Node {
guard let id = id, let ip = ip, let endpoint = endpoint, let role = role,
let replicationOffset = replicationOffset, let health = health
else {
throw .missingRequiredValueForNode
throw .missingRequiredValue
}

// we need at least port or tlsport
if port == nil && tlsPort == nil {
throw .missingRequiredValueForNode
throw .missingRequiredValue
}

return ValkeyClusterDescription.Node(
Expand Down
11 changes: 10 additions & 1 deletion Sources/Valkey/RESP/RESPDecodeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//
/// Error returned when decoding a RESPToken.
/// Error thrown when decoding RESPTokens
public struct RESPDecodeError: Error {
public struct RESPDecodeError: Error, Equatable {
/// Error code for decode error
public struct ErrorCode: Sendable, Equatable, CustomStringConvertible {
fileprivate enum Code: Sendable, Equatable {
Expand Down Expand Up @@ -81,6 +81,15 @@ public struct RESPDecodeError: Error {
message: message
)
}
/// Does not match the expected array size
public static func invalidArraySize(_ token: RESPToken, expectedSize: Int? = nil, minExpectedSize: Int? = nil) -> Self {
switch token.value {
case .array(let array):
return invalidArraySize(array, expectedSize: expectedSize, minExpectedSize: minExpectedSize)
default:
return .tokenMismatch(expected: [.array], token: token)
}
}
/// Token associated with key is missing
public static func missingToken(key: String, token: RESPToken) -> Self {
.init(.missingToken, token: token, message: "Expected map to contain token with key \"\(key)\"")
Expand Down
10 changes: 5 additions & 5 deletions Tests/ValkeyTests/Cluster/ValkeyClusterDescriptionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ struct ValkeyClusterDescriptionTests {
])
let token = RESPToken(val)

#expect(throws: ValkeyClusterParseError(reason: .invalidNodeHealth, token: token)) {
#expect(throws: RESPDecodeError(.unexpectedToken, token: .init(.bulkString("invalid-health-state")), message: "Invalid Node Health String")) {
_ = try ValkeyClusterDescription(fromRESP: token)
}
}
Expand All @@ -112,7 +112,7 @@ struct ValkeyClusterDescriptionTests {
func testSlotsAreNotAnArray() throws {
// Non-array token for cluster description
let singleValueToken = RESPToken(RESP3Value.bulkString("not-an-array"))
#expect(throws: ValkeyClusterParseError.self) {
#expect(throws: RESPDecodeError.tokenMismatch(expected: [.array], token: .init(.bulkString("not-an-array")))) {
_ = try ValkeyClusterDescription(fromRESP: singleValueToken)
}

Expand Down Expand Up @@ -145,7 +145,7 @@ struct ValkeyClusterDescriptionTests {
])
)

#expect(throws: ValkeyClusterParseError(reason: .slotsTokenIsNotAnArray, token: invalidSlotsToken)) {
#expect(throws: RESPDecodeError.tokenMismatch(expected: [.array], token: .init(.bulkString("not-an-array")))) {
try ValkeyClusterDescription(fromRESP: invalidSlotsToken)
}

Expand All @@ -161,7 +161,7 @@ struct ValkeyClusterDescriptionTests {
])
)

#expect(throws: ValkeyClusterParseError(reason: .nodesTokenIsNotAnArray, token: invalidNodesToken)) {
#expect(throws: RESPDecodeError.tokenMismatch(expected: [.array], token: .init(.bulkString("not-an-array")))) {
_ = try ValkeyClusterDescription(fromRESP: invalidNodesToken)
}
}
Expand Down Expand Up @@ -197,7 +197,7 @@ struct ValkeyClusterDescriptionTests {
let token = RESPToken(valWithMultipleErrors)

// The error we expect to see first is the invalid role
#expect(throws: ValkeyClusterParseError(reason: .invalidNodeRole, token: token)) {
#expect(throws: RESPDecodeError(.unexpectedToken, token: .init(.bulkString("invalid-role")), message: "Invalid Role String")) {
_ = try ValkeyClusterDescription(fromRESP: token)
}
}
Expand Down
Loading