Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/DataHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ export default class DataHandler {
args: item.command.args,
};

if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) {
this.redis.emit("moved");
return;
}

this.redis.handleReconnection(err, item);
}

Expand Down
4 changes: 4 additions & 0 deletions lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ export default class ClusterSubscriber {
// Ignore the errors since they're handled in the connection pool.
this.subscriber.on("error", noop);

this.subscriber.on("moved", () => {
this.emitter.emit("forceRefresh");
});

// The node we lost connection to may not come back up in a
// reasonable amount of time (e.g. a slave that's taken down
// for maintainence), we could potentially miss many published
Expand Down
6 changes: 5 additions & 1 deletion lib/cluster/ClusterSubscriberGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export default class ClusterSubscriberGroup {
*
* @param cluster
*/
constructor(private cluster: Cluster) {
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {

cluster.on("+node", (redis) => {
this._addSubscriber(redis);
Expand All @@ -44,6 +44,10 @@ export default class ClusterSubscriberGroup {
cluster.on("refresh", () => {
this._refreshSlots(cluster);
});

cluster.on("forceRefresh", () => {
refreshSlotsCacheCallback();
});
}


Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class Cluster extends Commander {
this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);

if (this.options.shardedSubscribers == true)
this.shardedSubscribers = new ClusterSubscriberGroup(this);
this.shardedSubscribers = new ClusterSubscriberGroup(this, this.refreshSlotsCache.bind(this));

if (
this.options.redisOptions &&
Expand Down
Loading