@@ -95,6 +95,7 @@ import {
95
95
} from '@hirosystems/api-toolkit' ;
96
96
import { PgServer , getConnectionArgs , getConnectionConfig } from './connection' ;
97
97
import { BigNumber } from 'bignumber.js' ;
98
+ import { RedisNotifier } from './redis-notifier' ;
98
99
99
100
const MIGRATIONS_TABLE = 'pgmigrations' ;
100
101
const INSERT_BATCH_SIZE = 500 ;
@@ -130,6 +131,7 @@ type TransactionHeader = {
130
131
*/
131
132
export class PgWriteStore extends PgStore {
132
133
readonly isEventReplay : boolean ;
134
+ protected readonly redisNotifier : RedisNotifier | undefined = undefined ;
133
135
protected isIbdBlockHeightReached = false ;
134
136
private metrics :
135
137
| {
@@ -141,10 +143,12 @@ export class PgWriteStore extends PgStore {
141
143
constructor (
142
144
sql : PgSqlClient ,
143
145
notifier : PgNotifier | undefined = undefined ,
144
- isEventReplay : boolean = false
146
+ isEventReplay : boolean = false ,
147
+ redisNotifier : RedisNotifier | undefined = undefined
145
148
) {
146
149
super ( sql , notifier ) ;
147
150
this . isEventReplay = isEventReplay ;
151
+ this . redisNotifier = redisNotifier ;
148
152
if ( isProdEnv ) {
149
153
this . metrics = {
150
154
blockHeight : new prom . Gauge ( {
@@ -163,11 +167,13 @@ export class PgWriteStore extends PgStore {
163
167
usageName,
164
168
skipMigrations = false ,
165
169
withNotifier = true ,
170
+ withRedisNotifier = false ,
166
171
isEventReplay = false ,
167
172
} : {
168
173
usageName : string ;
169
174
skipMigrations ?: boolean ;
170
175
withNotifier ?: boolean ;
176
+ withRedisNotifier ?: boolean ;
171
177
isEventReplay ?: boolean ;
172
178
} ) : Promise < PgWriteStore > {
173
179
const sql = await connectPostgres ( {
@@ -190,7 +196,8 @@ export class PgWriteStore extends PgStore {
190
196
} ) ;
191
197
}
192
198
const notifier = withNotifier ? await PgNotifier . create ( usageName ) : undefined ;
193
- const store = new PgWriteStore ( sql , notifier , isEventReplay ) ;
199
+ const redisNotifier = withRedisNotifier ? new RedisNotifier ( ) : undefined ;
200
+ const store = new PgWriteStore ( sql , notifier , isEventReplay , redisNotifier ) ;
194
201
await store . connectPgNotifier ( ) ;
195
202
return store ;
196
203
}
@@ -229,11 +236,13 @@ export class PgWriteStore extends PgStore {
229
236
async update ( data : DataStoreBlockUpdateData ) : Promise < void > {
230
237
let garbageCollectedMempoolTxs : string [ ] = [ ] ;
231
238
let newTxData : DataStoreTxEventData [ ] = [ ] ;
239
+ let reorg : ReOrgUpdatedEntities = newReOrgUpdatedEntities ( ) ;
240
+ let isCanonical = true ;
232
241
233
242
await this . sqlWriteTransaction ( async sql => {
234
243
const chainTip = await this . getChainTip ( sql ) ;
235
- const reorg = await this . handleReorg ( sql , data . block , chainTip . block_height ) ;
236
- const isCanonical = data . block . block_height > chainTip . block_height ;
244
+ reorg = await this . handleReorg ( sql , data . block , chainTip . block_height ) ;
245
+ isCanonical = data . block . block_height > chainTip . block_height ;
237
246
if ( ! isCanonical ) {
238
247
markBlockUpdateDataAsNonCanonical ( data ) ;
239
248
} else {
@@ -396,6 +405,9 @@ export class PgWriteStore extends PgStore {
396
405
}
397
406
}
398
407
} ) ;
408
+ if ( isCanonical ) {
409
+ await this . redisNotifier ?. notify ( reorg , data . block . index_block_hash , data . block . block_height ) ;
410
+ }
399
411
// Do we have an IBD height defined in ENV? If so, check if this block update reached it.
400
412
const ibdHeight = getIbdBlockHeight ( ) ;
401
413
this . isIbdBlockHeightReached = ibdHeight ? data . block . block_height > ibdHeight : true ;
@@ -3548,6 +3560,13 @@ export class PgWriteStore extends PgStore {
3548
3560
return result ;
3549
3561
}
3550
3562
3563
+ /**
3564
+ * Recursively restore previously orphaned blocks to canonical.
3565
+ * @param sql - The SQL client
3566
+ * @param indexBlockHash - The index block hash that we will restore first
3567
+ * @param updatedEntities - The updated entities
3568
+ * @returns The updated entities
3569
+ */
3551
3570
async restoreOrphanedChain (
3552
3571
sql : PgSqlClient ,
3553
3572
indexBlockHash : string ,
@@ -3568,6 +3587,10 @@ export class PgWriteStore extends PgStore {
3568
3587
throw new Error ( `Found multiple non-canonical parents for index_hash ${ indexBlockHash } ` ) ;
3569
3588
}
3570
3589
updatedEntities . markedCanonical . blocks ++ ;
3590
+ updatedEntities . markedCanonical . blockHeaders . unshift ( {
3591
+ index_block_hash : restoredBlockResult [ 0 ] . index_block_hash ,
3592
+ block_height : restoredBlockResult [ 0 ] . block_height ,
3593
+ } ) ;
3571
3594
3572
3595
// Orphan the now conflicting block at the same height
3573
3596
const orphanedBlockResult = await sql < BlockQueryResult [ ] > `
@@ -3606,6 +3629,10 @@ export class PgWriteStore extends PgStore {
3606
3629
}
3607
3630
3608
3631
updatedEntities . markedNonCanonical . blocks ++ ;
3632
+ updatedEntities . markedNonCanonical . blockHeaders . unshift ( {
3633
+ index_block_hash : orphanedBlockResult [ 0 ] . index_block_hash ,
3634
+ block_height : orphanedBlockResult [ 0 ] . block_height ,
3635
+ } ) ;
3609
3636
const markNonCanonicalResult = await this . markEntitiesCanonical (
3610
3637
sql ,
3611
3638
orphanedBlockResult [ 0 ] . index_block_hash ,
@@ -3662,6 +3689,8 @@ export class PgWriteStore extends PgStore {
3662
3689
markCanonicalResult . txsMarkedCanonical
3663
3690
) ;
3664
3691
updatedEntities . prunedMempoolTxs += prunedMempoolTxs . removedTxs . length ;
3692
+
3693
+ // Do we have a parent that is non-canonical? If so, restore it recursively.
3665
3694
const parentResult = await sql < { index_block_hash : string } [ ] > `
3666
3695
SELECT index_block_hash
3667
3696
FROM blocks
@@ -4019,6 +4048,7 @@ export class PgWriteStore extends PgStore {
4019
4048
if ( this . _debounceMempoolStat . debounce ) {
4020
4049
clearTimeout ( this . _debounceMempoolStat . debounce ) ;
4021
4050
}
4051
+ await this . redisNotifier ?. close ( ) ;
4022
4052
await super . close ( args ) ;
4023
4053
}
4024
4054
}
0 commit comments