Skip to content

Commit ddfc671

Browse files
committed
Add isolated parameter to subscribe functions
1 parent e440522 commit ddfc671

File tree

4 files changed

+29
-13
lines changed

4 files changed

+29
-13
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
127127
}
128128
}
129129

130-
/// Add subscription, and call SUBSCRIBE command
130+
/// Add subscription, and call SUBSCRIBE command if required
131131
func subscribe(
132132
command: some RESPCommand,
133133
continuation: ValkeySubscriptionSequence.Continuation,
@@ -150,8 +150,8 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
150150
}
151151
.map { _ in subscriptionID }
152152

153-
case .doNothing(let subscription):
154-
return self.eventLoop.makeSucceededFuture(subscription.id)
153+
case .doNothing(let subscriptionID):
154+
return self.eventLoop.makeSucceededFuture(subscriptionID)
155155
}
156156
}
157157

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ extension ValkeyConnection {
2727
/// - process: Closure that is called with subscription async sequence
2828
/// - Returns: Return value of closure
2929
@inlinable
30-
public func subscribe<Value>(
30+
public func subscribe<Value: Sendable>(
3131
to channels: String...,
32+
isolation: isolated (any Actor)? = #isolation,
3233
process: (ValkeySubscriptionSequence) async throws -> Value
3334
) async throws -> Value {
3435
try await self.subscribe(to: channels, process: process)
@@ -46,7 +47,11 @@ extension ValkeyConnection {
4647
/// - channels: list of channels to subscribe to
4748
/// - process: Closure that is called with subscription async sequence
4849
/// - Returns: Return value of closure
49-
public func subscribe<Value>(to channels: [String], process: (ValkeySubscriptionSequence) async throws -> Value) async throws -> Value {
50+
public func subscribe<Value: Sendable>(
51+
to channels: [String],
52+
isolation: isolated (any Actor)? = #isolation,
53+
process: (ValkeySubscriptionSequence) async throws -> Value
54+
) async throws -> Value {
5055
let command = SUBSCRIBE(channel: channels)
5156
let (id, stream) = try await subscribe(command: command, filters: channels.map { .channel($0) })
5257
let value: Value
@@ -72,8 +77,9 @@ extension ValkeyConnection {
7277
/// - process: Closure that is called with subscription async sequence
7378
/// - Returns: Return value of closure
7479
@inlinable
75-
public func psubscribe<Value>(
80+
public func psubscribe<Value: Sendable>(
7681
to patterns: String...,
82+
isolation: isolated (any Actor)? = #isolation,
7783
process: (ValkeySubscriptionSequence) async throws -> Value
7884
) async throws -> Value {
7985
try await self.psubscribe(to: patterns, process: process)
@@ -91,7 +97,11 @@ extension ValkeyConnection {
9197
/// - process: Closure that is called with subscription async sequence
9298
/// - Returns: Return value of closure
9399
@inlinable
94-
public func psubscribe<Value>(to patterns: [String], process: (ValkeySubscriptionSequence) async throws -> Value) async throws -> Value {
100+
public func psubscribe<Value: Sendable>(
101+
to patterns: [String],
102+
isolation: isolated (any Actor)? = #isolation,
103+
process: (ValkeySubscriptionSequence) async throws -> Value
104+
) async throws -> Value {
95105
let command = PSUBSCRIBE(pattern: patterns)
96106
let (id, stream) = try await subscribe(command: command, filters: patterns.map { .pattern($0) })
97107
let value: Value
@@ -117,8 +127,9 @@ extension ValkeyConnection {
117127
/// - process: Closure that is called with subscription async sequence
118128
/// - Returns: Return value of closure
119129
@inlinable
120-
public func ssubscribe<Value>(
130+
public func ssubscribe<Value: Sendable>(
121131
to shardchannel: String...,
132+
isolation: isolated (any Actor)? = #isolation,
122133
process: (ValkeySubscriptionSequence) async throws -> Value
123134
) async throws -> Value {
124135
try await self.ssubscribe(to: shardchannel, process: process)
@@ -136,7 +147,11 @@ extension ValkeyConnection {
136147
/// - process: Closure that is called with subscription async sequence
137148
/// - Returns: Return value of closure
138149
@inlinable
139-
public func ssubscribe<Value>(to shardchannel: [String], process: (ValkeySubscriptionSequence) async throws -> Value) async throws -> Value {
150+
public func ssubscribe<Value: Sendable>(
151+
to shardchannel: [String],
152+
isolation: isolated (any Actor)? = #isolation,
153+
process: (ValkeySubscriptionSequence) async throws -> Value
154+
) async throws -> Value {
140155
let command = SSUBSCRIBE(shardchannel: shardchannel)
141156
let (id, stream) = try await subscribe(command: command, filters: shardchannel.map { .shardChannel($0) })
142157
let value: Value

Sources/Valkey/Subscriptions/ValkeySubscriptionCommands.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import DequeModule
1717
/// A subscription command does not return any values instead it pushes messages for each
1818
/// channel/pattern that has been subscribed/unsubscribed to. This struct catches each
1919
/// push notification and at the point we have received all the pushes required it returns
20-
/// the associated value
20+
/// the associated command. This can then be used to indicate the subscribe/unsubscribe has
21+
/// been successful
2122
struct ValkeySubscriptionCommandStack {
2223
struct SubscribeCommand {
2324
var filters: [ValkeySubscriptionFilter]

Sources/Valkey/Subscriptions/ValkeySubscriptions.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ struct ValkeySubscriptions {
9494
}
9595

9696
enum SubscribeAction {
97-
case doNothing(ValkeySubscription)
97+
case doNothing(Int)
9898
case subscribe(ValkeySubscription, [ValkeySubscriptionFilter])
9999
}
100100

@@ -109,12 +109,12 @@ struct ValkeySubscriptions {
109109
let id = Self.getSubscriptionID()
110110
let subscription = ValkeySubscription(id: id, continuation: continuation, filters: filters, logger: self.logger)
111111
subscriptionIDMap[id] = subscription
112-
var action = SubscribeAction.doNothing(subscription)
112+
var action = SubscribeAction.doNothing(id)
113113
for filter in filters {
114114
switch subscriptionMap[filter, default: .init()].add(subscription: subscription) {
115115
case .subscribe:
116116
switch action {
117-
case .doNothing(let subscription):
117+
case .doNothing:
118118
action = .subscribe(subscription, [filter])
119119
case .subscribe(let subscription, var filters):
120120
filters.append(filter)

0 commit comments

Comments
 (0)