Skip to content

Commit 3ef36dc

Browse files
authored
feat: added the option for disabling slots refresh, fixed concurrent calls to refreshSlotsCache (#1972)
* Added the option for disabling slots refresh * Fixed an issue with calling refreshSlotsCache constantly * minor fix * refactored refreshSlotsCache for better consistency * Updated readme
1 parent b47d6f1 commit 3ef36dc

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ cluster.get("foo", (err, res) => {
10071007
state stabilized after a failover, so adding a delay before resending can prevent a ping pong effect.
10081008
- `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.
10091009
- `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`).
1010-
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`).
1010+
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`), setting it to a negative value will disable the slots refresh.
10111011
10121012
### Read-write splitting
10131013

lib/cluster/index.ts

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Cluster extends EventEmitter {
7070
private reconnectTimeout: NodeJS.Timer;
7171
private status: ClusterStatus;
7272
private isRefreshing = false;
73+
private _refreshSlotsCacheCallbacks = [];
7374
public isCluster = true;
7475
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
7576
private _groupsIds: { [key: string]: number } = {};
@@ -168,7 +169,7 @@ class Cluster extends EventEmitter {
168169
}
169170

170171
resetNodesRefreshInterval() {
171-
if (this.slotsTimer) {
172+
if (this.slotsTimer || this.options.slotsRefreshInterval < 0) {
172173
return;
173174
}
174175
const nextRound = () => {
@@ -284,14 +285,13 @@ class Cluster extends EventEmitter {
284285
this.once("close", closeListener);
285286
this.once("close", this.handleCloseEvent.bind(this));
286287

287-
this.refreshSlotsCache(
288-
function (err) {
289-
if (err && err.message === "Failed to refresh slots cache.") {
290-
Redis.prototype.silentEmit.call(this, "error", err);
291-
this.connectionPool.reset([]);
292-
}
293-
}.bind(this)
294-
);
288+
this.refreshSlotsCache((err) => {
289+
if (err && err.message === "Failed to refresh slots cache.") {
290+
Redis.prototype.silentEmit.call(this, "error", err);
291+
this.connectionPool.reset([]);
292+
}
293+
});
294+
295295
this.subscriber.start();
296296

297297
if (this.options.shardedSubscribers) {
@@ -522,27 +522,29 @@ class Cluster extends EventEmitter {
522522
* @memberof Cluster
523523
*/
524524
private refreshSlotsCache(callback?: CallbackFunction<void>): void {
525+
if (typeof callback === "function") {
526+
this._refreshSlotsCacheCallbacks.push(callback);
527+
}
528+
525529
if (this.isRefreshing) {
526-
if (typeof callback === "function") {
527-
process.nextTick(callback);
528-
}
529530
return;
530531
}
531532
this.isRefreshing = true;
532533

533-
const _this = this;
534-
const wrapper = function (error?: Error) {
535-
_this.isRefreshing = false;
536-
if (typeof callback === "function") {
534+
const wrapper = (error?: Error) => {
535+
this.isRefreshing = false;
536+
for (const callback of this._refreshSlotsCacheCallbacks) {
537537
callback(error);
538538
}
539+
540+
this._refreshSlotsCacheCallbacks = [];
539541
};
540542

541543
const nodes = shuffle(this.connectionPool.getNodes());
542544

543545
let lastNodeError = null;
544546

545-
function tryNode(index) {
547+
const tryNode = (index: number) => {
546548
if (index === nodes.length) {
547549
const error = new ClusterAllFailedError(
548550
"Failed to refresh slots cache.",
@@ -553,24 +555,24 @@ class Cluster extends EventEmitter {
553555
const node = nodes[index];
554556
const key = `${node.options.host}:${node.options.port}`;
555557
debug("getting slot cache from %s", key);
556-
_this.getInfoFromNode(node, function (err) {
557-
switch (_this.status) {
558+
this.getInfoFromNode(node, (err) => {
559+
switch (this.status) {
558560
case "close":
559561
case "end":
560562
return wrapper(new Error("Cluster is disconnected."));
561563
case "disconnecting":
562564
return wrapper(new Error("Cluster is disconnecting."));
563565
}
564566
if (err) {
565-
_this.emit("node error", err, key);
567+
this.emit("node error", err, key);
566568
lastNodeError = err;
567569
tryNode(index + 1);
568570
} else {
569-
_this.emit("refresh");
571+
this.emit("refresh");
570572
wrapper();
571573
}
572574
});
573-
}
575+
};
574576

575577
tryNode(0);
576578
}

0 commit comments

Comments
 (0)