Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ pubsub-sub-bench
ehthumbs.db
Thumbs.db


# Json Results #
################
*.json
package-lock.json

# Coverage Results #
####################
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion js/README.md → js/ioredis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ Supports both **standalone** and **Redis OSS Cluster** modes, with support for `
## 📦 Installation

```bash
cd pubsub-sub-bench/js
cd pubsub-sub-bench/js/ioredis
npm install
File renamed without changes.
File renamed without changes.
87 changes: 53 additions & 34 deletions js/lib/metrics.js → js/ioredis/lib/metrics.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
const fs = require('fs');
const hdr = require('hdr-histogram-js');

// Simple accumulator for RTT stats per tick
class RttAccumulator {
constructor() {
this.reset();
}

reset() {
this.sum = 0;
this.count = 0;
}

add(value) {
this.sum += value;
this.count++;
}

getAverage() {
return this.count > 0 ? this.sum / this.count : null;
}
}

function createRttHistogram() {
return hdr.build({
lowestDiscernibleValue: 1,
highestTrackableValue: 10_000_000,
numberOfSignificantValueDigits: 3
});
}

function formatRow(row) {
const widths = [6, 15, 14, 14, 22, 14];
return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join('');
Expand All @@ -18,8 +47,8 @@ function updateCLI(
totalSubscribedRef,
totalPublishersRef,
messageRateTs,
rttValues,
rttArchive
rttAccumulator,
rttHistogram
) {
return new Promise((resolve) => {
let prevTime = Date.now();
Expand Down Expand Up @@ -66,12 +95,11 @@ function updateCLI(
let avgRttMs = null;

if (measureRTT) {
const tickRttValues = rttValues.splice(0);
if (tickRttValues.length > 0) {
const sum = tickRttValues.reduce((a, b) => a + b, 0n);
const avgRtt = Number(sum) / tickRttValues.length;
avgRttMs = avgRtt / 1000;
if (rttAccumulator.count > 0) {
avgRttMs = rttAccumulator.getAverage();
metrics.push(avgRttMs.toFixed(3));
// Reset accumulator after using the values
rttAccumulator.reset();
} else {
metrics.push('--');
}
Expand Down Expand Up @@ -119,10 +147,10 @@ function writeFinalResults(
totalSubscribed,
messageRateTs,
rttValues,
rttArchive,
rttHistogram,
perSecondStats
) {
const duration = (end - start) / 1000;
const duration = (end - start)/1000;
const messageRate = totalMessages / duration;

console.log('#################################################');
Expand All @@ -131,7 +159,7 @@ function writeFinalResults(
console.log(`Message Rate: ${messageRate.toFixed(6)} msg/sec`);

const result = {
StartTime: Math.floor(start / 1000),
StartTime: Math.floor(start),
Duration: duration,
Mode: mode,
MessageRate: messageRate,
Expand All @@ -148,38 +176,27 @@ function writeFinalResults(
};

if (argv['measure-rtt-latency'] && !mode.includes('publish')) {
const histogram = hdr.build({
lowestDiscernibleValue: 1,
highestTrackableValue: 10_000_000,
numberOfSignificantValueDigits: 3
});

rttArchive.forEach((rtt) => {
const val = Number(rtt);
if (val >= 0) histogram.recordValue(val);
});

const avgRtt = histogram.mean / 1000;
const p50 = histogram.getValueAtPercentile(50) / 1000;
const p95 = histogram.getValueAtPercentile(95) / 1000;
const p99 = histogram.getValueAtPercentile(99) / 1000;
const p999 = histogram.getValueAtPercentile(99.9) / 1000;
const avgRtt = rttHistogram.mean;
const p50 = rttHistogram.getValueAtPercentile(50);
const p95 = rttHistogram.getValueAtPercentile(95);
const p99 = rttHistogram.getValueAtPercentile(99);
const p999 = rttHistogram.getValueAtPercentile(99.9);

result.RTTSummary = {
AvgMs: Number(avgRtt.toFixed(3)),
P50Ms: Number(p50.toFixed(3)),
P95Ms: Number(p95.toFixed(3)),
P99Ms: Number(p99.toFixed(3)),
P999Ms: Number(p999.toFixed(3)),
totalCount: histogram.totalCount
totalCount: rttHistogram.totalCount
};

console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
console.log(`Total Messages tracked latency ${histogram.totalCount} messages`);
console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`);
console.log(`P50 RTT ${p50.toFixed(3)} ms`);
console.log(`P95 RTT ${p95.toFixed(3)} ms`);
console.log(`P99 RTT ${p99.toFixed(3)} ms`);
console.log(`P999 RTT ${p999.toFixed(3)} ms`);
console.log(`Total Messages tracked latency ${rttHistogram.totalCount} messages`);
}

console.log('#################################################');
Expand All @@ -192,5 +209,7 @@ function writeFinalResults(

module.exports = {
updateCLI,
writeFinalResults
writeFinalResults,
createRttHistogram,
RttAccumulator
};
65 changes: 65 additions & 0 deletions js/ioredis/lib/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
async function publisherRoutine(
clientName,
channels,
mode,
measureRTT,
verbose,
dataSize,
client,
isRunningRef,
totalMessagesRef,
rateLimiter
) {
if (verbose) {
console.log(
`Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${
measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes`
}`
);
}

const payload = !measureRTT ? 'A'.repeat(dataSize) : '';
const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher

try {
while (isRunningRef.value) {
for (const channel of channels) {
try {
// Apply rate limiting if configured
if (rateLimiter) {
await rateLimiter.removeTokens(1);
}

let msg = payload;
if (measureRTT) {
msg = Date.now().toString();
}

if (mode === 'spublish') {
await duplicatedClient.spublish(channel, msg);
} else {
await duplicatedClient.publish(channel, msg);
}
totalMessagesRef.value++;
} catch (err) {
console.error(`Error publishing to channel ${channel}:`, err);
}
}
}
} finally {
// Clean shutdown - disconnect the client
if (verbose) {
console.log(`Publisher ${clientName} shutting down...`);
}
try {
duplicatedClient.disconnect();
if (verbose) {
console.log(`Publisher ${clientName} disconnected successfully`);
}
} catch (err) {
console.error(`Error disconnecting publisher ${clientName}:`, err);
}
}
}

module.exports = { publisherRoutine };
Loading