Skip to content

Commit 98f90fa

Browse files
committed
fix cluster client
1 parent 3089448 commit 98f90fa

File tree

1 file changed

+41
-135
lines changed

1 file changed

+41
-135
lines changed

js/node-redis/lib/redisManager.js

Lines changed: 41 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/lib/redisManager.js
21
const { createClient, createCluster } = require('redis');
32
const { publisherRoutine } = require('./publisher');
43
const { subscriberRoutine } = require('./subscriber');
@@ -31,89 +30,46 @@ async function runBenchmark(argv) {
3130
// Create histogram for RTT recording
3231
const rttHistogram = argv['measure-rtt-latency'] ? createRttHistogram() : null;
3332

34-
// Configure Redis client options
35-
const redisOptions = {
36-
socket: {
37-
host: argv.host,
38-
port: argv.port,
39-
connectTimeout: argv['redis-timeout'],
40-
reconnectStrategy: false // disable auto-reconnect
41-
},
42-
password: argv.a || undefined,
43-
username: argv.user || undefined
44-
};
45-
46-
let clientsMap = new Map();
33+
let client;
4734
let nodeAddresses = [];
48-
let isClusterMode = false;
4935

5036
if (argv['oss-cluster-api-distribute-subscribers']) {
51-
isClusterMode = true;
5237
console.log('Using Redis Cluster mode');
53-
54-
try {
55-
// Create a test cluster client to get nodes
56-
const testCluster = createCluster({
57-
rootNodes: [
58-
{
59-
socket: {
60-
host: argv.host,
61-
port: argv.port
62-
},
63-
password: argv.a || undefined,
64-
username: argv.user || undefined
65-
}
66-
]
67-
});
68-
69-
await testCluster.connect();
70-
71-
// Get cluster slots information to determine node distribution
72-
const slots = await testCluster.clusterSlots();
73-
await testCluster.quit();
74-
75-
if (!slots || Object.keys(slots).length === 0) {
76-
throw new Error('Cluster has no slot assignments. Check node health.');
77-
}
78-
79-
// Create a map of slots to nodes
80-
for (const slotRange of Object.values(slots)) {
81-
const [startSlot, endSlot] = slotRange.slots;
82-
const { host, port } = slotRange.master;
83-
84-
// Create a standalone client for each node
85-
const nodeId = `${host}:${port}`;
86-
if (!clientsMap.has(nodeId)) {
87-
const nodeOptions = {
88-
socket: {
89-
host,
90-
port,
91-
connectTimeout: argv['redis-timeout'],
92-
reconnectStrategy: false
93-
},
94-
password: argv.a || undefined,
95-
username: argv.user || undefined
96-
};
97-
98-
nodeAddresses.push(nodeId);
99-
clientsMap.set(nodeId, nodeOptions);
100-
}
101-
102-
// Map each slot to this node
103-
for (let slot = startSlot; slot <= endSlot; slot++) {
104-
clientsMap.set(`slot:${slot}`, nodeId);
38+
client = createCluster({
39+
rootNodes: [{
40+
socket: {
41+
host: argv.host,
42+
port: argv.port
43+
},
44+
password: argv.a || undefined,
45+
username: argv.user || undefined
46+
}],
47+
defaults: {
48+
socket: {
49+
connectTimeout: argv['redis-timeout'],
50+
reconnectStrategy: false // disable auto-reconnect
10551
}
10652
}
107-
108-
console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`);
109-
} catch (err) {
110-
console.error('Failed to initialize cluster mode:', err);
111-
process.exit(1);
112-
}
53+
});
54+
55+
await client.connect();
56+
nodeAddresses = [`${argv.host}:${argv.port}`];
57+
console.log('Cluster mode - connecting through cluster client');
11358
} else {
11459
// Single node mode
115-
nodeAddresses.push(`${argv.host}:${argv.port}`);
116-
clientsMap.set(nodeAddresses[0], redisOptions);
60+
client = createClient({
61+
socket: {
62+
host: argv.host,
63+
port: argv.port,
64+
connectTimeout: argv['redis-timeout'],
65+
reconnectStrategy: false // disable auto-reconnect
66+
},
67+
password: argv.a || undefined,
68+
username: argv.user || undefined
69+
});
70+
71+
await client.connect();
72+
nodeAddresses = [`${argv.host}:${argv.port}`];
11773
console.log('Standalone mode - using single Redis instance');
11874
}
11975

@@ -130,61 +86,6 @@ async function runBenchmark(argv) {
13086

13187
const promises = [];
13288

133-
// Helper function to get a slot for a key (simplified version)
134-
function getSlot(key) {
135-
const crc16tab = [
136-
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
137-
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
138-
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
139-
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
140-
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
141-
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
142-
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
143-
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
144-
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
145-
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
146-
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
147-
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
148-
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
149-
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
150-
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
151-
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
152-
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
153-
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
154-
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
155-
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
156-
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
157-
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
158-
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
159-
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
160-
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
161-
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
162-
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
163-
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
164-
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
165-
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
166-
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
167-
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0
168-
];
169-
170-
let crc = 0;
171-
for (let i = 0; i < key.length; i++) {
172-
crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ key.charCodeAt(i)) & 0xff]) & 0xffff;
173-
}
174-
return crc % 16384; // 16384 slots in Redis Cluster
175-
}
176-
177-
// Helper to get Redis options for a specific channel
178-
function getRedisOptionsForChannel(channel) {
179-
if (!isClusterMode) {
180-
return redisOptions;
181-
}
182-
183-
const slot = getSlot(channel);
184-
const nodeId = clientsMap.get(`slot:${slot}`);
185-
return clientsMap.get(nodeId);
186-
}
187-
18889
function randomInt(min, max) {
18990
if (min === max) return min;
19091
return Math.floor(Math.random() * (max - min + 1)) + min;
@@ -220,7 +121,6 @@ async function runBenchmark(argv) {
220121
}
221122

222123
const publisherName = `publisher#${clientId}`;
223-
const channelOptions = getRedisOptionsForChannel(channels[0]);
224124

225125
if (argv.verbose) {
226126
console.log(`Publisher ${clientId} targeting channels ${channels}`);
@@ -234,7 +134,7 @@ async function runBenchmark(argv) {
234134
argv['measure-rtt-latency'],
235135
argv.verbose,
236136
argv['data-size'],
237-
channelOptions,
137+
client,
238138
isRunningRef,
239139
totalMessagesRef
240140
)
@@ -259,7 +159,6 @@ async function runBenchmark(argv) {
259159
}
260160

261161
const subscriberName = `subscriber#${clientId}`;
262-
const channelOptions = getRedisOptionsForChannel(channels[0]);
263162

264163
const reconnectInterval = randomInt(
265164
argv['min-reconnect-interval'],
@@ -282,7 +181,7 @@ async function runBenchmark(argv) {
282181
argv['print-messages'],
283182
reconnectInterval,
284183
argv['measure-rtt-latency'],
285-
channelOptions,
184+
client,
286185
isRunningRef,
287186
rttAccumulator,
288187
rttHistogram,
@@ -339,6 +238,13 @@ async function runBenchmark(argv) {
339238
console.error('Benchmark error:', err);
340239
}
341240

241+
// Clean up and disconnect the main client
242+
try {
243+
await client.quit();
244+
} catch (err) {
245+
console.error('Error disconnecting main client:', err);
246+
}
247+
342248
// Clean exit
343249
process.exit(0);
344250
}

0 commit comments

Comments
 (0)