Skip to content

Commit 1455236

Browse files
Force slots refresh on MOVED error when using ssubscribe
1 parent 40ae7ee commit 1455236

File tree

4 files changed

+15
-2
lines changed

4 files changed

+15
-2
lines changed

lib/DataHandler.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ export default class DataHandler {
8080
args: item.command.args,
8181
};
8282

83+
if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) {
84+
this.redis.emit("moved");
85+
return;
86+
}
87+
8388
this.redis.handleReconnection(err, item);
8489
}
8590

lib/cluster/ClusterSubscriber.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ export default class ClusterSubscriber {
160160
// Ignore the errors since they're handled in the connection pool.
161161
this.subscriber.on("error", noop);
162162

163+
this.subscriber.on("moved", () => {
164+
this.emitter.emit("forceRefresh");
165+
});
166+
163167
// The node we lost connection to may not come back up in a
164168
// reasonable amount of time (e.g. a slave that's taken down
165169
// for maintainence), we could potentially miss many published

lib/cluster/ClusterSubscriberGroup.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export default class ClusterSubscriberGroup {
3131
*
3232
* @param cluster
3333
*/
34-
constructor(private cluster: Cluster) {
34+
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
3535

3636
cluster.on("+node", (redis) => {
3737
this._addSubscriber(redis);
@@ -44,6 +44,10 @@ export default class ClusterSubscriberGroup {
4444
cluster.on("refresh", () => {
4545
this._refreshSlots(cluster);
4646
});
47+
48+
cluster.on("forceRefresh", () => {
49+
refreshSlotsCacheCallback();
50+
});
4751
}
4852

4953

lib/cluster/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class Cluster extends Commander {
126126
this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);
127127

128128
if (this.options.shardedSubscribers == true)
129-
this.shardedSubscribers = new ClusterSubscriberGroup(this);
129+
this.shardedSubscribers = new ClusterSubscriberGroup(this, this.refreshSlotsCache.bind(this));
130130

131131
if (
132132
this.options.redisOptions &&

0 commit comments

Comments
 (0)