Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,18 @@ STACKS_EVENTS_DIR=./events
# SNP_REDIS_URL=redis://127.0.0.1:6379
# Only specify `SNP_REDIS_STREAM_KEY_PREFIX` if `REDIS_STREAM_KEY_PREFIX` is configured on the SNP server.
# SNP_REDIS_STREAM_KEY_PREFIX=

# If enabled, this service will notify Redis whenever the Stacks index advances, i.e. whenever a new block is confirmed.
# High Availability Redis is supported via Sentinels, Cluster, or a simple Redis connection URL.
# REDIS_NOTIFIER_ENABLED=1
# REDIS_QUEUE=index-progress
# REDIS_URL=redis://127.0.0.1:6379
# REDIS_SENTINELS=
# REDIS_SENTINEL_MASTER=
# REDIS_SENTINEL_PASSWORD=
# REDIS_SENTINEL_AUTH_PASSWORD=
# REDIS_CLUSTER_NODES=
# REDIS_CLUSTER_PASSWORD=
# REDIS_CONNECTION_TIMEOUT=10000
# REDIS_COMMAND_TIMEOUT=5000
# REDIS_MAX_RETRIES=20
79 changes: 79 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"fastify": "4.29.1",
"fastify-metrics": "11.0.0",
"getopts": "2.3.0",
"ioredis": "5.6.1",
"jsonc-parser": "3.0.0",
"jsonrpc-lite": "2.2.0",
"lru-cache": "6.0.0",
Expand Down
1 change: 1 addition & 0 deletions src/datastore/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ export interface DbPoxCycleSignerStacker {
}

interface ReOrgEntities {
blockHeaders: { index_block_hash: string; block_height: number }[];
blocks: number;
microblockHashes: string[];
microblocks: number;
Expand Down
2 changes: 2 additions & 0 deletions src/datastore/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ export function markBlockUpdateDataAsNonCanonical(data: DataStoreBlockUpdateData
export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
return {
markedCanonical: {
blockHeaders: [],
blocks: 0,
microblockHashes: [],
microblocks: 0,
Expand All @@ -1336,6 +1337,7 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
poxCycles: 0,
},
markedNonCanonical: {
blockHeaders: [],
blocks: 0,
microblockHashes: [],
microblocks: 0,
Expand Down
38 changes: 34 additions & 4 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import {
} from '@hirosystems/api-toolkit';
import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';
import { BigNumber } from 'bignumber.js';
import { RedisNotifier } from './redis-notifier';

const MIGRATIONS_TABLE = 'pgmigrations';
const INSERT_BATCH_SIZE = 500;
Expand Down Expand Up @@ -130,6 +131,7 @@ type TransactionHeader = {
*/
export class PgWriteStore extends PgStore {
readonly isEventReplay: boolean;
protected readonly redisNotifier: RedisNotifier | undefined = undefined;
protected isIbdBlockHeightReached = false;
private metrics:
| {
Expand All @@ -141,10 +143,12 @@ export class PgWriteStore extends PgStore {
constructor(
sql: PgSqlClient,
notifier: PgNotifier | undefined = undefined,
isEventReplay: boolean = false
isEventReplay: boolean = false,
redisNotifier: RedisNotifier | undefined = undefined
) {
super(sql, notifier);
this.isEventReplay = isEventReplay;
this.redisNotifier = redisNotifier;
if (isProdEnv) {
this.metrics = {
blockHeight: new prom.Gauge({
Expand All @@ -163,11 +167,13 @@ export class PgWriteStore extends PgStore {
usageName,
skipMigrations = false,
withNotifier = true,
withRedisNotifier = false,
isEventReplay = false,
}: {
usageName: string;
skipMigrations?: boolean;
withNotifier?: boolean;
withRedisNotifier?: boolean;
isEventReplay?: boolean;
}): Promise<PgWriteStore> {
const sql = await connectPostgres({
Expand All @@ -190,7 +196,8 @@ export class PgWriteStore extends PgStore {
});
}
const notifier = withNotifier ? await PgNotifier.create(usageName) : undefined;
const store = new PgWriteStore(sql, notifier, isEventReplay);
const redisNotifier = withRedisNotifier ? new RedisNotifier() : undefined;
const store = new PgWriteStore(sql, notifier, isEventReplay, redisNotifier);
await store.connectPgNotifier();
return store;
}
Expand Down Expand Up @@ -229,11 +236,13 @@ export class PgWriteStore extends PgStore {
async update(data: DataStoreBlockUpdateData): Promise<void> {
let garbageCollectedMempoolTxs: string[] = [];
let newTxData: DataStoreTxEventData[] = [];
let reorg: ReOrgUpdatedEntities = newReOrgUpdatedEntities();
let isCanonical = true;

await this.sqlWriteTransaction(async sql => {
const chainTip = await this.getChainTip(sql);
const reorg = await this.handleReorg(sql, data.block, chainTip.block_height);
const isCanonical = data.block.block_height > chainTip.block_height;
reorg = await this.handleReorg(sql, data.block, chainTip.block_height);
isCanonical = data.block.block_height > chainTip.block_height;
if (!isCanonical) {
markBlockUpdateDataAsNonCanonical(data);
} else {
Expand Down Expand Up @@ -396,6 +405,9 @@ export class PgWriteStore extends PgStore {
}
}
});
if (isCanonical) {
await this.redisNotifier?.notify(reorg, data.block.index_block_hash, data.block.block_height);
}
// Do we have an IBD height defined in ENV? If so, check if this block update reached it.
const ibdHeight = getIbdBlockHeight();
this.isIbdBlockHeightReached = ibdHeight ? data.block.block_height > ibdHeight : true;
Expand Down Expand Up @@ -3548,6 +3560,13 @@ export class PgWriteStore extends PgStore {
return result;
}

/**
* Recursively restore previously orphaned blocks to canonical.
* @param sql - The SQL client
* @param indexBlockHash - The index block hash that we will restore first
* @param updatedEntities - The updated entities
* @returns The updated entities
*/
async restoreOrphanedChain(
sql: PgSqlClient,
indexBlockHash: string,
Expand All @@ -3568,6 +3587,10 @@ export class PgWriteStore extends PgStore {
throw new Error(`Found multiple non-canonical parents for index_hash ${indexBlockHash}`);
}
updatedEntities.markedCanonical.blocks++;
updatedEntities.markedCanonical.blockHeaders.unshift({
index_block_hash: restoredBlockResult[0].index_block_hash,
block_height: restoredBlockResult[0].block_height,
});

// Orphan the now conflicting block at the same height
const orphanedBlockResult = await sql<BlockQueryResult[]>`
Expand Down Expand Up @@ -3606,6 +3629,10 @@ export class PgWriteStore extends PgStore {
}

updatedEntities.markedNonCanonical.blocks++;
updatedEntities.markedNonCanonical.blockHeaders.unshift({
index_block_hash: orphanedBlockResult[0].index_block_hash,
block_height: orphanedBlockResult[0].block_height,
});
const markNonCanonicalResult = await this.markEntitiesCanonical(
sql,
orphanedBlockResult[0].index_block_hash,
Expand Down Expand Up @@ -3662,6 +3689,8 @@ export class PgWriteStore extends PgStore {
markCanonicalResult.txsMarkedCanonical
);
updatedEntities.prunedMempoolTxs += prunedMempoolTxs.removedTxs.length;

// Do we have a parent that is non-canonical? If so, restore it recursively.
const parentResult = await sql<{ index_block_hash: string }[]>`
SELECT index_block_hash
FROM blocks
Expand Down Expand Up @@ -4019,6 +4048,7 @@ export class PgWriteStore extends PgStore {
if (this._debounceMempoolStat.debounce) {
clearTimeout(this._debounceMempoolStat.debounce);
}
await this.redisNotifier?.close();
await super.close(args);
}
}
Loading