Skip to content

Commit e817c48

Browse files
authored
Fixed time conversion, reduced memory footprint, enabled s/publish, added node-redis benchmark (#28)
* Fix time conversions on the subscriber side * minor fix * Use only milliseconds * Reduced the memory footprint of the js benchmark * Enable s/publish * Added node-redis benchmark implementation
1 parent 697aef5 commit e817c48

22 files changed

+1224
-160
lines changed

.gitignore

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,7 @@ pubsub-sub-bench
8383
ehthumbs.db
8484
Thumbs.db
8585

86-
87-
# Json Results #
88-
################
89-
*.json
86+
package-lock.json
9087

9188
# Coverage Results #
9289
####################
File renamed without changes.
File renamed without changes.

js/README.md renamed to js/ioredis/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ Supports both **standalone** and **Redis OSS Cluster** modes, with support for `
1010
## 📦 Installation
1111

1212
```bash
13-
cd pubsub-sub-bench/js
13+
cd pubsub-sub-bench/js/ioredis
1414
npm install
File renamed without changes.
File renamed without changes.

js/lib/metrics.js renamed to js/ioredis/lib/metrics.js

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,35 @@
11
const fs = require('fs');
22
const hdr = require('hdr-histogram-js');
33

4+
// Simple accumulator for RTT stats per tick
5+
class RttAccumulator {
6+
constructor() {
7+
this.reset();
8+
}
9+
10+
reset() {
11+
this.sum = 0;
12+
this.count = 0;
13+
}
14+
15+
add(value) {
16+
this.sum += value;
17+
this.count++;
18+
}
19+
20+
getAverage() {
21+
return this.count > 0 ? this.sum / this.count : null;
22+
}
23+
}
24+
25+
function createRttHistogram() {
26+
return hdr.build({
27+
lowestDiscernibleValue: 1,
28+
highestTrackableValue: 10_000_000,
29+
numberOfSignificantValueDigits: 3
30+
});
31+
}
32+
433
function formatRow(row) {
534
const widths = [6, 15, 14, 14, 22, 14];
635
return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join('');
@@ -18,8 +47,8 @@ function updateCLI(
1847
totalSubscribedRef,
1948
totalPublishersRef,
2049
messageRateTs,
21-
rttValues,
22-
rttArchive
50+
rttAccumulator,
51+
rttHistogram
2352
) {
2453
return new Promise((resolve) => {
2554
let prevTime = Date.now();
@@ -66,12 +95,11 @@ function updateCLI(
6695
let avgRttMs = null;
6796

6897
if (measureRTT) {
69-
const tickRttValues = rttValues.splice(0);
70-
if (tickRttValues.length > 0) {
71-
const sum = tickRttValues.reduce((a, b) => a + b, 0n);
72-
const avgRtt = Number(sum) / tickRttValues.length;
73-
avgRttMs = avgRtt / 1000;
98+
if (rttAccumulator.count > 0) {
99+
avgRttMs = rttAccumulator.getAverage();
74100
metrics.push(avgRttMs.toFixed(3));
101+
// Reset accumulator after using the values
102+
rttAccumulator.reset();
75103
} else {
76104
metrics.push('--');
77105
}
@@ -119,10 +147,10 @@ function writeFinalResults(
119147
totalSubscribed,
120148
messageRateTs,
121149
rttValues,
122-
rttArchive,
150+
rttHistogram,
123151
perSecondStats
124152
) {
125-
const duration = (end - start) / 1000;
153+
const duration = (end - start)/1000;
126154
const messageRate = totalMessages / duration;
127155

128156
console.log('#################################################');
@@ -131,7 +159,7 @@ function writeFinalResults(
131159
console.log(`Message Rate: ${messageRate.toFixed(6)} msg/sec`);
132160

133161
const result = {
134-
StartTime: Math.floor(start / 1000),
162+
StartTime: Math.floor(start),
135163
Duration: duration,
136164
Mode: mode,
137165
MessageRate: messageRate,
@@ -148,38 +176,27 @@ function writeFinalResults(
148176
};
149177

150178
if (argv['measure-rtt-latency'] && !mode.includes('publish')) {
151-
const histogram = hdr.build({
152-
lowestDiscernibleValue: 1,
153-
highestTrackableValue: 10_000_000,
154-
numberOfSignificantValueDigits: 3
155-
});
156-
157-
rttArchive.forEach((rtt) => {
158-
const val = Number(rtt);
159-
if (val >= 0) histogram.recordValue(val);
160-
});
161-
162-
const avgRtt = histogram.mean / 1000;
163-
const p50 = histogram.getValueAtPercentile(50) / 1000;
164-
const p95 = histogram.getValueAtPercentile(95) / 1000;
165-
const p99 = histogram.getValueAtPercentile(99) / 1000;
166-
const p999 = histogram.getValueAtPercentile(99.9) / 1000;
179+
const avgRtt = rttHistogram.mean;
180+
const p50 = rttHistogram.getValueAtPercentile(50);
181+
const p95 = rttHistogram.getValueAtPercentile(95);
182+
const p99 = rttHistogram.getValueAtPercentile(99);
183+
const p999 = rttHistogram.getValueAtPercentile(99.9);
167184

168185
result.RTTSummary = {
169186
AvgMs: Number(avgRtt.toFixed(3)),
170187
P50Ms: Number(p50.toFixed(3)),
171188
P95Ms: Number(p95.toFixed(3)),
172189
P99Ms: Number(p99.toFixed(3)),
173190
P999Ms: Number(p999.toFixed(3)),
174-
totalCount: histogram.totalCount
191+
totalCount: rttHistogram.totalCount
175192
};
176193

177-
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
178-
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
179-
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
180-
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
181-
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
182-
console.log(`Total Messages tracked latency ${histogram.totalCount} messages`);
194+
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
195+
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
196+
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
197+
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
198+
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
199+
console.log(`Total Messages tracked latency ${rttHistogram.totalCount} messages`);
183200
}
184201

185202
console.log('#################################################');
@@ -192,5 +209,7 @@ function writeFinalResults(
192209

193210
module.exports = {
194211
updateCLI,
195-
writeFinalResults
212+
writeFinalResults,
213+
createRttHistogram,
214+
RttAccumulator
196215
};

js/ioredis/lib/publisher.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
async function publisherRoutine(
2+
clientName,
3+
channels,
4+
mode,
5+
measureRTT,
6+
verbose,
7+
dataSize,
8+
client,
9+
isRunningRef,
10+
totalMessagesRef,
11+
rateLimiter
12+
) {
13+
if (verbose) {
14+
console.log(
15+
`Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${
16+
measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes`
17+
}`
18+
);
19+
}
20+
21+
const payload = !measureRTT ? 'A'.repeat(dataSize) : '';
22+
const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher
23+
24+
try {
25+
while (isRunningRef.value) {
26+
for (const channel of channels) {
27+
try {
28+
// Apply rate limiting if configured
29+
if (rateLimiter) {
30+
await rateLimiter.removeTokens(1);
31+
}
32+
33+
let msg = payload;
34+
if (measureRTT) {
35+
msg = Date.now().toString();
36+
}
37+
38+
if (mode === 'spublish') {
39+
await duplicatedClient.spublish(channel, msg);
40+
} else {
41+
await duplicatedClient.publish(channel, msg);
42+
}
43+
totalMessagesRef.value++;
44+
} catch (err) {
45+
console.error(`Error publishing to channel ${channel}:`, err);
46+
}
47+
}
48+
}
49+
} finally {
50+
// Clean shutdown - disconnect the client
51+
if (verbose) {
52+
console.log(`Publisher ${clientName} shutting down...`);
53+
}
54+
try {
55+
duplicatedClient.disconnect();
56+
if (verbose) {
57+
console.log(`Publisher ${clientName} disconnected successfully`);
58+
}
59+
} catch (err) {
60+
console.error(`Error disconnecting publisher ${clientName}:`, err);
61+
}
62+
}
63+
}
64+
65+
module.exports = { publisherRoutine };

0 commit comments

Comments
 (0)