Skip to content

Commit 8abff88

Browse files
authored
Merge pull request #1 from htemelski/reduce_memory_footprint
Reduced the memory footprint of the js benchmark
2 parents d481e62 + 5ad049f commit 8abff88

File tree

4 files changed

+234
-97
lines changed

4 files changed

+234
-97
lines changed

js/lib/metrics.js

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,35 @@
11
const fs = require('fs');
22
const hdr = require('hdr-histogram-js');
33

4+
// Simple accumulator for RTT stats per tick
5+
class RttAccumulator {
6+
constructor() {
7+
this.reset();
8+
}
9+
10+
reset() {
11+
this.sum = 0;
12+
this.count = 0;
13+
}
14+
15+
add(value) {
16+
this.sum += value;
17+
this.count++;
18+
}
19+
20+
getAverage() {
21+
return this.count > 0 ? this.sum / this.count : null;
22+
}
23+
}
24+
25+
function createRttHistogram() {
26+
return hdr.build({
27+
lowestDiscernibleValue: 1,
28+
highestTrackableValue: 10_000_000,
29+
numberOfSignificantValueDigits: 3
30+
});
31+
}
32+
433
function formatRow(row) {
534
const widths = [6, 15, 14, 14, 22, 14];
635
return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join('');
@@ -18,8 +47,8 @@ function updateCLI(
1847
totalSubscribedRef,
1948
totalPublishersRef,
2049
messageRateTs,
21-
rttValues,
22-
rttArchive
50+
rttAccumulator,
51+
rttHistogram
2352
) {
2453
return new Promise((resolve) => {
2554
let prevTime = Date.now();
@@ -66,12 +95,11 @@ function updateCLI(
6695
let avgRttMs = null;
6796

6897
if (measureRTT) {
69-
const tickRttValues = rttValues.splice(0);
70-
if (tickRttValues.length > 0) {
71-
const sum = tickRttValues.reduce((a, b) => a + b, 0);
72-
const avgRtt = Number(sum) / tickRttValues.length;
73-
avgRttMs = avgRtt;
98+
if (rttAccumulator.count > 0) {
99+
avgRttMs = rttAccumulator.getAverage();
74100
metrics.push(avgRttMs.toFixed(3));
101+
// Reset accumulator after using the values
102+
rttAccumulator.reset();
75103
} else {
76104
metrics.push('--');
77105
}
@@ -119,10 +147,10 @@ function writeFinalResults(
119147
totalSubscribed,
120148
messageRateTs,
121149
rttValues,
122-
rttArchive,
150+
rttHistogram,
123151
perSecondStats
124152
) {
125-
const duration = (end - start);
153+
const duration = (end - start)/1000;
126154
const messageRate = totalMessages / duration;
127155

128156
console.log('#################################################');
@@ -148,38 +176,27 @@ function writeFinalResults(
148176
};
149177

150178
if (argv['measure-rtt-latency'] && !mode.includes('publish')) {
151-
const histogram = hdr.build({
152-
lowestDiscernibleValue: 1,
153-
highestTrackableValue: 10_000_000,
154-
numberOfSignificantValueDigits: 3
155-
});
156-
157-
rttArchive.forEach((rtt) => {
158-
const val = Number(rtt);
159-
if (val >= 0) histogram.recordValue(val);
160-
});
161-
162-
const avgRtt = histogram.mean ;
163-
const p50 = histogram.getValueAtPercentile(50);
164-
const p95 = histogram.getValueAtPercentile(95);
165-
const p99 = histogram.getValueAtPercentile(99);
166-
const p999 = histogram.getValueAtPercentile(99.9);
179+
const avgRtt = rttHistogram.mean;
180+
const p50 = rttHistogram.getValueAtPercentile(50);
181+
const p95 = rttHistogram.getValueAtPercentile(95);
182+
const p99 = rttHistogram.getValueAtPercentile(99);
183+
const p999 = rttHistogram.getValueAtPercentile(99.9);
167184

168185
result.RTTSummary = {
169186
AvgMs: Number(avgRtt.toFixed(3)),
170187
P50Ms: Number(p50.toFixed(3)),
171188
P95Ms: Number(p95.toFixed(3)),
172189
P99Ms: Number(p99.toFixed(3)),
173190
P999Ms: Number(p999.toFixed(3)),
174-
totalCount: histogram.totalCount
191+
totalCount: rttHistogram.totalCount
175192
};
176193

177-
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
178-
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
179-
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
180-
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
181-
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
182-
console.log(`Total Messages tracked latency ${histogram.totalCount} messages`);
194+
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
195+
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
196+
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
197+
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
198+
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
199+
console.log(`Total Messages tracked latency ${rttHistogram.totalCount} messages`);
183200
}
184201

185202
console.log('#################################################');
@@ -192,5 +209,7 @@ function writeFinalResults(
192209

193210
module.exports = {
194211
updateCLI,
195-
writeFinalResults
212+
writeFinalResults,
213+
createRttHistogram,
214+
RttAccumulator
196215
};

js/lib/publisher.js

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ async function publisherRoutine(
77
dataSize,
88
client,
99
isRunningRef,
10-
totalMessagesRef
10+
totalMessagesRef,
11+
rateLimiter
1112
) {
1213
if (verbose) {
1314
console.log(
@@ -18,25 +19,46 @@ async function publisherRoutine(
1819
}
1920

2021
const payload = !measureRTT ? 'A'.repeat(dataSize) : '';
22+
const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher
2123

22-
while (isRunningRef.value) {
23-
let msg = payload;
24-
if (measureRTT) {
25-
msg = Date.now();
26-
}
24+
try {
25+
while (isRunningRef.value) {
26+
for (const channel of channels) {
27+
try {
28+
// Apply rate limiting if configured
29+
if (rateLimiter) {
30+
await rateLimiter.removeTokens(1);
31+
}
32+
33+
let msg = payload;
34+
if (measureRTT) {
35+
msg = Date.now().toString();
36+
}
2737

28-
for (const channel of channels) {
29-
try {
30-
if (mode === 'spublish') {
31-
await client.spublish(channel, msg);
32-
} else {
33-
await client.publish(channel, msg);
38+
if (mode === 'spublish') {
39+
await duplicatedClient.spublish(channel, msg);
40+
} else {
41+
await duplicatedClient.publish(channel, msg);
42+
}
43+
totalMessagesRef.value++;
44+
} catch (err) {
45+
console.error(`Error publishing to channel ${channel}:`, err);
3446
}
35-
totalMessagesRef.value++;
36-
} catch (err) {
37-
console.error(`Error publishing to channel ${channel}:`, err);
3847
}
3948
}
49+
} finally {
50+
// Clean shutdown - disconnect the client
51+
if (verbose) {
52+
console.log(`Publisher ${clientName} shutting down...`);
53+
}
54+
try {
55+
duplicatedClient.disconnect();
56+
if (verbose) {
57+
console.log(`Publisher ${clientName} disconnected successfully`);
58+
}
59+
} catch (err) {
60+
console.error(`Error disconnecting publisher ${clientName}:`, err);
61+
}
4062
}
4163
}
4264

0 commit comments

Comments
 (0)