Skip to content

Commit 8f52163

Browse files
Force slots refresh on MOVED error when using ssubscribe
1 parent e451c62 commit 8f52163

File tree

4 files changed

+23
-3
lines changed

4 files changed

+23
-3
lines changed

lib/DataHandler.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ export default class DataHandler {
7474
args: item.command.args,
7575
};
7676

77+
if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) {
78+
this.redis.emit("moved");
79+
return;
80+
}
81+
7782
this.redis.handleReconnection(err, item);
7883
}
7984

@@ -119,7 +124,11 @@ export default class DataHandler {
119124
case "message":
120125
if (this.redis.listeners("message").length > 0) {
121126
// Check if there're listeners to avoid unnecessary `toString()`.
122-
this.redis.emit("message", reply[1].toString(), reply[2] ? reply[2].toString() : '');
127+
this.redis.emit(
128+
"message",
129+
reply[1].toString(),
130+
reply[2] ? reply[2].toString() : ""
131+
);
123132
}
124133
this.redis.emit("messageBuffer", reply[1], reply[2]);
125134
break;

lib/cluster/ClusterSubscriber.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ export default class ClusterSubscriber {
9595
// Ignore the errors since they're handled in the connection pool.
9696
this.subscriber.on("error", noop);
9797

98+
this.subscriber.on("moved", () => {
99+
this.emitter.emit("forceRefresh");
100+
});
101+
98102
// Re-subscribe previous channels
99103
const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
100104
if (lastActiveSubscriber) {

lib/cluster/ClusterSubscriberGroup.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export default class ClusterSubscriberGroup {
2929
*
3030
* @param cluster
3131
*/
32-
constructor(private cluster: Cluster) {
32+
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
3333
cluster.on("+node", (redis) => {
3434
this._addSubscriber(redis);
3535
});
@@ -41,6 +41,10 @@ export default class ClusterSubscriberGroup {
4141
cluster.on("refresh", () => {
4242
this._refreshSlots(cluster);
4343
});
44+
45+
cluster.on("forceRefresh", () => {
46+
refreshSlotsCacheCallback();
47+
});
4448
}
4549

4650
/**

lib/cluster/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ class Cluster extends EventEmitter {
110110
this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);
111111

112112
if (this.options.shardedSubscribers == true)
113-
this.shardedSubscribers = new ClusterSubscriberGroup(this);
113+
this.shardedSubscribers = new ClusterSubscriberGroup(
114+
this,
115+
this.refreshSlotsCache.bind(this)
116+
);
114117

115118
// validate options
116119
if (

0 commit comments

Comments
 (0)