diff --git a/.gitignore b/.gitignore index 6bf9353..569eee8 100644 --- a/.gitignore +++ b/.gitignore @@ -83,10 +83,7 @@ pubsub-sub-bench ehthumbs.db Thumbs.db - -# Json Results # -################ -*.json +package-lock.json # Coverage Results # #################### diff --git a/js/.gitignore b/js/ioredis/.gitignore similarity index 100% rename from js/.gitignore rename to js/ioredis/.gitignore diff --git a/js/.prettierrc b/js/ioredis/.prettierrc similarity index 100% rename from js/.prettierrc rename to js/ioredis/.prettierrc diff --git a/js/README.md b/js/ioredis/README.md similarity index 92% rename from js/README.md rename to js/ioredis/README.md index f52cc98..8358016 100644 --- a/js/README.md +++ b/js/ioredis/README.md @@ -10,5 +10,5 @@ Supports both **standalone** and **Redis OSS Cluster** modes, with support for ` ## ๐Ÿ“ฆ Installation ```bash -cd pubsub-sub-bench/js +cd pubsub-sub-bench/js/ioredis npm install \ No newline at end of file diff --git a/js/bin/pubsub-sub-bench.js b/js/ioredis/bin/pubsub-sub-bench.js similarity index 100% rename from js/bin/pubsub-sub-bench.js rename to js/ioredis/bin/pubsub-sub-bench.js diff --git a/js/lib/config.js b/js/ioredis/lib/config.js similarity index 100% rename from js/lib/config.js rename to js/ioredis/lib/config.js diff --git a/js/lib/metrics.js b/js/ioredis/lib/metrics.js similarity index 75% rename from js/lib/metrics.js rename to js/ioredis/lib/metrics.js index c59d6b8..fa873ac 100644 --- a/js/lib/metrics.js +++ b/js/ioredis/lib/metrics.js @@ -1,6 +1,35 @@ const fs = require('fs'); const hdr = require('hdr-histogram-js'); +// Simple accumulator for RTT stats per tick +class RttAccumulator { + constructor() { + this.reset(); + } + + reset() { + this.sum = 0; + this.count = 0; + } + + add(value) { + this.sum += value; + this.count++; + } + + getAverage() { + return this.count > 0 ? this.sum / this.count : null; + } +} + +function createRttHistogram() { + return hdr.build({ + lowestDiscernibleValue: 1, + highestTrackableValue: 10_000_000, + numberOfSignificantValueDigits: 3 + }); +} + function formatRow(row) { const widths = [6, 15, 14, 14, 22, 14]; return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join(''); @@ -18,8 +47,8 @@ function updateCLI( totalSubscribedRef, totalPublishersRef, messageRateTs, - rttValues, - rttArchive + rttAccumulator, + rttHistogram ) { return new Promise((resolve) => { let prevTime = Date.now(); @@ -66,12 +95,11 @@ function updateCLI( let avgRttMs = null; if (measureRTT) { - const tickRttValues = rttValues.splice(0); - if (tickRttValues.length > 0) { - const sum = tickRttValues.reduce((a, b) => a + b, 0n); - const avgRtt = Number(sum) / tickRttValues.length; - avgRttMs = avgRtt / 1000; + if (rttAccumulator.count > 0) { + avgRttMs = rttAccumulator.getAverage(); metrics.push(avgRttMs.toFixed(3)); + // Reset accumulator after using the values + rttAccumulator.reset(); } else { metrics.push('--'); } @@ -119,10 +147,10 @@ function writeFinalResults( totalSubscribed, messageRateTs, rttValues, - rttArchive, + rttHistogram, perSecondStats ) { - const duration = (end - start) / 1000; + const duration = (end - start)/1000; const messageRate = totalMessages / duration; console.log('#################################################'); @@ -131,7 +159,7 @@ function writeFinalResults( console.log(`Message Rate: ${messageRate.toFixed(6)} msg/sec`); const result = { - StartTime: Math.floor(start / 1000), + StartTime: Math.floor(start), Duration: duration, Mode: mode, MessageRate: messageRate, @@ -148,22 +176,11 @@ function writeFinalResults( }; if (argv['measure-rtt-latency'] && !mode.includes('publish')) { - const histogram = hdr.build({ - lowestDiscernibleValue: 1, - highestTrackableValue: 10_000_000, - numberOfSignificantValueDigits: 3 - }); - - rttArchive.forEach((rtt) => { - const val = Number(rtt); - if (val >= 0) histogram.recordValue(val); - }); - - const avgRtt = histogram.mean / 1000; - const p50 = histogram.getValueAtPercentile(50) / 1000; - const p95 = histogram.getValueAtPercentile(95) / 1000; - const p99 = histogram.getValueAtPercentile(99) / 1000; - const p999 = histogram.getValueAtPercentile(99.9) / 1000; + const avgRtt = rttHistogram.mean; + const p50 = rttHistogram.getValueAtPercentile(50); + const p95 = rttHistogram.getValueAtPercentile(95); + const p99 = rttHistogram.getValueAtPercentile(99); + const p999 = rttHistogram.getValueAtPercentile(99.9); result.RTTSummary = { AvgMs: Number(avgRtt.toFixed(3)), @@ -171,15 +188,15 @@ function writeFinalResults( P95Ms: Number(p95.toFixed(3)), P99Ms: Number(p99.toFixed(3)), P999Ms: Number(p999.toFixed(3)), - totalCount: histogram.totalCount + totalCount: rttHistogram.totalCount }; - console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`); - console.log(`P50 RTT ${p50.toFixed(3)} ms`); - console.log(`P95 RTT ${p95.toFixed(3)} ms`); - console.log(`P99 RTT ${p99.toFixed(3)} ms`); - console.log(`P999 RTT ${p999.toFixed(3)} ms`); - console.log(`Total Messages tracked latency ${histogram.totalCount} messages`); + console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`); + console.log(`P50 RTT ${p50.toFixed(3)} ms`); + console.log(`P95 RTT ${p95.toFixed(3)} ms`); + console.log(`P99 RTT ${p99.toFixed(3)} ms`); + console.log(`P999 RTT ${p999.toFixed(3)} ms`); + console.log(`Total Messages tracked latency ${rttHistogram.totalCount} messages`); } console.log('#################################################'); @@ -192,5 +209,7 @@ function writeFinalResults( module.exports = { updateCLI, - writeFinalResults + writeFinalResults, + createRttHistogram, + RttAccumulator }; diff --git a/js/ioredis/lib/publisher.js b/js/ioredis/lib/publisher.js new file mode 100644 index 0000000..8c65a5a --- /dev/null +++ b/js/ioredis/lib/publisher.js @@ -0,0 +1,65 @@ +async function publisherRoutine( + clientName, + channels, + mode, + measureRTT, + verbose, + dataSize, + client, + isRunningRef, + totalMessagesRef, + rateLimiter +) { + if (verbose) { + console.log( + `Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${ + measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes` + }` + ); + } + + const payload = !measureRTT ? 'A'.repeat(dataSize) : ''; + const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher + + try { + while (isRunningRef.value) { + for (const channel of channels) { + try { + // Apply rate limiting if configured + if (rateLimiter) { + await rateLimiter.removeTokens(1); + } + + let msg = payload; + if (measureRTT) { + msg = Date.now().toString(); + } + + if (mode === 'spublish') { + await duplicatedClient.spublish(channel, msg); + } else { + await duplicatedClient.publish(channel, msg); + } + totalMessagesRef.value++; + } catch (err) { + console.error(`Error publishing to channel ${channel}:`, err); + } + } + } + } finally { + // Clean shutdown - disconnect the client + if (verbose) { + console.log(`Publisher ${clientName} shutting down...`); + } + try { + duplicatedClient.disconnect(); + if (verbose) { + console.log(`Publisher ${clientName} disconnected successfully`); + } + } catch (err) { + console.error(`Error disconnecting publisher ${clientName}:`, err); + } + } +} + +module.exports = { publisherRoutine }; diff --git a/js/lib/redisManager.js b/js/ioredis/lib/redisManager.js similarity index 60% rename from js/lib/redisManager.js rename to js/ioredis/lib/redisManager.js index d0ed84d..9ac6c7a 100644 --- a/js/lib/redisManager.js +++ b/js/ioredis/lib/redisManager.js @@ -2,7 +2,7 @@ const Redis = require('ioredis'); const clusterKeySlot = require('cluster-key-slot'); const { publisherRoutine } = require('./publisher'); const { subscriberRoutine } = require('./subscriber'); -const { updateCLI, writeFinalResults } = require('./metrics'); +const { updateCLI, writeFinalResults, createRttHistogram, RttAccumulator } = require('./metrics'); const seedrandom = require('seedrandom'); async function runBenchmark(argv) { @@ -25,8 +25,11 @@ async function runBenchmark(argv) { const totalConnectsRef = { value: 0 }; const isRunningRef = { value: true }; const messageRateTs = []; - const rttValues = []; - const rttArchive = []; + + // Create efficient RTT tracking + const rttAccumulator = argv['measure-rtt-latency'] ? new RttAccumulator() : null; + // Create histogram for RTT recording + const rttHistogram = argv['measure-rtt-latency'] ? createRttHistogram() : null; const redisOptions = { host: argv.host, @@ -48,11 +51,12 @@ async function runBenchmark(argv) { let clients = []; let nodeAddresses = []; let slotClientMap = new Map(); + let cluster = null; console.log(`Using ${argv['slot-refresh-interval']} slot-refresh-interval`); console.log(`Using ${argv['redis-timeout']} redis-timeout`); if (argv['oss-cluster-api-distribute-subscribers']) { - const cluster = new Redis.Cluster( + cluster = new Redis.Cluster( [ { host: argv.host, @@ -95,10 +99,12 @@ async function runBenchmark(argv) { } nodeAddresses.push(`${ip}:${port}`); - console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`); + clients.push(client); } + console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`); } else { const client = new Redis(redisOptions); + clients.push(client); // Redis Cluster hash slots range: 0 - 16383 for (let slot = 0; slot <= 16383; slot++) { slotClientMap.set(slot, client); @@ -111,7 +117,7 @@ async function runBenchmark(argv) { const totalChannels = argv['channel-maximum'] - argv['channel-minimum'] + 1; const totalSubscriptions = totalChannels * argv['subscribers-per-channel']; const totalExpectedMessages = totalSubscriptions * argv.messages; - const subscriptionsPerNode = Math.ceil(totalSubscriptions / clients.length); + const subscriptionsPerNode = Math.ceil(totalSubscriptions / nodeAddresses.length); if (argv['pool-size'] === 0) { redisOptions.connectionPoolSize = subscriptionsPerNode; @@ -128,7 +134,60 @@ async function runBenchmark(argv) { const promises = []; - if (argv.mode.includes('subscribe')) { + + if (argv.mode.includes('publish')) { + // Run publishers + totalPublishersRef.value = argv.clients; + console.log(`Starting ${argv.clients} publishers in ${argv.mode} mode`); + + for (let clientId = 1; clientId <= argv.clients; clientId++) { + const channels = []; + const numChannels = pickChannelCount(argv); + + for (let i = 0; i < numChannels; i++) { + const channelId = randomChannel(argv); + const channelName = `${argv['subscriber-prefix']}${channelId}`; + channels.push(channelName); + } + + const publisherName = `publisher#${clientId}`; + let client; + + if (argv.mode === 'spublish' && argv['oss-cluster-api-distribute-subscribers']) { + // For sharded publish in cluster mode, get the appropriate client for the first channel + const slot = clusterKeySlot(channels[0]); + client = slotClientMap.get(slot); + } else { + // For regular publish or non-cluster, round-robin assignment + client = clients[clientId % clients.length]; + } + + if (argv.verbose) { + console.log(`Publisher ${clientId} targeting channels ${channels}`); + } + + promises.push( + publisherRoutine( + publisherName, + channels, + argv.mode, + argv['measure-rtt-latency'], + argv.verbose, + argv['data-size'], + client, + isRunningRef, + totalMessagesRef + ) + ); + + totalConnectsRef.value++; + + if (clientId % 100 === 0) { + console.log(`Created ${clientId} publishers so far.`); + } + } + } else if (argv.mode.includes('subscribe')) { + // Only run subscribers if (argv['subscribers-placement-per-channel'] === 'dense') { for (let clientId = 1; clientId <= argv.clients; clientId++) { const channels = []; @@ -166,13 +225,13 @@ async function runBenchmark(argv) { argv['measure-rtt-latency'], client, isRunningRef, - rttValues, - rttArchive, + rttAccumulator, + rttHistogram, totalMessagesRef, totalSubscribedRef, totalConnectsRef, argv.verbose, - argv.cliens + argv.clients ) ); } @@ -182,39 +241,70 @@ async function runBenchmark(argv) { process.exit(1); } - const { startTime, now, perSecondStats } = await updateCLI( - argv['client-update-tick'], - argv.messages > 0 ? totalExpectedMessages : 0, - argv['test-time'], - argv['measure-rtt-latency'], - argv.mode, - isRunningRef, - totalMessagesRef, - totalConnectsRef, - totalSubscribedRef, - totalPublishersRef, - messageRateTs, - rttValues, - rttArchive, - () => {} // no-op, outputResults is handled after await - ); + try { + const { startTime, now, perSecondStats } = await updateCLI( + argv['client-update-tick'], + argv.messages > 0 ? totalExpectedMessages : 0, + argv['test-time'], + argv['measure-rtt-latency'], + argv.mode, + isRunningRef, + totalMessagesRef, + totalConnectsRef, + totalSubscribedRef, + totalPublishersRef, + messageRateTs, + rttAccumulator, + rttHistogram, + () => {} // no-op, outputResults is handled after await + ); - // Wait for all routines to finish - await Promise.all(promises); - - // THEN output final results - writeFinalResults( - startTime, - now, - argv, - argv.mode, - totalMessagesRef.value, - totalSubscribedRef.value, - messageRateTs, - rttValues, - rttArchive, - perSecondStats - ); + // Wait for all routines to finish + console.log('Waiting for all clients to shut down cleanly...'); + await Promise.all(promises); + + // THEN output final results + writeFinalResults( + startTime, + now, + argv, + argv.mode, + totalMessagesRef.value, + totalSubscribedRef.value, + messageRateTs, + rttAccumulator, + rttHistogram, + perSecondStats + ); + } finally { + // Clean shutdown of primary clients + console.log('Shutting down primary Redis connections...'); + + // Close cluster client if it exists + if (cluster) { + try { + await cluster.quit(); + console.log('Cluster client disconnected successfully'); + } catch (err) { + console.error('Error disconnecting cluster client:', err); + } + } + + // Close all standalone clients + const disconnectPromises = clients.map(async (client, i) => { + try { + await client.quit(); + if (argv.verbose) { + console.log(`Node client #${i} disconnected successfully`); + } + } catch (err) { + console.error(`Error disconnecting node client #${i}:`, err); + } + }); + + await Promise.all(disconnectPromises); + console.log('All Redis connections closed'); + } // cleanly exit the process once done process.exit(0); diff --git a/js/lib/subscriber.js b/js/ioredis/lib/subscriber.js similarity index 88% rename from js/lib/subscriber.js rename to js/ioredis/lib/subscriber.js index a7ed844..484073d 100644 --- a/js/lib/subscriber.js +++ b/js/ioredis/lib/subscriber.js @@ -1,5 +1,5 @@ function safeBigIntUs() { - return process.hrtime.bigint() / 1000n; + return Date.now(); } async function subscriberRoutine( @@ -11,8 +11,8 @@ async function subscriberRoutine( measureRTT, client, isRunningRef, - rttValues, - rttArchive, + rttAccumulator, + rttHistogram, totalMessagesRef, totalSubscribedRef, totalConnectsRef, @@ -70,15 +70,20 @@ async function subscriberRoutine( if (measureRTT) { try { - const now = BigInt(Date.now()) * 1000n; - const timestamp = BigInt(message); // ยตs + const now = Date.now(); + const timestamp = Number(message); // Timestamp from publisher const rtt = now - timestamp; - - if (rtt >= 0n) { - rttValues.push(rtt); - rttArchive.push(rtt); + if (rtt >= 0) { + // Add to accumulator for per-tick average calculation + if (rttAccumulator) { + rttAccumulator.add(rtt); + } + // Record directly to histogram for final stats + if (rttHistogram) { + rttHistogram.recordValue(rtt); + } if (verbose) { - console.log(`[${clientName}] RTT: ${rtt} ยตs`); + console.log(`[${clientName}] RTT: ${rtt} ms`); } } else { console.warn(`[${clientName}] Skipping negative RTT: now=${now}, ts=${timestamp}`); diff --git a/js/lib/publisher.js b/js/lib/publisher.js deleted file mode 100644 index 58bfb1a..0000000 --- a/js/lib/publisher.js +++ /dev/null @@ -1,43 +0,0 @@ -async function publisherRoutine( - clientName, - channels, - mode, - measureRTT, - verbose, - dataSize, - client, - isRunningRef, - totalMessagesRef -) { - if (verbose) { - console.log( - `Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${ - measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes` - }` - ); - } - - const payload = !measureRTT ? 'A'.repeat(dataSize) : ''; - - while (isRunningRef.value) { - let msg = payload; - if (measureRTT) { - msg = process.hrtime.bigint() / 1000; - } - - for (const channel of channels) { - try { - if (mode === 'spublish') { - await client.spublish(channel, msg); - } else { - await client.publish(channel, msg); - } - totalMessagesRef.value++; - } catch (err) { - console.error(`Error publishing to channel ${channel}:`, err); - } - } - } -} - -module.exports = { publisherRoutine }; diff --git a/js/node-redis/.gitignore b/js/node-redis/.gitignore new file mode 100644 index 0000000..147eab4 --- /dev/null +++ b/js/node-redis/.gitignore @@ -0,0 +1,34 @@ +# Node modules +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* + +# Output files +*.log +*.out +*.tmp +*.pid + +# OS junk +.DS_Store +Thumbs.db + +# Environment & config +.env +.env.local +.env.* + +# Editor folders +.vscode/ +.idea/ +*.swp + +# Benchmark result output +results/ +benchmark-*.json + +# Transpiled stuff (if used later) +dist/ +build/ + diff --git a/js/node-redis/README.md b/js/node-redis/README.md new file mode 100644 index 0000000..7c7bcd4 --- /dev/null +++ b/js/node-redis/README.md @@ -0,0 +1,54 @@ +# pubsub-sub-bench (node-redis Edition) + +High-performance **Redis Pub/Sub benchmark tool**, written in Node.js with the node-redis client. +Supports both **standalone** and **Redis OSS Cluster** modes, with support for `PUBLISH`, `SPUBLISH`, `SUBSCRIBE`, and `SSUBSCRIBE`. + +> Part of the [redis-performance/pubsub-sub-bench](https://github.com/redis-performance/pubsub-sub-bench) suite. + +--- + +## ๐Ÿ“ฆ Installation + +```bash +cd pubsub-sub-bench/js/node-redis +npm install +``` + +## ๐Ÿš€ Usage + +```bash +# Run a basic subscriber benchmark +node bin/pubsub-sub-bench.js --mode subscribe --clients 50 --messages 1000 + +# Run a basic publisher benchmark +node bin/pubsub-sub-bench.js --mode publish --clients 10 --messages 1000 + +# Run with custom Redis connection +node bin/pubsub-sub-bench.js --host redis.example.com --port 6379 --a mypassword +``` + +### ๐Ÿ“‹ Command Line Arguments + +| Parameter | Description | Default | +|-----------|-------------|---------| +| `--host` | Redis host | 127.0.0.1 | +| `--port` | Redis port | 6379 | +| `--a` | Password for Redis Auth | "" | +| `--user` | ACL-style AUTH username | "" | +| `--data-size` | Payload size in bytes | 128 | +| `--mode` | Mode: subscribe/ssubscribe/publish/spublish | subscribe | +| `--subscribers-placement-per-channel` | dense/sparse | dense | +| `--channel-minimum` | Min channel ID | 1 | +| `--channel-maximum` | Max channel ID | 100 | +| `--subscribers-per-channel` | Subscribers per channel | 1 | +| `--clients` | Number of connections | 50 | +| `--min-number-channels-per-subscriber` | Minimum channels per subscriber | 1 | +| `--max-number-channels-per-subscriber` | Maximum channels per subscriber | 1 | +| `--messages` | Number of messages to send | 0 (unlimited) | +| `--json-out-file` | Output file for JSON results | "" | +| `--test-time` | Test duration in seconds | 0 (unlimited) | +| `--rand-seed` | Random seed | 12345 | +| `--subscriber-prefix` | Channel name prefix | "channel-" | +| `--measure-rtt-latency` | Measure RTT latency | false | +| `--print-messages` | Print received messages | false | +| `--verbose` | Enable verbose logging | false | \ No newline at end of file diff --git a/js/node-redis/bin/pubsub-sub-bench.js b/js/node-redis/bin/pubsub-sub-bench.js new file mode 100755 index 0000000..6eaccc1 --- /dev/null +++ b/js/node-redis/bin/pubsub-sub-bench.js @@ -0,0 +1,14 @@ +#!/usr/bin/env node +// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/bin/pubsub-sub-bench.js + +const { parseArgs } = require('../lib/config'); +const { runBenchmark } = require('../lib/redisManager'); + +// Parse command line arguments +const argv = parseArgs(); + +// Run the benchmark +runBenchmark(argv).catch(err => { + console.error('Benchmark execution error:', err); + process.exit(1); +}); \ No newline at end of file diff --git a/js/node-redis/lib/config.js b/js/node-redis/lib/config.js new file mode 100644 index 0000000..280971b --- /dev/null +++ b/js/node-redis/lib/config.js @@ -0,0 +1,43 @@ +// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/lib/config.js +const yargs = require('yargs'); + +function parseArgs() { + return yargs + .option('host', { description: 'Redis host', default: '127.0.0.1' }) + .option('port', { description: 'Redis port', default: '6379' }) + .option('a', { description: 'Password for Redis Auth', default: '' }) + .option('user', { description: 'ACL-style AUTH username', default: '' }) + .option('data-size', { description: 'Payload size in bytes', default: 128 }) + .option('mode', { + description: 'Mode: subscribe | ssubscribe | publish | spublish', + default: 'subscribe' + }) + .option('subscribers-placement-per-channel', { + description: 'dense | sparse', + default: 'dense' + }) + .option('channel-minimum', { description: 'Min channel ID', default: 1 }) + .option('channel-maximum', { description: 'Max channel ID', default: 100 }) + .option('subscribers-per-channel', { description: 'Subscribers per channel', default: 1 }) + .option('clients', { description: 'Number of connections', default: 50 }) + .option('min-number-channels-per-subscriber', { default: 1 }) + .option('max-number-channels-per-subscriber', { default: 1 }) + .option('min-reconnect-interval', { default: 0 }) + .option('max-reconnect-interval', { default: 0 }) + .option('messages', { default: 0 }) + .option('json-out-file', { default: '' }) + .option('client-update-tick', { default: 1 }) + .option('test-time', { default: 0 }) + .option('rand-seed', { default: 12345 }) + .option('subscriber-prefix', { default: 'channel-' }) + .option('oss-cluster-api-distribute-subscribers', { default: false }) + .option('slot-refresh-interval', { default: -1 }) + .option('print-messages', { default: false }) + .option('verbose', { default: false }) + .option('measure-rtt-latency', { default: false }) + .option('redis-timeout', { default: 120000 }) + .option('pool-size', { default: 0 }) + .help().argv; +} + +module.exports = { parseArgs }; \ No newline at end of file diff --git a/js/node-redis/lib/metrics.js b/js/node-redis/lib/metrics.js new file mode 100644 index 0000000..6464f09 --- /dev/null +++ b/js/node-redis/lib/metrics.js @@ -0,0 +1,215 @@ +const fs = require('fs'); +const hdr = require('hdr-histogram-js'); + +// Simple accumulator for RTT stats per tick +class RttAccumulator { + constructor() { + this.reset(); + } + + reset() { + this.sum = 0; + this.count = 0; + } + + add(value) { + this.sum += value; + this.count++; + } + + getAverage() { + return this.count > 0 ? this.sum / this.count : null; + } +} + +function createRttHistogram() { + return hdr.build({ + lowestDiscernibleValue: 1, + highestTrackableValue: 10_000_000, + numberOfSignificantValueDigits: 3 + }); +} + +function formatRow(row) { + const widths = [6, 15, 14, 14, 22, 14]; + return row.map((val, i) => String(val).padEnd(widths[i] || 10)).join(''); +} + +function updateCLI( + updateInterval, + messageLimit, + testTime, + measureRTT, + mode, + isRunningRef, + totalMessagesRef, + totalConnectsRef, + totalSubscribedRef, + totalPublishersRef, + messageRateTs, + rttAccumulator, + rttHistogram +) { + return new Promise((resolve) => { + let prevTime = Date.now(); + let prevMessageCount = 0; + let prevConnectCount = 0; + let startTime = Date.now(); + let resolved = false; + + console.log('Starting benchmark...'); + + const header = ['Time', 'Total Messages', 'Message Rate', 'Connect Rate']; + header.push(mode.includes('subscribe') ? 'Active Subscriptions' : 'Active Publishers'); + if (measureRTT) header.push('Avg RTT (ms)'); + console.log(formatRow(header)); + const perSecondStats = []; + + const interval = setInterval(() => { + const now = Date.now(); + const elapsed = (now - prevTime) / 1000; + + const messageRate = (totalMessagesRef.value - prevMessageCount) / elapsed; + const connectRate = (totalConnectsRef.value - prevConnectCount) / elapsed; + + if (prevMessageCount === 0 && totalMessagesRef.value !== 0) { + startTime = Date.now(); + } + + if (totalMessagesRef.value !== 0) { + messageRateTs.push(messageRate); + } + + prevMessageCount = totalMessagesRef.value; + prevConnectCount = totalConnectsRef.value; + prevTime = now; + + const metrics = [ + Math.floor((now - startTime) / 1000), + totalMessagesRef.value, + messageRate.toFixed(2), + connectRate.toFixed(2), + mode.includes('subscribe') ? totalSubscribedRef.value : totalPublishersRef.value + ]; + + let avgRttMs = null; + + if (measureRTT) { + if (rttAccumulator.count > 0) { + avgRttMs = rttAccumulator.getAverage(); + metrics.push(avgRttMs.toFixed(3)); + // Reset accumulator after using the values + rttAccumulator.reset(); + } else { + metrics.push('--'); + } + } + + perSecondStats.push({ + second: Math.floor((now - startTime) / 1000), + messages: totalMessagesRef.value, + messageRate: Number(messageRate.toFixed(2)), + avgRttMs: avgRttMs !== null ? Number(avgRttMs.toFixed(3)) : null + }); + + console.log(formatRow(metrics)); + + const shouldStop = + (messageLimit > 0 && totalMessagesRef.value >= messageLimit) || + (testTime > 0 && now - startTime >= testTime * 1000 && totalMessagesRef.value !== 0); + + if (shouldStop && !resolved) { + resolved = true; + clearInterval(interval); + isRunningRef.value = false; + resolve({ startTime, now, perSecondStats }); + } + }, updateInterval * 1000); + + process.on('SIGINT', () => { + if (!resolved) { + console.log('\nReceived Ctrl-C - shutting down'); + clearInterval(interval); + isRunningRef.value = false; + resolved = true; + resolve({ startTime, now: Date.now(), perSecondStats, sigint: true }); + } + }); + }); +} + +function writeFinalResults( + start, + end, + argv, + mode, + totalMessages, + totalSubscribed, + messageRateTs, + rttValues, + rttHistogram, + perSecondStats +) { + const duration = (end - start)/1000; + const messageRate = totalMessages / duration; + + console.log('#################################################'); + console.log(`Mode: ${mode}`); + console.log(`Total Duration: ${duration.toFixed(6)} Seconds`); + console.log(`Message Rate: ${messageRate.toFixed(6)} msg/sec`); + + const result = { + StartTime: Math.floor(start), + Duration: duration, + Mode: mode, + MessageRate: messageRate, + TotalMessages: totalMessages, + TotalSubscriptions: totalSubscribed, + ChannelMin: argv['channel-minimum'], + ChannelMax: argv['channel-maximum'], + SubscribersPerChannel: argv['subscribers-per-channel'], + MessagesPerChannel: argv['messages'], + MessageRateTs: messageRateTs, + OSSDistributedSlots: argv['oss-cluster-api-distribute-subscribers'], + Addresses: [`${argv.host}:${argv.port}`], + PerSecondStats: perSecondStats + }; + + if (argv['measure-rtt-latency'] && !mode.includes('publish')) { + const avgRtt = rttHistogram.mean; + const p50 = rttHistogram.getValueAtPercentile(50); + const p95 = rttHistogram.getValueAtPercentile(95); + const p99 = rttHistogram.getValueAtPercentile(99); + const p999 = rttHistogram.getValueAtPercentile(99.9); + + result.RTTSummary = { + AvgMs: Number(avgRtt.toFixed(3)), + P50Ms: Number(p50.toFixed(3)), + P95Ms: Number(p95.toFixed(3)), + P99Ms: Number(p99.toFixed(3)), + P999Ms: Number(p999.toFixed(3)), + totalCount: rttHistogram.totalCount + }; + + console.log(`Avg RTT ${avgRtt.toFixed(3)} ms`); + console.log(`P50 RTT ${p50.toFixed(3)} ms`); + console.log(`P95 RTT ${p95.toFixed(3)} ms`); + console.log(`P99 RTT ${p99.toFixed(3)} ms`); + console.log(`P999 RTT ${p999.toFixed(3)} ms`); + console.log(`Total Messages tracked latency ${rttHistogram.totalCount} messages`); + } + + console.log('#################################################'); + + if (argv['json-out-file']) { + fs.writeFileSync(argv['json-out-file'], JSON.stringify(result, null, 2)); + console.log(`Results written to ${argv['json-out-file']}`); + } +} + +module.exports = { + updateCLI, + writeFinalResults, + createRttHistogram, + RttAccumulator +}; \ No newline at end of file diff --git a/js/node-redis/lib/publisher.js b/js/node-redis/lib/publisher.js new file mode 100644 index 0000000..95f02b2 --- /dev/null +++ b/js/node-redis/lib/publisher.js @@ -0,0 +1,80 @@ +// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/lib/publisher.js +const { createClient } = require('redis'); + +async function publisherRoutine( + clientName, + channels, + mode, + measureRTT, + verbose, + dataSize, + redisOptions, + isRunningRef, + totalMessagesRef, + rateLimiter +) { + if (verbose) { + console.log( + `Publisher ${clientName} started. Mode: ${mode} | Channels: ${channels.length} | Payload: ${ + measureRTT ? 'RTT timestamp' : `fixed size ${dataSize} bytes` + }` + ); + } + + const payload = !measureRTT ? 'A'.repeat(dataSize) : ''; + + // Create a new Redis client + const client = createClient(redisOptions); + + // Set up error handling + client.on('error', (err) => { + console.error(`[${clientName}] Redis error: ${err.message}`); + }); + + try { + // Connect to Redis + await client.connect(); + + while (isRunningRef.value) { + for (const channel of channels) { + try { + // Apply rate limiting if configured + if (rateLimiter) { + await rateLimiter.removeTokens(1); + } + + let msg = payload; + if (measureRTT) { + msg = Date.now().toString(); + } + + if (mode === 'spublish') { + await client.sPublish(channel, msg); + } else { + await client.publish(channel, msg); + } + totalMessagesRef.value++; + } catch (err) { + console.error(`[${clientName}] Error publishing to channel ${channel}:`, err); + } + } + } + } catch (err) { + console.error(`[${clientName}] Redis connection error:`, err); + } finally { + // Clean shutdown - disconnect the client + if (verbose) { + console.log(`Publisher ${clientName} shutting down...`); + } + try { + await client.quit(); + if (verbose) { + console.log(`Publisher ${clientName} disconnected successfully`); + } + } catch (err) { + console.error(`Error disconnecting publisher ${clientName}:`, err); + } + } +} + +module.exports = { publisherRoutine }; \ No newline at end of file diff --git a/js/node-redis/lib/redisManager.js b/js/node-redis/lib/redisManager.js new file mode 100644 index 0000000..7af43ad --- /dev/null +++ b/js/node-redis/lib/redisManager.js @@ -0,0 +1,346 @@ +// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/lib/redisManager.js +const { createClient, createCluster } = require('redis'); +const { publisherRoutine } = require('./publisher'); +const { subscriberRoutine } = require('./subscriber'); +const { updateCLI, writeFinalResults, createRttHistogram, RttAccumulator } = require('./metrics'); +const seedrandom = require('seedrandom'); + +async function runBenchmark(argv) { + console.log(`pubsub-sub-bench (node-redis version)`); + console.log(`Using random seed: ${argv['rand-seed']}`); + Math.random = seedrandom(argv['rand-seed'].toString()); + + if (argv['measure-rtt-latency']) { + console.log('RTT measurement enabled.'); + } + + if (argv.verbose) { + console.log('Verbose mode enabled.'); + } + + // Shared mutable state (as references) + const totalMessagesRef = { value: 0 }; + const totalSubscribedRef = { value: 0 }; + const totalPublishersRef = { value: 0 }; + const totalConnectsRef = { value: 0 }; + const isRunningRef = { value: true }; + const messageRateTs = []; + + // Create efficient RTT tracking + const rttAccumulator = argv['measure-rtt-latency'] ? new RttAccumulator() : null; + // Create histogram for RTT recording + const rttHistogram = argv['measure-rtt-latency'] ? createRttHistogram() : null; + + // Configure Redis client options + const redisOptions = { + socket: { + host: argv.host, + port: argv.port, + connectTimeout: argv['redis-timeout'], + reconnectStrategy: false // disable auto-reconnect + }, + password: argv.a || undefined, + username: argv.user || undefined + }; + + let clientsMap = new Map(); + let nodeAddresses = []; + let isClusterMode = false; + + if (argv['oss-cluster-api-distribute-subscribers']) { + isClusterMode = true; + console.log('Using Redis Cluster mode'); + + try { + // Create a test cluster client to get nodes + const testCluster = createCluster({ + rootNodes: [ + { + socket: { + host: argv.host, + port: argv.port + }, + password: argv.a || undefined, + username: argv.user || undefined + } + ] + }); + + await testCluster.connect(); + + // Get cluster slots information to determine node distribution + const slots = await testCluster.clusterSlots(); + await testCluster.quit(); + + if (!slots || Object.keys(slots).length === 0) { + throw new Error('Cluster has no slot assignments. Check node health.'); + } + + // Create a map of slots to nodes + for (const slotRange of Object.values(slots)) { + const [startSlot, endSlot] = slotRange.slots; + const { host, port } = slotRange.master; + + // Create a standalone client for each node + const nodeId = `${host}:${port}`; + if (!clientsMap.has(nodeId)) { + const nodeOptions = { + socket: { + host, + port, + connectTimeout: argv['redis-timeout'], + reconnectStrategy: false + }, + password: argv.a || undefined, + username: argv.user || undefined + }; + + nodeAddresses.push(nodeId); + clientsMap.set(nodeId, nodeOptions); + } + + // Map each slot to this node + for (let slot = startSlot; slot <= endSlot; slot++) { + clientsMap.set(`slot:${slot}`, nodeId); + } + } + + console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`); + } catch (err) { + console.error('Failed to initialize cluster mode:', err); + process.exit(1); + } + } else { + // Single node mode + nodeAddresses.push(`${argv.host}:${argv.port}`); + clientsMap.set(nodeAddresses[0], redisOptions); + console.log('Standalone mode - using single Redis instance'); + } + + const totalChannels = argv['channel-maximum'] - argv['channel-minimum'] + 1; + const totalSubscriptions = totalChannels * argv['subscribers-per-channel']; + const totalExpectedMessages = totalSubscriptions * argv.messages; + + console.log(`Will use a subscriber prefix of: ${argv['subscriber-prefix']}`); + console.log(`Total channels: ${totalChannels}`); + console.log('Final setup used for benchmark:'); + nodeAddresses.forEach((addr, i) => { + console.log(`Node #${i}: Address: ${addr}`); + }); + + const promises = []; + + // Helper function to get a slot for a key (simplified version) + function getSlot(key) { + const crc16tab = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 + ]; + + let crc = 0; + for (let i = 0; i < key.length; i++) { + crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ key.charCodeAt(i)) & 0xff]) & 0xffff; + } + return crc % 16384; // 16384 slots in Redis Cluster + } + + // Helper to get Redis options for a specific channel + function getRedisOptionsForChannel(channel) { + if (!isClusterMode) { + return redisOptions; + } + + const slot = getSlot(channel); + const nodeId = clientsMap.get(`slot:${slot}`); + return clientsMap.get(nodeId); + } + + function randomInt(min, max) { + if (min === max) return min; + return Math.floor(Math.random() * (max - min + 1)) + min; + } + + function pickChannelCount(argv) { + return randomInt( + argv['min-number-channels-per-subscriber'], + argv['max-number-channels-per-subscriber'] + ); + } + + function randomChannel(argv) { + return ( + Math.floor(Math.random() * (argv['channel-maximum'] - argv['channel-minimum'] + 1)) + + argv['channel-minimum'] + ); + } + + if (argv.mode.includes('publish')) { + // Run publishers + totalPublishersRef.value = argv.clients; + console.log(`Starting ${argv.clients} publishers in ${argv.mode} mode`); + + for (let clientId = 1; clientId <= argv.clients; clientId++) { + const channels = []; + const numChannels = pickChannelCount(argv); + + for (let i = 0; i < numChannels; i++) { + const channelId = randomChannel(argv); + const channelName = `${argv['subscriber-prefix']}${channelId}`; + channels.push(channelName); + } + + const publisherName = `publisher#${clientId}`; + const channelOptions = getRedisOptionsForChannel(channels[0]); + + if (argv.verbose) { + console.log(`Publisher ${clientId} targeting channels ${channels}`); + } + + promises.push( + publisherRoutine( + publisherName, + channels, + argv.mode, + argv['measure-rtt-latency'], + argv.verbose, + argv['data-size'], + channelOptions, + isRunningRef, + totalMessagesRef + ) + ); + + totalConnectsRef.value++; + + if (clientId % 100 === 0) { + console.log(`Created ${clientId} publishers so far.`); + } + } + } else if (argv.mode.includes('subscribe')) { + // Only run subscribers + if (argv['subscribers-placement-per-channel'] === 'dense') { + for (let clientId = 1; clientId <= argv.clients; clientId++) { + const channels = []; + const numChannels = pickChannelCount(argv); + + for (let i = 0; i < numChannels; i++) { + const id = randomChannel(argv); + channels.push(`${argv['subscriber-prefix']}${id}`); + } + + const subscriberName = `subscriber#${clientId}`; + const channelOptions = getRedisOptionsForChannel(channels[0]); + + const reconnectInterval = randomInt( + argv['min-reconnect-interval'], + argv['max-reconnect-interval'] + ); + + if (reconnectInterval > 0) { + console.log(`Reconnect interval for ${subscriberName}: ${reconnectInterval}ms`); + } + + if (clientId % 100 === 0 || clientId === argv.clients) { + console.log(`${subscriberName} subscribing to ${channels.length} channels.`); + } + + promises.push( + subscriberRoutine( + subscriberName, + argv.mode, + channels, + argv['print-messages'], + reconnectInterval, + argv['measure-rtt-latency'], + channelOptions, + isRunningRef, + rttAccumulator, + rttHistogram, + totalMessagesRef, + totalSubscribedRef, + totalConnectsRef, + argv.verbose, + argv.clients + ) + ); + } + } + } else { + console.error(`Invalid mode '${argv.mode}'. Use: subscribe, ssubscribe, publish, spublish`); + process.exit(1); + } + + try { + const { startTime, now, perSecondStats } = await updateCLI( + argv['client-update-tick'], + argv.messages > 0 ? totalExpectedMessages : 0, + argv['test-time'], + argv['measure-rtt-latency'], + argv.mode, + isRunningRef, + totalMessagesRef, + totalConnectsRef, + totalSubscribedRef, + totalPublishersRef, + messageRateTs, + rttAccumulator, + rttHistogram, + () => {} // no-op, outputResults is handled after await + ); + + // Wait for all routines to finish + console.log('Waiting for all clients to shut down cleanly...'); + await Promise.all(promises); + + // Output final results + writeFinalResults( + startTime, + now, + argv, + argv.mode, + totalMessagesRef.value, + totalSubscribedRef.value, + messageRateTs, + rttAccumulator, + rttHistogram, + perSecondStats + ); + } catch (err) { + console.error('Benchmark error:', err); + } + + // Clean exit + process.exit(0); +} + +module.exports = { runBenchmark }; \ No newline at end of file diff --git a/js/node-redis/lib/subscriber.js b/js/node-redis/lib/subscriber.js new file mode 100644 index 0000000..418ff74 --- /dev/null +++ b/js/node-redis/lib/subscriber.js @@ -0,0 +1,150 @@ +// filepath: /Users/hristo.temelski/code/etc/pubsub-sub-bench/js/node-redis/lib/subscriber.js +const { createClient } = require('redis'); + +async function subscriberRoutine( + clientName, + mode, + channels, + printMessages, + reconnectInterval, + measureRTT, + redisOptions, + isRunningRef, + rttAccumulator, + rttHistogram, + totalMessagesRef, + totalSubscribedRef, + totalConnectsRef, + verbose, + totalClients +) { + let client = null; + let reconnectTimer = null; + + // Subscribe function + const subscribe = async () => { + try { + // If already subscribed, disconnect and create new client + if (client) { + await client.quit(); + } + + // Create a new Redis client + client = createClient(redisOptions); + + // Set up error handling + client.on('error', (err) => { + console.error(`[${clientName}] Redis error: ${err.message}`); + }); + + // Connect to Redis + await client.connect(); + + // Subscribe to channels with appropriate method + if (mode === 'ssubscribe') { + await client.sSubscribe(...channels, handleMessage); + } else { + await client.subscribe(...channels, handleMessage); + } + + totalSubscribedRef.value += channels.length; + totalConnectsRef.value++; + + if (verbose) { + console.log(`[${clientName}] Successfully subscribed to ${channels.length} channels`); + } + } catch (err) { + console.error(`[${clientName}] Subscribe error:`, err); + } + }; + + // Handler for incoming messages + const handleMessage = (message, channel) => { + if (printMessages) { + console.log(`[${clientName}] ${channel}: ${message}`); + } + + if (measureRTT) { + try { + const now = Date.now(); + const timestamp = Number(message); // Timestamp from publisher + const rtt = now - timestamp; + if (rtt >= 0) { + // Add to accumulator for per-tick average calculation + if (rttAccumulator) { + rttAccumulator.add(rtt); + } + // Record directly to histogram for final stats + if (rttHistogram) { + rttHistogram.recordValue(rtt); + } + if (verbose) { + console.log(`[${clientName}] RTT: ${rtt} ms`); + } + } else { + console.warn(`[${clientName}] Skipping negative RTT: now=${now}, ts=${timestamp}`); + } + } catch (err) { + console.error(`[${clientName}] Invalid RTT message: ${message}`, err); + } + } + totalMessagesRef.value++; + }; + + // Initial subscription + await subscribe(); + + // Set up automatic re-subscription if reconnectInterval is set + if (reconnectInterval > 0) { + reconnectTimer = setInterval(async () => { + if (isRunningRef.value) { + await subscribe(); + } + }, reconnectInterval); + } + + // Shutdown function + const shutdown = async () => { + // Clear the reconnection timer if set + if (reconnectTimer) clearInterval(reconnectTimer); + + // Attempt to unsubscribe and disconnect + try { + if (client) { + if (client.isOpen) { + if (mode === 'ssubscribe') { + for (const channel of channels) { + await client.sUnsubscribe(channel); + } + } else { + for (const channel of channels) { + await client.unsubscribe(channel); + } + } + await client.quit(); + } + } + } catch (err) { + console.warn(`[${clientName}] Shutdown error: ${err.message}`); + } + }; + + // Return a promise that waits until isRunningRef becomes false, then cleans up + return new Promise((resolve) => { + const check = setInterval(async () => { + if (!isRunningRef.value) { + clearInterval(check); + const clientId = parseInt(clientName.split('#')[1], 10); + const shouldLog = clientId % 100 === 0 || clientId === totalClients; + + if (shouldLog) console.log(`[${clientName}] Triggering shutdown...`); + + await shutdown(); + if (shouldLog) console.log(`[${clientName}] Shutdown complete.`); + resolve(); + } + }, 500); + }); +} + +module.exports = { subscriberRoutine }; \ No newline at end of file diff --git a/js/node-redis/package.json b/js/node-redis/package.json new file mode 100644 index 0000000..a9da984 --- /dev/null +++ b/js/node-redis/package.json @@ -0,0 +1,19 @@ +{ + "name": "pubsub-sub-bench-node-redis", + "version": "1.0.0", + "description": "Redis Pub/Sub benchmark tool using node-redis", + "main": "bin/pubsub-sub-bench.js", + "scripts": { + "start": "node bin/pubsub-sub-bench.js" + }, + "dependencies": { + "redis": "^4.7.0", + "yargs": "^17.7.1", + "hdr-histogram-js": "^3.0.0", + "cli-progress": "^3.11.2", + "seedrandom": "^3.0.5" + }, + "engines": { + "node": ">=16.0.0" + } +} \ No newline at end of file diff --git a/js/package.json b/js/package.json deleted file mode 100644 index a9521e9..0000000 --- a/js/package.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "name": "pubsub-sub-bench", - "version": "0.1.0", - "main": "bin/pubsub-sub-bench.js", - "license": "Apache-2.0", - "dependencies": { - "cluster-key-slot": "^1.1.0", - "hdr-histogram-js": "^2.0.1", - "ioredis": "^4.30.0", - "seedrandom": "^3.0.5", - "yargs": "^17.7.2" - }, - "devDependencies": { - "husky": "^9.1.7", - "lint-staged": "^15.5.0", - "prettier": "^3.5.3" - }, - "scripts": { - "format": "prettier --write '**/*.js'" - }, - "lint-staged": { - "js/**/*.js": "prettier --write" - } -} \ No newline at end of file diff --git a/subscriber.go b/subscriber.go index fa5cebb..bc24189 100644 --- a/subscriber.go +++ b/subscriber.go @@ -92,8 +92,8 @@ func publisherRoutine(clientName string, channels []string, mode string, measure time.Sleep(r.Delay()) } if measureRTT { - now := time.Now().UnixMicro() - msg = strconv.FormatInt(now, 10) + now := time.Now().UnixMilli() + msg = strconv.FormatInt(int64(now), 10) } var err error switch mode {