Skip to content

Commit ff2c1dd

Browse files
cluster slots fix
1 parent b47b88b commit ff2c1dd

File tree

3 files changed

+35
-37
lines changed

3 files changed

+35
-37
lines changed

js/ioredis/lib/publisher.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ async function publisherRoutine(
88
client,
99
isRunningRef,
1010
totalMessagesRef,
11-
rateLimiter
11+
rateLimiter,
12+
skipDuplicate = false
1213
) {
1314
if (verbose) {
1415
console.log(
@@ -32,7 +33,8 @@ async function publisherRoutine(
3233
paddingPayload = 'A'.repeat(dataSize);
3334
}
3435

35-
const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher
36+
// For cluster node clients, don't duplicate to preserve cluster routing
37+
const duplicatedClient = skipDuplicate ? client : client.duplicate();
3638

3739
try {
3840
if (measureRTT) {

js/ioredis/lib/redisManager.js

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -85,53 +85,41 @@ async function runBenchmark(argv) {
8585
// Get the cluster slots mapping to determine which node serves which slots
8686
const slotsMapping = await cluster.cluster('SLOTS');
8787

88-
// Build a map from "host:port" to the actual node client
89-
const nodeMap = new Map();
90-
for (const node of nodes) {
91-
const key = `${node.options.host}:${node.options.port}`;
92-
nodeMap.set(key, node);
93-
clients.push(node);
94-
}
95-
9688
// slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...]
97-
// For each slot range, map slots to the corresponding node client
89+
// For each slot range, create a direct standalone connection to the master node
90+
const nodeClientsMap = new Map(); // Map from "host:port" to standalone client
91+
9892
for (const slotRange of slotsMapping) {
9993
const startSlot = slotRange[0];
10094
const endSlot = slotRange[1];
10195
const masterInfo = slotRange[2]; // [host, port, nodeId]
102-
const host = masterInfo[0];
96+
const host = masterInfo[0]; // Use internal IP from CLUSTER SLOTS
10397
const port = masterInfo[1];
10498

105-
// Find the node client for this host:port
99+
// Create or reuse a standalone client for this node using internal IP
106100
const nodeKey = `${host}:${port}`;
107-
let nodeClient = nodeMap.get(nodeKey);
108-
109-
if (!nodeClient) {
110-
// If not found by exact match, try to find by port only
111-
// (useful when cluster returns internal IPs but we connect via external IP)
112-
for (const [key, client] of nodeMap.entries()) {
113-
if (key.endsWith(`:${port}`)) {
114-
nodeClient = client;
115-
console.log(`Matched node ${nodeKey} to ${key} by port`);
116-
break;
117-
}
118-
}
119-
}
120-
101+
let nodeClient = nodeClientsMap.get(nodeKey);
121102
if (!nodeClient) {
122-
console.warn(`Warning: No node client found for ${nodeKey}, using first available node`);
123-
nodeClient = nodes[0];
103+
console.log(`Creating standalone client for node ${host}:${port} (slots ${startSlot}-${endSlot})`);
104+
nodeClient = new Redis({
105+
...redisOptions,
106+
host,
107+
port
108+
});
109+
nodeClientsMap.set(nodeKey, nodeClient);
110+
clients.push(nodeClient);
124111
}
125112

126-
// Map all slots in this range to this node's client
113+
// Map all slots in this range to this node's standalone client
127114
for (let slot = startSlot; slot <= endSlot; slot++) {
128115
slotClientMap.set(slot, nodeClient);
129116
}
130117
}
131118

132-
nodeAddresses = nodes.map(node => `${node.options.host}:${node.options.port}`);
119+
nodeAddresses = Array.from(nodeClientsMap.keys());
133120

134-
console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes: ${nodeAddresses.join(', ')}`);
121+
console.log(`Cluster mode - created ${nodeClientsMap.size} standalone node clients`);
122+
console.log(`Cluster mode - node addresses: ${nodeAddresses.join(', ')}`);
135123
console.log(`Cluster mode - mapped ${slotClientMap.size} slots to node clients`);
136124
} else {
137125
const client = new Redis(redisOptions);
@@ -200,6 +188,8 @@ async function runBenchmark(argv) {
200188
console.log(`Publisher ${clientId} targeting channels ${channels}`);
201189
}
202190

191+
const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers'];
192+
203193
promises.push(
204194
publisherRoutine(
205195
publisherName,
@@ -210,7 +200,9 @@ async function runBenchmark(argv) {
210200
argv['data-size'],
211201
client,
212202
isRunningRef,
213-
totalMessagesRef
203+
totalMessagesRef,
204+
null, // rateLimiter
205+
skipDuplicate
214206
)
215207
);
216208

@@ -245,6 +237,8 @@ async function runBenchmark(argv) {
245237
console.log(`Reconnect interval for ${subscriberName}: ${reconnectInterval}ms`);
246238
}
247239

240+
const skipDuplicate = argv.mode.startsWith('s') && argv['oss-cluster-api-distribute-subscribers'];
241+
248242
promises.push(
249243
subscriberRoutine(
250244
subscriberName,
@@ -261,7 +255,8 @@ async function runBenchmark(argv) {
261255
totalSubscribedRef,
262256
totalConnectsRef,
263257
argv.verbose,
264-
argv.clients
258+
argv.clients,
259+
skipDuplicate
265260
)
266261
);
267262
}

js/ioredis/lib/subscriber.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ async function subscriberRoutine(
1313
totalSubscribedRef,
1414
totalConnectsRef,
1515
verbose,
16-
totalClients
16+
totalClients,
17+
skipDuplicate = false
1718
) {
1819
let pubsub = null;
1920
let reconnectTimer = null;
@@ -35,9 +36,9 @@ async function subscriberRoutine(
3536
await pubsub.quit();
3637
}
3738

38-
// Duplicate connection afresh.
39+
// For cluster node clients, don't duplicate to preserve cluster routing
3940
// For cluster clients, duplicate() creates a new cluster-aware client
40-
pubsub = client.duplicate();
41+
pubsub = skipDuplicate ? client : client.duplicate();
4142

4243
// Set up error logging.
4344
pubsub.on('error', (err) => {

0 commit comments

Comments
 (0)