From 8f52163855bea4df320d3ac4da77757d0a46038b Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Mon, 16 Jun 2025 12:53:11 +0300 Subject: [PATCH 1/2] Force slots refresh on MOVED error when using ssubscribe --- lib/DataHandler.ts | 11 ++++++++++- lib/cluster/ClusterSubscriber.ts | 4 ++++ lib/cluster/ClusterSubscriberGroup.ts | 6 +++++- lib/cluster/index.ts | 5 ++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index 68a8c967..d0e00a8f 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -74,6 +74,11 @@ export default class DataHandler { args: item.command.args, }; + if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) { + this.redis.emit("moved"); + return; + } + this.redis.handleReconnection(err, item); } @@ -119,7 +124,11 @@ export default class DataHandler { case "message": if (this.redis.listeners("message").length > 0) { // Check if there're listeners to avoid unnecessary `toString()`. - this.redis.emit("message", reply[1].toString(), reply[2] ? reply[2].toString() : ''); + this.redis.emit( + "message", + reply[1].toString(), + reply[2] ? reply[2].toString() : "" + ); } this.redis.emit("messageBuffer", reply[1], reply[2]); break; diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index a46e3344..f1865b4c 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -95,6 +95,10 @@ export default class ClusterSubscriber { // Ignore the errors since they're handled in the connection pool. this.subscriber.on("error", noop); + this.subscriber.on("moved", () => { + this.emitter.emit("forceRefresh"); + }); + // Re-subscribe previous channels const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] }; if (lastActiveSubscriber) { diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index 6e7ef4d3..c0ff76da 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -29,7 +29,7 @@ export default class ClusterSubscriberGroup { * * @param cluster */ - constructor(private cluster: Cluster) { + constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) { cluster.on("+node", (redis) => { this._addSubscriber(redis); }); @@ -41,6 +41,10 @@ export default class ClusterSubscriberGroup { cluster.on("refresh", () => { this._refreshSlots(cluster); }); + + cluster.on("forceRefresh", () => { + refreshSlotsCacheCallback(); + }); } /** diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 627f2f82..5957f049 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -110,7 +110,10 @@ class Cluster extends EventEmitter { this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup(this); + this.shardedSubscribers = new ClusterSubscriberGroup( + this, + this.refreshSlotsCache.bind(this) + ); // validate options if ( From 4462de6bb458d551dead381724def3ed6e6af5ee Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 17 Jun 2025 11:08:01 +0300 Subject: [PATCH 2/2] enabled existing cluster tests --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 93cb19a0..94d189f2 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "built/" ], "scripts": { - "test": "TS_NODE_TRANSPILE_ONLY=true TS_NODE_LOG_ERROR=true NODE_ENV=test mocha \"test/helpers/**/*.ts\" \"test/unit/**/*.ts\" \"test/functional/**/*.ts\"", + "test": "TS_NODE_TRANSPILE_ONLY=true TS_NODE_LOG_ERROR=true NODE_ENV=test mocha \"test/helpers/**/*.ts\" \"test/unit/**/*.ts\" \"test/functional/**/*.ts\" \"test/cluster/**/*.ts\"", "test-single": "TS_NODE_TRANSPILE_ONLY=true TS_NODE_LOG_ERROR=true NODE_ENV=test mocha \"test/helpers/*.ts\" $1", "lint": "eslint --ext .js,.ts .", "format": "prettier --write \"{,!(node_modules)/**/}*.{js,ts}\"",