From 14ad70bc48494fefa8bef5ac577ea312842fb8f1 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Tue, 1 Jul 2025 10:21:08 -0600 Subject: [PATCH 01/10] feat: add chainhooks index notifier --- .env | 6 +++ package-lock.json | 79 ++++++++++++++++++++++++++++ package.json | 1 + src/datastore/chainhooks-notifier.ts | 59 +++++++++++++++++++++ src/datastore/common.ts | 1 + src/datastore/helpers.ts | 2 + src/datastore/pg-write-store.ts | 37 +++++++++++-- src/index.ts | 1 + 8 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 src/datastore/chainhooks-notifier.ts diff --git a/.env b/.env index 91d193035..9d9b9882b 100644 --- a/.env +++ b/.env @@ -216,3 +216,9 @@ STACKS_EVENTS_DIR=./events # SNP_REDIS_URL=redis://127.0.0.1:6379 # Only specify `SNP_REDIS_STREAM_KEY_PREFIX` if `REDIS_STREAM_KEY_PREFIX` is configured on the SNP server. # SNP_REDIS_STREAM_KEY_PREFIX= + +# If enabled this service will notify Chainhooks via Redis whenever the Stacks index advances e.g. +# whenever a new block is confirmed. This is used by Chainhooks to send block data to its subscribers. +# CHAINHOOKS_NOTIFIER_ENABLED=true +# CHAINHOOKS_REDIS_URL=redis://127.0.0.1:6379 +# CHAINHOOKS_REDIS_QUEUE=index-progress diff --git a/package-lock.json b/package-lock.json index 2a94be02b..e18136750 100644 --- a/package-lock.json +++ b/package-lock.json @@ -46,6 +46,7 @@ "fastify": "4.29.1", "fastify-metrics": "11.0.0", "getopts": "2.3.0", + "ioredis": "5.6.1", "jsonc-parser": "3.0.0", "jsonrpc-lite": "2.2.0", "lru-cache": "6.0.0", @@ -1624,6 +1625,12 @@ "deprecated": "Use @eslint/object-schema instead", "dev": true }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "license": "MIT" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -7439,6 +7446,15 @@ "resolved": "https://registry.npmjs.org/delegates/-/delegates-1.0.0.tgz", "integrity": "sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==" }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -10339,6 +10355,30 @@ "node": ">= 0.4" } }, + "node_modules/ioredis": { + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.6.1.tgz", + "integrity": "sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ip-address": { "version": "9.0.5", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", @@ -12760,6 +12800,18 @@ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", "integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ==" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.isplainobject": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz", @@ -15468,6 +15520,27 @@ "@redis/time-series": "1.1.0" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/redoc": { "version": "2.4.0", "resolved": "https://registry.npmjs.org/redoc/-/redoc-2.4.0.tgz", @@ -16645,6 +16718,12 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-16.18.104.tgz", "integrity": "sha512-OF3keVCbfPlkzxnnDBUZJn1RiCJzKeadjiW0xTEb0G1SUJ5gDVb3qnzZr2T4uIFvsbKJbXy1v2DN7e2zaEY7jQ==" }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/package.json b/package.json index 2128496f5..183a6fc56 100644 --- a/package.json +++ b/package.json @@ -126,6 +126,7 @@ "fastify": "4.29.1", "fastify-metrics": "11.0.0", "getopts": "2.3.0", + "ioredis": "5.6.1", "jsonc-parser": "3.0.0", "jsonrpc-lite": "2.2.0", "lru-cache": "6.0.0", diff --git a/src/datastore/chainhooks-notifier.ts b/src/datastore/chainhooks-notifier.ts new file mode 100644 index 000000000..5d42d199d --- /dev/null +++ b/src/datastore/chainhooks-notifier.ts @@ -0,0 +1,59 @@ +import Redis from 'ioredis'; +import { ReOrgUpdatedEntities } from './common'; +import { ChainID } from '@stacks/transactions'; +import { getApiConfiguredChainID } from '../helpers'; +import { logger } from '@hirosystems/api-toolkit'; + +/** + * Notifies Chainhooks of the progress of the Stacks index. + */ +export class ChainhooksNotifier { + private readonly redis: Redis; + private readonly chainId: ChainID; + + constructor() { + const url = process.env.CHAINHOOKS_REDIS_URL; + if (!url) throw new Error(`ChainhooksNotifier CHAINHOOKS_REDIS_URL is not set`); + this.redis = new Redis(url); + this.chainId = getApiConfiguredChainID(); + } + + /** + * Broadcast index progress message to Chainhooks Redis queue. + * @param reOrg - The re-org updated entities + * @param indexBlockHash - The index block hash that we will restore first + * @param blockHeight - The block height that we will restore first + */ + async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) { + const message = { + id: `stacks-${Date.now()}`, + payload: { + chain: 'stacks', + network: this.chainId === ChainID.Mainnet ? 'mainnet' : 'testnet', + apply_blocks: [ + ...reOrg.markedCanonical.blockHeaders.map(block => ({ + hash: block.index_block_hash, + index: block.block_height, + })), + { + hash: indexBlockHash, + index: blockHeight, + }, + ], + rollback_blocks: reOrg.markedNonCanonical.blockHeaders.map(block => ({ + hash: block.index_block_hash, + index: block.block_height, + })), + }, + }; + logger.debug(message, 'ChainhooksNotifier broadcasting index progress message'); + await this.redis.lpush( + process.env.CHAINHOOKS_REDIS_QUEUE || 'chainhooks:index-progress', + JSON.stringify(message) + ); + } + + async close() { + await this.redis.quit(); + } +} diff --git a/src/datastore/common.ts b/src/datastore/common.ts index 902916a42..cc7bebd0e 100644 --- a/src/datastore/common.ts +++ b/src/datastore/common.ts @@ -1111,6 +1111,7 @@ export interface DbPoxCycleSignerStacker { } interface ReOrgEntities { + blockHeaders: { index_block_hash: string; block_height: number }[]; blocks: number; microblocks: number; minerRewards: number; diff --git a/src/datastore/helpers.ts b/src/datastore/helpers.ts index 6965e4a9b..0c41edd94 100644 --- a/src/datastore/helpers.ts +++ b/src/datastore/helpers.ts @@ -1315,6 +1315,7 @@ export function markBlockUpdateDataAsNonCanonical(data: DataStoreBlockUpdateData export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities { return { markedCanonical: { + blockHeaders: [], blocks: 0, microblocks: 0, minerRewards: 0, @@ -1335,6 +1336,7 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities { poxCycles: 0, }, markedNonCanonical: { + blockHeaders: [], blocks: 0, microblocks: 0, minerRewards: 0, diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 6a21705f5..e8117b04b 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -94,6 +94,7 @@ import { } from '@hirosystems/api-toolkit'; import { PgServer, getConnectionArgs, getConnectionConfig } from './connection'; import { BigNumber } from 'bignumber.js'; +import { ChainhooksNotifier } from './chainhooks-notifier'; const MIGRATIONS_TABLE = 'pgmigrations'; const INSERT_BATCH_SIZE = 500; @@ -129,26 +130,31 @@ type TransactionHeader = { */ export class PgWriteStore extends PgStore { readonly isEventReplay: boolean; + protected readonly chainhooksNotifier: ChainhooksNotifier | undefined = undefined; protected isIbdBlockHeightReached = false; constructor( sql: PgSqlClient, notifier: PgNotifier | undefined = undefined, - isEventReplay: boolean = false + isEventReplay: boolean = false, + chainhooksNotifier: ChainhooksNotifier | undefined = undefined ) { super(sql, notifier); this.isEventReplay = isEventReplay; + this.chainhooksNotifier = chainhooksNotifier; } static async connect({ usageName, skipMigrations = false, withNotifier = true, + withChainhooksNotifier = false, isEventReplay = false, }: { usageName: string; skipMigrations?: boolean; withNotifier?: boolean; + withChainhooksNotifier?: boolean; isEventReplay?: boolean; }): Promise { const sql = await connectPostgres({ @@ -171,7 +177,8 @@ export class PgWriteStore extends PgStore { }); } const notifier = withNotifier ? await PgNotifier.create(usageName) : undefined; - const store = new PgWriteStore(sql, notifier, isEventReplay); + const chainhooksNotifier = withChainhooksNotifier ? new ChainhooksNotifier() : undefined; + const store = new PgWriteStore(sql, notifier, isEventReplay, chainhooksNotifier); await store.connectPgNotifier(); return store; } @@ -210,10 +217,11 @@ export class PgWriteStore extends PgStore { async update(data: DataStoreBlockUpdateData): Promise { let garbageCollectedMempoolTxs: string[] = []; let newTxData: DataStoreTxEventData[] = []; + let updatedEntities: ReOrgUpdatedEntities = newReOrgUpdatedEntities(); await this.sqlWriteTransaction(async sql => { const chainTip = await this.getChainTip(sql); - await this.handleReorg(sql, data.block, chainTip.block_height); + updatedEntities = await this.handleReorg(sql, data.block, chainTip.block_height); const isCanonical = data.block.block_height > chainTip.block_height; if (!isCanonical) { markBlockUpdateDataAsNonCanonical(data); @@ -367,6 +375,11 @@ export class PgWriteStore extends PgStore { // Send block updates but don't block current execution unless we're testing. if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); + await this.chainhooksNotifier?.notify( + updatedEntities, + data.block.index_block_hash, + data.block.block_height + ); } /** @@ -3343,6 +3356,13 @@ export class PgWriteStore extends PgStore { return result; } + /** + * Recursively restore previously orphaned blocks to canonical. + * @param sql - The SQL client + * @param indexBlockHash - The index block hash that we will restore first + * @param updatedEntities - The updated entities + * @returns The updated entities + */ async restoreOrphanedChain( sql: PgSqlClient, indexBlockHash: string, @@ -3363,6 +3383,10 @@ export class PgWriteStore extends PgStore { throw new Error(`Found multiple non-canonical parents for index_hash ${indexBlockHash}`); } updatedEntities.markedCanonical.blocks++; + updatedEntities.markedCanonical.blockHeaders.unshift({ + index_block_hash: restoredBlockResult[0].index_block_hash, + block_height: restoredBlockResult[0].block_height, + }); // Orphan the now conflicting block at the same height const orphanedBlockResult = await sql` @@ -3401,6 +3425,10 @@ export class PgWriteStore extends PgStore { } updatedEntities.markedNonCanonical.blocks++; + updatedEntities.markedNonCanonical.blockHeaders.unshift({ + index_block_hash: orphanedBlockResult[0].index_block_hash, + block_height: orphanedBlockResult[0].block_height, + }); const markNonCanonicalResult = await this.markEntitiesCanonical( sql, orphanedBlockResult[0].index_block_hash, @@ -3451,6 +3479,8 @@ export class PgWriteStore extends PgStore { markCanonicalResult.txsMarkedCanonical ); updatedEntities.prunedMempoolTxs += prunedMempoolTxs.removedTxs.length; + + // Do we have a parent that is non-canonical? If so, restore it recursively. const parentResult = await sql<{ index_block_hash: string }[]>` SELECT index_block_hash FROM blocks @@ -3808,6 +3838,7 @@ export class PgWriteStore extends PgStore { if (this._debounceMempoolStat.debounce) { clearTimeout(this._debounceMempoolStat.debounce); } + await this.chainhooksNotifier?.close(); await super.close(args); } } diff --git a/src/index.ts b/src/index.ts index 0d4636f3b..89c8b26a5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -130,6 +130,7 @@ async function init(): Promise { dbWriteStore = await PgWriteStore.connect({ usageName: `write-datastore-${apiMode}`, skipMigrations: apiMode === StacksApiMode.readOnly, + withChainhooksNotifier: parseBoolean(process.env['CHAINHOOKS_NOTIFIER_ENABLED']) || false, }); registerMempoolPromStats(dbWriteStore.eventEmitter); } From 3825c5bf1a873be33a24283a68b27bf25c7ebcbf Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 6 Aug 2025 13:50:10 -0600 Subject: [PATCH 02/10] fix: tests --- tests/api/datastore.test.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/api/datastore.test.ts b/tests/api/datastore.test.ts index ef70fdd58..909a8d495 100644 --- a/tests/api/datastore.test.ts +++ b/tests/api/datastore.test.ts @@ -3772,6 +3772,12 @@ describe('postgres datastore', () => { const reorgResult = await db.handleReorg(client, block5, 0); const expectedReorgResult: ReOrgUpdatedEntities = { markedCanonical: { + blockHeaders: [ + { block_height: 1, index_block_hash: '0xaa' }, + { block_height: 2, index_block_hash: '0xbb' }, + { block_height: 3, index_block_hash: '0xcc' }, + { block_height: 4, index_block_hash: '0xdd' }, + ], blocks: 4, microblocks: 0, microblockHashes: [], @@ -3793,6 +3799,7 @@ describe('postgres datastore', () => { poxSigners: 0, }, markedNonCanonical: { + blockHeaders: [{ block_height: 3, index_block_hash: '0xccbb' }], blocks: 1, microblocks: 0, microblockHashes: [], From 7618ccf8fdb32caa0ef3beb9eb101e44f4170b7b Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 7 Aug 2025 10:14:25 -0600 Subject: [PATCH 03/10] chore: log init --- src/datastore/chainhooks-notifier.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/datastore/chainhooks-notifier.ts b/src/datastore/chainhooks-notifier.ts index 5d42d199d..7128e28e8 100644 --- a/src/datastore/chainhooks-notifier.ts +++ b/src/datastore/chainhooks-notifier.ts @@ -10,12 +10,15 @@ import { logger } from '@hirosystems/api-toolkit'; export class ChainhooksNotifier { private readonly redis: Redis; private readonly chainId: ChainID; + private readonly queue: string; constructor() { const url = process.env.CHAINHOOKS_REDIS_URL; if (!url) throw new Error(`ChainhooksNotifier CHAINHOOKS_REDIS_URL is not set`); + this.queue = process.env.CHAINHOOKS_REDIS_QUEUE || 'chainhooks:index-progress'; this.redis = new Redis(url); this.chainId = getApiConfiguredChainID(); + logger.info(`ChainhooksNotifier initialized for queue ${this.queue} on ${url}`); } /** @@ -46,11 +49,8 @@ export class ChainhooksNotifier { })), }, }; - logger.debug(message, 'ChainhooksNotifier broadcasting index progress message'); - await this.redis.lpush( - process.env.CHAINHOOKS_REDIS_QUEUE || 'chainhooks:index-progress', - JSON.stringify(message) - ); + logger.info(message, 'ChainhooksNotifier broadcasting index progress message'); + await this.redis.lpush(this.queue, JSON.stringify(message)); } async close() { From 3cd7d2e8ccf6baa8ea7f16c0a152ec9903d3855a Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 7 Aug 2025 14:03:01 -0600 Subject: [PATCH 04/10] fix: push to the right of the queue --- src/datastore/chainhooks-notifier.ts | 4 ++-- src/datastore/pg-write-store.ts | 12 +++++++----- src/index.ts | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/datastore/chainhooks-notifier.ts b/src/datastore/chainhooks-notifier.ts index 7128e28e8..f36823946 100644 --- a/src/datastore/chainhooks-notifier.ts +++ b/src/datastore/chainhooks-notifier.ts @@ -29,7 +29,7 @@ export class ChainhooksNotifier { */ async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) { const message = { - id: `stacks-${Date.now()}`, + id: `stacks-${blockHeight}-${indexBlockHash}-${Date.now()}`, payload: { chain: 'stacks', network: this.chainId === ChainID.Mainnet ? 'mainnet' : 'testnet', @@ -50,7 +50,7 @@ export class ChainhooksNotifier { }, }; logger.info(message, 'ChainhooksNotifier broadcasting index progress message'); - await this.redis.lpush(this.queue, JSON.stringify(message)); + await this.redis.rpush(this.queue, JSON.stringify(message)); } async close() { diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index a397f0179..e313a1cd7 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -410,11 +410,13 @@ export class PgWriteStore extends PgStore { // Send block updates but don't block current execution unless we're testing. if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); - await this.chainhooksNotifier?.notify( - reorg, - data.block.index_block_hash, - data.block.block_height - ); + if (data.block.block_height >= 1) { + await this.chainhooksNotifier?.notify( + reorg, + data.block.index_block_hash, + data.block.block_height + ); + } } /** diff --git a/src/index.ts b/src/index.ts index 89c8b26a5..8c6c3598b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -130,7 +130,7 @@ async function init(): Promise { dbWriteStore = await PgWriteStore.connect({ usageName: `write-datastore-${apiMode}`, skipMigrations: apiMode === StacksApiMode.readOnly, - withChainhooksNotifier: parseBoolean(process.env['CHAINHOOKS_NOTIFIER_ENABLED']) || false, + withChainhooksNotifier: parseBoolean(process.env['CHAINHOOKS_NOTIFIER_ENABLED']) ?? false, }); registerMempoolPromStats(dbWriteStore.eventEmitter); } From 86950240346a637e7118f2a644dca1f55a054558 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 7 Aug 2025 14:48:07 -0600 Subject: [PATCH 05/10] fix: only is canonical --- src/datastore/pg-write-store.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index e313a1cd7..a3ca32bb2 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -237,11 +237,12 @@ export class PgWriteStore extends PgStore { let garbageCollectedMempoolTxs: string[] = []; let newTxData: DataStoreTxEventData[] = []; let reorg: ReOrgUpdatedEntities = newReOrgUpdatedEntities(); + let isCanonical = true; await this.sqlWriteTransaction(async sql => { const chainTip = await this.getChainTip(sql); reorg = await this.handleReorg(sql, data.block, chainTip.block_height); - const isCanonical = data.block.block_height > chainTip.block_height; + isCanonical = data.block.block_height > chainTip.block_height; if (!isCanonical) { markBlockUpdateDataAsNonCanonical(data); } else { @@ -410,7 +411,7 @@ export class PgWriteStore extends PgStore { // Send block updates but don't block current execution unless we're testing. if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); - if (data.block.block_height >= 1) { + if (isCanonical) { await this.chainhooksNotifier?.notify( reorg, data.block.index_block_hash, From fe7e6ac9f1c2d0885a6f387ac150e1e6de61ecff Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 7 Aug 2025 17:36:52 -0600 Subject: [PATCH 06/10] test: redis broadcast --- src/datastore/chainhooks-notifier.ts | 16 ++-- tests/api/chainhooks-notifier.test.ts | 126 ++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 7 deletions(-) create mode 100644 tests/api/chainhooks-notifier.test.ts diff --git a/src/datastore/chainhooks-notifier.ts b/src/datastore/chainhooks-notifier.ts index f36823946..20f76fde3 100644 --- a/src/datastore/chainhooks-notifier.ts +++ b/src/datastore/chainhooks-notifier.ts @@ -5,7 +5,9 @@ import { getApiConfiguredChainID } from '../helpers'; import { logger } from '@hirosystems/api-toolkit'; /** - * Notifies Chainhooks of the progress of the Stacks index. + * Notifies Chainhooks of the progress of the Stacks index via a message sent to a Redis queue. This + * message will contain a block header for each new canonical block as well as headers for those + * that need to be rolled back from a re-org. */ export class ChainhooksNotifier { private readonly redis: Redis; @@ -14,8 +16,8 @@ export class ChainhooksNotifier { constructor() { const url = process.env.CHAINHOOKS_REDIS_URL; - if (!url) throw new Error(`ChainhooksNotifier CHAINHOOKS_REDIS_URL is not set`); - this.queue = process.env.CHAINHOOKS_REDIS_QUEUE || 'chainhooks:index-progress'; + if (!url) throw new Error(`ChainhooksNotifier is enabled but CHAINHOOKS_REDIS_URL is not set`); + this.queue = process.env.CHAINHOOKS_REDIS_QUEUE ?? 'chainhooks:stacks:index-progress'; this.redis = new Redis(url); this.chainId = getApiConfiguredChainID(); logger.info(`ChainhooksNotifier initialized for queue ${this.queue} on ${url}`); @@ -23,9 +25,9 @@ export class ChainhooksNotifier { /** * Broadcast index progress message to Chainhooks Redis queue. - * @param reOrg - The re-org updated entities - * @param indexBlockHash - The index block hash that we will restore first - * @param blockHeight - The block height that we will restore first + * @param reOrg - The re-org updated entities, if any + * @param indexBlockHash - Block hash of the newest canonical block + * @param blockHeight - Block height of the newest canonical block */ async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) { const message = { @@ -49,7 +51,7 @@ export class ChainhooksNotifier { })), }, }; - logger.info(message, 'ChainhooksNotifier broadcasting index progress message'); + logger.debug(message, 'ChainhooksNotifier broadcasting index progress message'); await this.redis.rpush(this.queue, JSON.stringify(message)); } diff --git a/tests/api/chainhooks-notifier.test.ts b/tests/api/chainhooks-notifier.test.ts new file mode 100644 index 000000000..0ecebe046 --- /dev/null +++ b/tests/api/chainhooks-notifier.test.ts @@ -0,0 +1,126 @@ +const messages: string[] = []; + +// Mock needs to handle both default and named exports +jest.mock('ioredis', () => { + const redisMock = jest.fn().mockImplementation(() => ({ + rpush: jest.fn((_, message) => { + messages.push(message); + }), + quit: jest.fn().mockResolvedValue(undefined), + })); + // Handle both default and named exports + const mock = redisMock as unknown as { default: typeof redisMock }; + mock.default = redisMock; + return mock; +}); + +import { migrate } from '../utils/test-helpers'; +import { PgWriteStore } from '../../src/datastore/pg-write-store'; +import { TestBlockBuilder } from '../utils/test-builders'; + +describe('chainhooks notifier', () => { + let db: PgWriteStore; + + beforeEach(async () => { + process.env.CHAINHOOKS_NOTIFIER_ENABLED = '1'; + process.env.CHAINHOOKS_REDIS_URL = 'redis://localhost:6379'; + process.env.CHAINHOOKS_REDIS_QUEUE = 'test-queue'; + db = await PgWriteStore.connect({ + usageName: 'tests', + withNotifier: false, + withChainhooksNotifier: true, + skipMigrations: true, + }); + await migrate('up'); + messages.length = 0; // Clear messages array before each test + }); + + afterEach(async () => { + await db.close(); + await migrate('down'); + }); + + test('updates chainhooks', async () => { + const block1 = new TestBlockBuilder({ + block_height: 1, + block_hash: '0x1234', + index_block_hash: '0x1234', + }).build(); + await db.update(block1); + + expect(messages.length).toBe(1); + expect(JSON.parse(messages[0]).payload).toEqual({ + chain: 'stacks', + network: 'mainnet', + apply_blocks: [{ hash: '0x1234', index: 1 }], + rollback_blocks: [], + }); + }); + + test('updates chainhooks with re-orgs', async () => { + await db.update( + new TestBlockBuilder({ + block_height: 1, + block_hash: '0x1234', + index_block_hash: '0x1234', + }).build() + ); + expect(messages.length).toBe(1); + expect(JSON.parse(messages[0]).payload).toEqual({ + chain: 'stacks', + network: 'mainnet', + apply_blocks: [{ hash: '0x1234', index: 1 }], + rollback_blocks: [], + }); + messages.length = 0; + + await db.update( + new TestBlockBuilder({ + block_height: 2, + block_hash: '0x1235', + index_block_hash: '0x1235', + parent_index_block_hash: '0x1234', + }).build() + ); + expect(messages.length).toBe(1); + expect(JSON.parse(messages[0]).payload).toEqual({ + chain: 'stacks', + network: 'mainnet', + apply_blocks: [{ hash: '0x1235', index: 2 }], + rollback_blocks: [], + }); + messages.length = 0; + + // Re-org block 2, should not send a message because this block is not canonical + await db.update( + new TestBlockBuilder({ + block_height: 2, + block_hash: '0x1235aa', + index_block_hash: '0x1235aa', + parent_index_block_hash: '0x1234', + }).build() + ); + expect(messages.length).toBe(0); + messages.length = 0; + + // Advance the non-canoincal chain, original block 2 should be sent as a rollback block + await db.update( + new TestBlockBuilder({ + block_height: 3, + block_hash: '0x1236', + index_block_hash: '0x1236', + parent_index_block_hash: '0x1235aa', + }).build() + ); + expect(messages.length).toBe(1); + expect(JSON.parse(messages[0]).payload).toEqual({ + chain: 'stacks', + network: 'mainnet', + apply_blocks: [ + { hash: '0x1235aa', index: 2 }, + { hash: '0x1236', index: 3 }, + ], + rollback_blocks: [{ hash: '0x1235', index: 2 }], + }); + }); +}); From e397205d9171b8e0d4a773525a6c7c0698d9a3fd Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 20 Aug 2025 11:17:10 -0600 Subject: [PATCH 07/10] fix: rename to redis notifier --- .env | 19 ++- src/datastore/chainhooks-notifier.ts | 61 --------- src/datastore/pg-write-store.ts | 28 ++--- src/datastore/redis-notifier.ts | 116 ++++++++++++++++++ ...otifier.test.ts => redis-notifier.test.ts} | 32 +++-- 5 files changed, 156 insertions(+), 100 deletions(-) delete mode 100644 src/datastore/chainhooks-notifier.ts create mode 100644 src/datastore/redis-notifier.ts rename tests/api/{chainhooks-notifier.test.ts => redis-notifier.test.ts} (78%) diff --git a/.env b/.env index 9d9b9882b..5ea7f421d 100644 --- a/.env +++ b/.env @@ -217,8 +217,17 @@ STACKS_EVENTS_DIR=./events # Only specify `SNP_REDIS_STREAM_KEY_PREFIX` if `REDIS_STREAM_KEY_PREFIX` is configured on the SNP server. # SNP_REDIS_STREAM_KEY_PREFIX= -# If enabled this service will notify Chainhooks via Redis whenever the Stacks index advances e.g. -# whenever a new block is confirmed. This is used by Chainhooks to send block data to its subscribers. -# CHAINHOOKS_NOTIFIER_ENABLED=true -# CHAINHOOKS_REDIS_URL=redis://127.0.0.1:6379 -# CHAINHOOKS_REDIS_QUEUE=index-progress +# If enabled this service will notify Redis whenever the Stacks index advances i.e. whenever a new block is confirmed. +# High Availability Redis is supported via Sentinels, Cluster or a simple Redis connection URL. +# REDIS_NOTIFIER_ENABLED=1 +# REDIS_QUEUE=index-progress +# REDIS_URL=127.0.0.1:6379 +# REDIS_SENTINELS= +# REDIS_SENTINEL_MASTER= +# REDIS_SENTINEL_PASSWORD= +# REDIS_SENTINEL_AUTH_PASSWORD= +# REDIS_CLUSTER_NODES= +# REDIS_CLUSTER_PASSWORD= +# REDIS_CONNECTION_TIMEOUT=10000 +# REDIS_COMMAND_TIMEOUT=5000 +# REDIS_MAX_RETRIES=20 diff --git a/src/datastore/chainhooks-notifier.ts b/src/datastore/chainhooks-notifier.ts deleted file mode 100644 index 20f76fde3..000000000 --- a/src/datastore/chainhooks-notifier.ts +++ /dev/null @@ -1,61 +0,0 @@ -import Redis from 'ioredis'; -import { ReOrgUpdatedEntities } from './common'; -import { ChainID } from '@stacks/transactions'; -import { getApiConfiguredChainID } from '../helpers'; -import { logger } from '@hirosystems/api-toolkit'; - -/** - * Notifies Chainhooks of the progress of the Stacks index via a message sent to a Redis queue. This - * message will contain a block header for each new canonical block as well as headers for those - * that need to be rolled back from a re-org. - */ -export class ChainhooksNotifier { - private readonly redis: Redis; - private readonly chainId: ChainID; - private readonly queue: string; - - constructor() { - const url = process.env.CHAINHOOKS_REDIS_URL; - if (!url) throw new Error(`ChainhooksNotifier is enabled but CHAINHOOKS_REDIS_URL is not set`); - this.queue = process.env.CHAINHOOKS_REDIS_QUEUE ?? 'chainhooks:stacks:index-progress'; - this.redis = new Redis(url); - this.chainId = getApiConfiguredChainID(); - logger.info(`ChainhooksNotifier initialized for queue ${this.queue} on ${url}`); - } - - /** - * Broadcast index progress message to Chainhooks Redis queue. - * @param reOrg - The re-org updated entities, if any - * @param indexBlockHash - Block hash of the newest canonical block - * @param blockHeight - Block height of the newest canonical block - */ - async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) { - const message = { - id: `stacks-${blockHeight}-${indexBlockHash}-${Date.now()}`, - payload: { - chain: 'stacks', - network: this.chainId === ChainID.Mainnet ? 'mainnet' : 'testnet', - apply_blocks: [ - ...reOrg.markedCanonical.blockHeaders.map(block => ({ - hash: block.index_block_hash, - index: block.block_height, - })), - { - hash: indexBlockHash, - index: blockHeight, - }, - ], - rollback_blocks: reOrg.markedNonCanonical.blockHeaders.map(block => ({ - hash: block.index_block_hash, - index: block.block_height, - })), - }, - }; - logger.debug(message, 'ChainhooksNotifier broadcasting index progress message'); - await this.redis.rpush(this.queue, JSON.stringify(message)); - } - - async close() { - await this.redis.quit(); - } -} diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index a3ca32bb2..f95ee8be0 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -95,7 +95,7 @@ import { } from '@hirosystems/api-toolkit'; import { PgServer, getConnectionArgs, getConnectionConfig } from './connection'; import { BigNumber } from 'bignumber.js'; -import { ChainhooksNotifier } from './chainhooks-notifier'; +import { RedisNotifier } from './redis-notifier'; const MIGRATIONS_TABLE = 'pgmigrations'; const INSERT_BATCH_SIZE = 500; @@ -131,7 +131,7 @@ type TransactionHeader = { */ export class PgWriteStore extends PgStore { readonly isEventReplay: boolean; - protected readonly chainhooksNotifier: ChainhooksNotifier | undefined = undefined; + protected readonly redisNotifier: RedisNotifier | undefined = undefined; protected isIbdBlockHeightReached = false; private metrics: | { @@ -144,11 +144,11 @@ export class PgWriteStore extends PgStore { sql: PgSqlClient, notifier: PgNotifier | undefined = undefined, isEventReplay: boolean = false, - chainhooksNotifier: ChainhooksNotifier | undefined = undefined + redisNotifier: RedisNotifier | undefined = undefined ) { super(sql, notifier); this.isEventReplay = isEventReplay; - this.chainhooksNotifier = chainhooksNotifier; + this.redisNotifier = redisNotifier; if (isProdEnv) { this.metrics = { blockHeight: new prom.Gauge({ @@ -167,13 +167,13 @@ export class PgWriteStore extends PgStore { usageName, skipMigrations = false, withNotifier = true, - withChainhooksNotifier = false, + withRedisNotifier = false, isEventReplay = false, }: { usageName: string; skipMigrations?: boolean; withNotifier?: boolean; - withChainhooksNotifier?: boolean; + withRedisNotifier?: boolean; isEventReplay?: boolean; }): Promise { const sql = await connectPostgres({ @@ -196,8 +196,8 @@ export class PgWriteStore extends PgStore { }); } const notifier = withNotifier ? await PgNotifier.create(usageName) : undefined; - const chainhooksNotifier = withChainhooksNotifier ? new ChainhooksNotifier() : undefined; - const store = new PgWriteStore(sql, notifier, isEventReplay, chainhooksNotifier); + const redisNotifier = withRedisNotifier ? new RedisNotifier() : undefined; + const store = new PgWriteStore(sql, notifier, isEventReplay, redisNotifier); await store.connectPgNotifier(); return store; } @@ -405,19 +405,15 @@ export class PgWriteStore extends PgStore { } } }); + if (isCanonical) { + await this.redisNotifier?.notify(reorg, data.block.index_block_hash, data.block.block_height); + } // Do we have an IBD height defined in ENV? If so, check if this block update reached it. const ibdHeight = getIbdBlockHeight(); this.isIbdBlockHeightReached = ibdHeight ? data.block.block_height > ibdHeight : true; // Send block updates but don't block current execution unless we're testing. if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs }); - if (isCanonical) { - await this.chainhooksNotifier?.notify( - reorg, - data.block.index_block_hash, - data.block.block_height - ); - } } /** @@ -4052,7 +4048,7 @@ export class PgWriteStore extends PgStore { if (this._debounceMempoolStat.debounce) { clearTimeout(this._debounceMempoolStat.debounce); } - await this.chainhooksNotifier?.close(); + await this.redisNotifier?.close(); await super.close(args); } } diff --git a/src/datastore/redis-notifier.ts b/src/datastore/redis-notifier.ts new file mode 100644 index 000000000..be8323b43 --- /dev/null +++ b/src/datastore/redis-notifier.ts @@ -0,0 +1,116 @@ +import Redis, { Cluster, RedisOptions } from 'ioredis'; +import { ReOrgUpdatedEntities } from './common'; +import { ChainID } from '@stacks/transactions'; +import { getApiConfiguredChainID } from '../helpers'; +import { logger } from '@hirosystems/api-toolkit'; + +/** + * Notifies Chainhooks of the progress of the Stacks index via a message sent to a Redis queue. This + * message will contain a block header for each new canonical block as well as headers for those + * that need to be rolled back from a re-org. + */ +export class RedisNotifier { + private readonly redis: Redis | Cluster; + private readonly chainId: ChainID; + private readonly queue: string; + + constructor() { + this.redis = this.newRedisConnection(); + this.chainId = getApiConfiguredChainID(); + this.queue = process.env.REDIS_QUEUE ?? 'index-progress'; + logger.info(`RedisNotifier initialized for queue ${this.queue}`); + } + + /** + * Broadcast index progress message to the Redis queue. + * @param reOrg - The re-org updated entities, if any + * @param indexBlockHash - Block hash of the newest canonical block + * @param blockHeight - Block height of the newest canonical block + */ + async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) { + const message = { + id: `stacks-${blockHeight}-${indexBlockHash}-${Date.now()}`, + payload: { + chain: 'stacks', + network: this.chainId === ChainID.Mainnet ? 'mainnet' : 'testnet', + apply_blocks: [ + ...reOrg.markedCanonical.blockHeaders.map(block => ({ + hash: block.index_block_hash, + index: block.block_height, + })), + { + hash: indexBlockHash, + index: blockHeight, + }, + ], + rollback_blocks: reOrg.markedNonCanonical.blockHeaders.map(block => ({ + hash: block.index_block_hash, + index: block.block_height, + })), + }, + }; + logger.debug(message, 'RedisNotifier broadcasting index progress message'); + await this.redis.rpush(this.queue, JSON.stringify(message)); + } + + async close() { + await this.redis.quit(); + } + + /** + * Create a new Redis connection based on the environment variables. This will auto-select a + * single connection, cluster or sentinel. + */ + private newRedisConnection(): Redis | Cluster { + const baseOptions: RedisOptions = { + retryStrategy: times => Math.min(times * 50, 2000), + maxRetriesPerRequest: parseInt(process.env.REDIS_MAX_RETRIES ?? '20'), + connectTimeout: parseInt(process.env.REDIS_CONNECTION_TIMEOUT ?? '10000'), + commandTimeout: parseInt(process.env.REDIS_COMMAND_TIMEOUT ?? '5000'), + lazyConnect: true, + }; + + // Single Redis instance with URL + if (process.env.REDIS_URL) { + logger.info(`RedisNotifier connecting to redis at ${process.env.REDIS_URL}`); + return new Redis(process.env.REDIS_URL, baseOptions); + } + + // Redis Cluster configuration + if (process.env.REDIS_CLUSTER_NODES) { + const clusterNodes = process.env.REDIS_CLUSTER_NODES.split(','); + logger.info( + `RedisNotifier connecting to redis cluster at ${process.env.REDIS_CLUSTER_NODES}` + ); + return new Redis.Cluster(clusterNodes, { + ...baseOptions, + redisOptions: { + ...baseOptions, + password: process.env.REDIS_CLUSTER_PASSWORD, + }, + clusterRetryStrategy: times => Math.min(times * 50, 2000), + }); + } + + // Redis Sentinel configuration + if (process.env.REDIS_SENTINELS) { + const sentinels = process.env.REDIS_SENTINELS.split(','); + logger.info(`RedisNotifier connecting to redis sentinel at ${process.env.REDIS_SENTINELS}`); + return new Redis({ + ...baseOptions, + sentinels: sentinels.map(sentinel => { + const [host, port] = sentinel.split(':'); + return { host, port: parseInt(port) }; + }), + name: process.env.REDIS_SENTINEL_MASTER, + password: process.env.REDIS_SENTINEL_PASSWORD, + sentinelPassword: process.env.REDIS_SENTINEL_AUTH_PASSWORD, + sentinelRetryStrategy: times => Math.min(times * 50, 2000), + }); + } + + throw new Error( + 'Redis configuration required. Please set REDIS_URL, REDIS_SENTINELS, or REDIS_CLUSTER_NODES' + ); + } +} diff --git a/tests/api/chainhooks-notifier.test.ts b/tests/api/redis-notifier.test.ts similarity index 78% rename from tests/api/chainhooks-notifier.test.ts rename to tests/api/redis-notifier.test.ts index 0ecebe046..0236574ba 100644 --- a/tests/api/chainhooks-notifier.test.ts +++ b/tests/api/redis-notifier.test.ts @@ -1,6 +1,6 @@ const messages: string[] = []; -// Mock needs to handle both default and named exports +// Mock Redis to capture messages jest.mock('ioredis', () => { const redisMock = jest.fn().mockImplementation(() => ({ rpush: jest.fn((_, message) => { @@ -8,7 +8,6 @@ jest.mock('ioredis', () => { }), quit: jest.fn().mockResolvedValue(undefined), })); - // Handle both default and named exports const mock = redisMock as unknown as { default: typeof redisMock }; mock.default = redisMock; return mock; @@ -18,21 +17,21 @@ import { migrate } from '../utils/test-helpers'; import { PgWriteStore } from '../../src/datastore/pg-write-store'; import { TestBlockBuilder } from '../utils/test-builders'; -describe('chainhooks notifier', () => { +describe('redis notifier', () => { let db: PgWriteStore; beforeEach(async () => { - process.env.CHAINHOOKS_NOTIFIER_ENABLED = '1'; - process.env.CHAINHOOKS_REDIS_URL = 'redis://localhost:6379'; - process.env.CHAINHOOKS_REDIS_QUEUE = 'test-queue'; + process.env.REDIS_NOTIFIER_ENABLED = '1'; + process.env.REDIS_URL = 'localhost:6379'; + process.env.REDIS_QUEUE = 'test-queue'; db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: false, - withChainhooksNotifier: true, + withRedisNotifier: true, skipMigrations: true, }); await migrate('up'); - messages.length = 0; // Clear messages array before each test + messages.length = 0; }); afterEach(async () => { @@ -40,7 +39,7 @@ describe('chainhooks notifier', () => { await migrate('down'); }); - test('updates chainhooks', async () => { + test('updates redis', async () => { const block1 = new TestBlockBuilder({ block_height: 1, block_hash: '0x1234', @@ -57,7 +56,7 @@ describe('chainhooks notifier', () => { }); }); - test('updates chainhooks with re-orgs', async () => { + test('updates redis with re-orgs', async () => { await db.update( new TestBlockBuilder({ block_height: 1, @@ -72,7 +71,6 @@ describe('chainhooks notifier', () => { apply_blocks: [{ hash: '0x1234', index: 1 }], rollback_blocks: [], }); - messages.length = 0; await db.update( new TestBlockBuilder({ @@ -82,14 +80,13 @@ describe('chainhooks notifier', () => { parent_index_block_hash: '0x1234', }).build() ); - expect(messages.length).toBe(1); - expect(JSON.parse(messages[0]).payload).toEqual({ + expect(messages.length).toBe(2); + expect(JSON.parse(messages[1]).payload).toEqual({ chain: 'stacks', network: 'mainnet', apply_blocks: [{ hash: '0x1235', index: 2 }], rollback_blocks: [], }); - messages.length = 0; // Re-org block 2, should not send a message because this block is not canonical await db.update( @@ -100,8 +97,7 @@ describe('chainhooks notifier', () => { parent_index_block_hash: '0x1234', }).build() ); - expect(messages.length).toBe(0); - messages.length = 0; + expect(messages.length).toBe(2); // Advance the non-canoincal chain, original block 2 should be sent as a rollback block await db.update( @@ -112,8 +108,8 @@ describe('chainhooks notifier', () => { parent_index_block_hash: '0x1235aa', }).build() ); - expect(messages.length).toBe(1); - expect(JSON.parse(messages[0]).payload).toEqual({ + expect(messages.length).toBe(3); + expect(JSON.parse(messages[2]).payload).toEqual({ chain: 'stacks', network: 'mainnet', apply_blocks: [ From 8aeaa19b5b6d1203c977d51d1cdc62c5026e6a7d Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 20 Aug 2025 13:56:10 -0600 Subject: [PATCH 08/10] fix: build --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 8c6c3598b..5cfc5293e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -130,7 +130,7 @@ async function init(): Promise { dbWriteStore = await PgWriteStore.connect({ usageName: `write-datastore-${apiMode}`, skipMigrations: apiMode === StacksApiMode.readOnly, - withChainhooksNotifier: parseBoolean(process.env['CHAINHOOKS_NOTIFIER_ENABLED']) ?? false, + withRedisNotifier: parseBoolean(process.env['REDIS_NOTIFIER_ENABLED']) ?? false, }); registerMempoolPromStats(dbWriteStore.eventEmitter); } From ed35d26d724898c1a4222315a9d33502ef7af6d1 Mon Sep 17 00:00:00 2001 From: CharlieC3 <2747302+CharlieC3@users.noreply.github.com> Date: Thu, 28 Aug 2025 09:55:49 -0400 Subject: [PATCH 09/10] feat: add support for SRV records in cluster --- src/datastore/redis-notifier.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/datastore/redis-notifier.ts b/src/datastore/redis-notifier.ts index be8323b43..ec09ba111 100644 --- a/src/datastore/redis-notifier.ts +++ b/src/datastore/redis-notifier.ts @@ -77,17 +77,22 @@ export class RedisNotifier { } // Redis Cluster configuration - if (process.env.REDIS_CLUSTER_NODES) { - const clusterNodes = process.env.REDIS_CLUSTER_NODES.split(','); + if (process.env.REDIS_CLUSTER_NODES && process.env.REDIS_CLUSTER_NODES.length > 0) { + let isSRVRecord = false; + const clusterNodesArray = process.env.REDIS_CLUSTER_NODES.split(','); + if (clusterNodesArray.length === 1) { + isSRVRecord = true; + } logger.info( `RedisNotifier connecting to redis cluster at ${process.env.REDIS_CLUSTER_NODES}` ); - return new Redis.Cluster(clusterNodes, { + return new Redis.Cluster(clusterNodesArray, { ...baseOptions, redisOptions: { ...baseOptions, password: process.env.REDIS_CLUSTER_PASSWORD, }, + useSRVRecords: isSRVRecord, clusterRetryStrategy: times => Math.min(times * 50, 2000), }); } From 5d5f2c83837db02119240b0adf3328d8f24126ab Mon Sep 17 00:00:00 2001 From: CharlieC3 <2747302+CharlieC3@users.noreply.github.com> Date: Thu, 28 Aug 2025 10:21:08 -0400 Subject: [PATCH 10/10] chore: update default REDIS_QUEUE value to match deployment --- src/datastore/redis-notifier.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datastore/redis-notifier.ts b/src/datastore/redis-notifier.ts index ec09ba111..a34f06789 100644 --- a/src/datastore/redis-notifier.ts +++ b/src/datastore/redis-notifier.ts @@ -17,7 +17,7 @@ export class RedisNotifier { constructor() { this.redis = this.newRedisConnection(); this.chainId = getApiConfiguredChainID(); - this.queue = process.env.REDIS_QUEUE ?? 'index-progress'; + this.queue = process.env.REDIS_QUEUE ?? 'chainhooks:stacks:index-progress'; logger.info(`RedisNotifier initialized for queue ${this.queue}`); }