Skip to content

Commit 369f94b

Browse files
Merge pull request #27 from redis-performance/js.version
JS version port
2 parents 8d6d1f5 + e9fbddf commit 369f94b

File tree

11 files changed

+789
-1
lines changed

11 files changed

+789
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pubsub-sub-bench
2424
.idea
2525
.vscode
2626
.project
27-
bin/
27+
#bin/
2828

2929
# Binaries for programs and plugins
3030
*.exe

js/.gitignore

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Node modules
2+
node_modules/
3+
npm-debug.log*
4+
yarn-debug.log*
5+
yarn-error.log*
6+
7+
# Output files
8+
*.log
9+
*.out
10+
*.tmp
11+
*.pid
12+
*.json
13+
14+
# OS junk
15+
.DS_Store
16+
Thumbs.db
17+
18+
# Environment & config
19+
.env
20+
.env.local
21+
.env.*
22+
23+
# Editor folders
24+
.vscode/
25+
.idea/
26+
*.swp
27+
28+
# Benchmark result output
29+
results/
30+
benchmark-*.json
31+
32+
# Transpiled stuff (if used later)
33+
dist/
34+
build/
35+

js/.prettierrc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"semi": true,
3+
"singleQuote": true,
4+
"printWidth": 100,
5+
"trailingComma": "none"
6+
}

js/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# pubsub-sub-bench (JavaScript Edition)
2+
3+
High-performance **Redis Pub/Sub benchmark tool**, written in Node.js.
4+
Supports both **standalone** and **Redis OSS Cluster** modes, with support for `PUBLISH`, `SPUBLISH`, `SUBSCRIBE`, and `SSUBSCRIBE`.
5+
6+
> Part of the [redis-performance/pubsub-sub-bench](https://github.com/redis-performance/pubsub-sub-bench) suite.
7+
8+
---
9+
10+
## 📦 Installation
11+
12+
```bash
13+
cd pubsub-sub-bench/js
14+
npm install

js/bin/pubsub-sub-bench.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/usr/bin/env node
2+
3+
const { parseArgs } = require('../lib/config');
4+
const { runBenchmark } = require('../lib/redisManager');
5+
6+
(async () => {
7+
const argv = parseArgs();
8+
9+
try {
10+
await runBenchmark(argv);
11+
} catch (err) {
12+
console.error('Error in main execution:', err);
13+
process.exit(1);
14+
}
15+
})();

js/lib/config.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const yargs = require('yargs');
2+
3+
function parseArgs() {
4+
return yargs
5+
.option('host', { description: 'Redis host', default: '127.0.0.1' })
6+
.option('port', { description: 'Redis port', default: '6379' })
7+
.option('a', { description: 'Password for Redis Auth', default: '' })
8+
.option('user', { description: 'ACL-style AUTH username', default: '' })
9+
.option('data-size', { description: 'Payload size in bytes', default: 128 })
10+
.option('mode', {
11+
description: 'Mode: subscribe | ssubscribe | publish | spublish',
12+
default: 'subscribe'
13+
})
14+
.option('subscribers-placement-per-channel', {
15+
description: 'dense | sparse',
16+
default: 'dense'
17+
})
18+
.option('channel-minimum', { description: 'Min channel ID', default: 1 })
19+
.option('channel-maximum', { description: 'Max channel ID', default: 100 })
20+
.option('subscribers-per-channel', { description: 'Subscribers per channel', default: 1 })
21+
.option('clients', { description: 'Number of connections', default: 50 })
22+
.option('min-number-channels-per-subscriber', { default: 1 })
23+
.option('max-number-channels-per-subscriber', { default: 1 })
24+
.option('min-reconnect-interval', { default: 0 })
25+
.option('max-reconnect-interval', { default: 0 })
26+
.option('messages', { default: 0 })
27+
.option('json-out-file', { default: '' })
28+
.option('client-update-tick', { default: 1 })
29+
.option('test-time', { default: 0 })
30+
.option('rand-seed', { default: 12345 })
31+
.option('subscriber-prefix', { default: 'channel-' })
32+
.option('oss-cluster-api-distribute-subscribers', { default: false })
33+
.option('slot-refresh-interval', { default: -1 })
34+
.option('print-messages', { default: false })
35+
.option('verbose', { default: false })
36+
.option('measure-rtt-latency', { default: false })
37+
.option('redis-timeout', { default: 120000 })
38+
.option('pool-size', { default: 0 })
39+
.help().argv;
40+
}
41+
42+
module.exports = { parseArgs };

js/lib/metrics.js

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
const fs = require('fs');
2+
const hdr = require('hdr-histogram-js');
3+
4+
function formatRow(row) {
5+
const widths = [6, 15, 14, 14, 22, 14];
6+
return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join('');
7+
}
8+
9+
function updateCLI(
10+
updateInterval,
11+
messageLimit,
12+
testTime,
13+
measureRTT,
14+
mode,
15+
isRunningRef,
16+
totalMessagesRef,
17+
totalConnectsRef,
18+
totalSubscribedRef,
19+
totalPublishersRef,
20+
messageRateTs,
21+
rttValues,
22+
rttArchive
23+
) {
24+
return new Promise((resolve) => {
25+
let prevTime = Date.now();
26+
let prevMessageCount = 0;
27+
let prevConnectCount = 0;
28+
let startTime = Date.now();
29+
let resolved = false;
30+
31+
console.log('Starting benchmark...');
32+
33+
const header = ['Time', 'Total Messages', 'Message Rate', 'Connect Rate'];
34+
header.push(mode.includes('subscribe') ? 'Active Subscriptions' : 'Active Publishers');
35+
if (measureRTT) header.push('Avg RTT (ms)');
36+
console.log(formatRow(header));
37+
const perSecondStats = [];
38+
39+
const interval = setInterval(() => {
40+
const now = Date.now();
41+
const elapsed = (now - prevTime) / 1000;
42+
43+
const messageRate = (totalMessagesRef.value - prevMessageCount) / elapsed;
44+
const connectRate = (totalConnectsRef.value - prevConnectCount) / elapsed;
45+
46+
if (prevMessageCount === 0 && totalMessagesRef.value !== 0) {
47+
startTime = Date.now();
48+
}
49+
50+
if (totalMessagesRef.value !== 0) {
51+
messageRateTs.push(messageRate);
52+
}
53+
54+
prevMessageCount = totalMessagesRef.value;
55+
prevConnectCount = totalConnectsRef.value;
56+
prevTime = now;
57+
58+
const metrics = [
59+
Math.floor((now - startTime) / 1000),
60+
totalMessagesRef.value,
61+
messageRate.toFixed(2),
62+
connectRate.toFixed(2),
63+
mode.includes('subscribe') ? totalSubscribedRef.value : totalPublishersRef.value
64+
];
65+
66+
let avgRttMs = null;
67+
68+
if (measureRTT) {
69+
const tickRttValues = rttValues.splice(0);
70+
if (tickRttValues.length > 0) {
71+
const sum = tickRttValues.reduce((a, b) => a + b, 0n);
72+
const avgRtt = Number(sum) / tickRttValues.length;
73+
avgRttMs = avgRtt / 1000;
74+
metrics.push(avgRttMs.toFixed(3));
75+
} else {
76+
metrics.push('--');
77+
}
78+
}
79+
80+
perSecondStats.push({
81+
second: Math.floor((now - startTime) / 1000),
82+
messages: totalMessagesRef.value,
83+
messageRate: Number(messageRate.toFixed(2)),
84+
avgRttMs: avgRttMs !== null ? Number(avgRttMs.toFixed(3)) : null
85+
});
86+
87+
console.log(formatRow(metrics));
88+
89+
const shouldStop =
90+
(messageLimit > 0 && totalMessagesRef.value >= messageLimit) ||
91+
(testTime > 0 && now - startTime >= testTime * 1000 && totalMessagesRef.value !== 0);
92+
93+
if (shouldStop && !resolved) {
94+
resolved = true;
95+
clearInterval(interval);
96+
isRunningRef.value = false;
97+
resolve({ startTime, now, perSecondStats });
98+
}
99+
}, updateInterval * 1000);
100+
101+
process.on('SIGINT', () => {
102+
if (!resolved) {
103+
console.log('\nReceived Ctrl-C - shutting down');
104+
clearInterval(interval);
105+
isRunningRef.value = false;
106+
resolved = true;
107+
resolve({ startTime, now: Date.now(), perSecondStats, sigint: true });
108+
}
109+
});
110+
});
111+
}
112+
113+
function writeFinalResults(
114+
start,
115+
end,
116+
argv,
117+
mode,
118+
totalMessages,
119+
totalSubscribed,
120+
messageRateTs,
121+
rttValues,
122+
rttArchive,
123+
perSecondStats
124+
) {
125+
const duration = (end - start) / 1000;
126+
const messageRate = totalMessages / duration;
127+
128+
console.log('#################################################');
129+
console.log(`Mode: ${mode}`);
130+
console.log(`Total Duration: ${duration.toFixed(6)} Seconds`);
131+
console.log(`Message Rate: ${messageRate.toFixed(6)} msg/sec`);
132+
133+
const result = {
134+
StartTime: Math.floor(start / 1000),
135+
Duration: duration,
136+
Mode: mode,
137+
MessageRate: messageRate,
138+
TotalMessages: totalMessages,
139+
TotalSubscriptions: totalSubscribed,
140+
ChannelMin: argv['channel-minimum'],
141+
ChannelMax: argv['channel-maximum'],
142+
SubscribersPerChannel: argv['subscribers-per-channel'],
143+
MessagesPerChannel: argv['messages'],
144+
MessageRateTs: messageRateTs,
145+
OSSDistributedSlots: argv['oss-cluster-api-distribute-subscribers'],
146+
Addresses: [`${argv.host}:${argv.port}`],
147+
PerSecondStats: perSecondStats
148+
};
149+
150+
if (argv['measure-rtt-latency'] && !mode.includes('publish')) {
151+
const histogram = hdr.build({
152+
lowestDiscernibleValue: 1,
153+
highestTrackableValue: 10_000_000,
154+
numberOfSignificantValueDigits: 3
155+
});
156+
157+
rttArchive.forEach((rtt) => {
158+
const val = Number(rtt);
159+
if (val >= 0) histogram.recordValue(val);
160+
});
161+
162+
const avgRtt = histogram.mean / 1000;
163+
const p50 = histogram.getValueAtPercentile(50) / 1000;
164+
const p95 = histogram.getValueAtPercentile(95) / 1000;
165+
const p99 = histogram.getValueAtPercentile(99) / 1000;
166+
const p999 = histogram.getValueAtPercentile(99.9) / 1000;
167+
168+
result.RTTSummary = {
169+
AvgMs: Number(avgRtt.toFixed(3)),
170+
P50Ms: Number(p50.toFixed(3)),
171+
P95Ms: Number(p95.toFixed(3)),
172+
P99Ms: Number(p99.toFixed(3)),
173+
P999Ms: Number(p999.toFixed(3)),
174+
totalCount: histogram.totalCount
175+
};
176+
177+
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
178+
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
179+
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
180+
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
181+
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
182+
console.log(`Total Messages tracked latency ${histogram.totalCount} messages`);
183+
}
184+
185+
console.log('#################################################');
186+
187+
if (argv['json-out-file']) {
188+
fs.writeFileSync(argv['json-out-file'], JSON.stringify(result, null, 2));
189+
console.log(`Results written to ${argv['json-out-file']}`);
190+
}
191+
}
192+
193+
module.exports = {
194+
updateCLI,
195+
writeFinalResults
196+
};

js/lib/publisher.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
async function publisherRoutine(
2+
clientName,
3+
channels,
4+
mode,
5+
measureRTT,
6+
verbose,
7+
dataSize,
8+
client,
9+
isRunningRef,
10+
totalMessagesRef
11+
) {
12+
if (verbose) {
13+
console.log(
14+
`Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${
15+
measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes`
16+
}`
17+
);
18+
}
19+
20+
const payload = !measureRTT ? 'A'.repeat(dataSize) : '';
21+
22+
while (isRunningRef.value) {
23+
let msg = payload;
24+
if (measureRTT) {
25+
msg = process.hrtime.bigint() / 1000;
26+
}
27+
28+
for (const channel of channels) {
29+
try {
30+
if (mode === 'spublish') {
31+
await client.spublish(channel, msg);
32+
} else {
33+
await client.publish(channel, msg);
34+
}
35+
totalMessagesRef.value++;
36+
} catch (err) {
37+
console.error(`Error publishing to channel ${channel}:`, err);
38+
}
39+
}
40+
}
41+
}
42+
43+
module.exports = { publisherRoutine };

0 commit comments

Comments
 (0)