@@ -303,6 +303,7 @@ type BlockChain struct {
303303 blockProcCounter int32
304304 scope event.SubscriptionScope
305305 genesisBlock * types.Block
306+ indexServers indexServers
306307
307308 // This mutex synchronizes chain write operations.
308309 // Readers don't need to take it, they can just read the database.
@@ -523,9 +524,15 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
523524 if bc .cfg .TxLookupLimit >= 0 {
524525 bc .txIndexer = newTxIndexer (uint64 (bc .cfg .TxLookupLimit ), bc )
525526 }
527+ bc .indexServers .init (bc )
526528 return bc , nil
527529}
528530
531+ // RegisterIndexer registers a new indexer to the chain.
532+ func (bc * BlockChain ) RegisterIndexer (indexer Indexer ) {
533+ bc .indexServers .register (indexer )
534+ }
535+
529536func (bc * BlockChain ) setupSnapshot () {
530537 // Short circuit if the chain is established with path scheme, as the
531538 // state snapshot has been integrated into path database natively.
@@ -638,6 +645,7 @@ func (bc *BlockChain) loadLastState() error {
638645 if head := rawdb .ReadFinalizedBlockHash (bc .db ); head != (common.Hash {}) {
639646 if block := bc .GetBlockByHash (head ); block != nil {
640647 bc .currentFinalBlock .Store (block .Header ())
648+ bc .indexServers .setFinalBlock (block .NumberU64 ())
641649 headFinalizedBlockGauge .Update (int64 (block .NumberU64 ()))
642650 bc .currentSafeBlock .Store (block .Header ())
643651 headSafeBlockGauge .Update (int64 (block .NumberU64 ()))
@@ -685,6 +693,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
685693 return errors .New ("unexpected database tail" )
686694 }
687695 bc .historyPrunePoint .Store (predefinedPoint )
696+ bc .indexServers .setHistoryCutoff (predefinedPoint .BlockNumber )
688697 return nil
689698
690699 case history .KeepPostMerge :
@@ -706,6 +715,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
706715 return errors .New ("unexpected database tail" )
707716 }
708717 bc .historyPrunePoint .Store (predefinedPoint )
718+ bc .indexServers .setHistoryCutoff (predefinedPoint .BlockNumber )
709719 return nil
710720
711721 default :
@@ -768,9 +778,11 @@ func (bc *BlockChain) SetFinalized(header *types.Header) {
768778 if header != nil {
769779 rawdb .WriteFinalizedBlockHash (bc .db , header .Hash ())
770780 headFinalizedBlockGauge .Update (int64 (header .Number .Uint64 ()))
781+ bc .indexServers .setFinalBlock (header .Number .Uint64 ())
771782 } else {
772783 rawdb .WriteFinalizedBlockHash (bc .db , common.Hash {})
773784 headFinalizedBlockGauge .Update (0 )
785+ bc .indexServers .setFinalBlock (0 )
774786 }
775787}
776788
@@ -1133,6 +1145,7 @@ func (bc *BlockChain) Reset() error {
11331145// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
11341146// specified genesis state.
11351147func (bc * BlockChain ) ResetWithGenesisBlock (genesis * types.Block ) error {
1148+ bc .indexServers .revert (genesis .Header ())
11361149 // Dump the entire block chain and purge the caches
11371150 if err := bc .SetHead (0 ); err != nil {
11381151 return err
@@ -1149,6 +1162,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
11491162 log .Crit ("Failed to write genesis block" , "err" , err )
11501163 }
11511164 bc .writeHeadBlock (genesis )
1165+ bc .indexServers .broadcast (genesis .Header (), true )
11521166
11531167 // Last update all in-memory chain markers
11541168 bc .genesisBlock = genesis
@@ -1261,6 +1275,7 @@ func (bc *BlockChain) stopWithoutSaving() {
12611275// Stop stops the blockchain service. If any imports are currently in progress
12621276// it will abort them using the procInterrupt.
12631277func (bc * BlockChain ) Stop () {
1278+ bc .indexServers .stop ()
12641279 bc .stopWithoutSaving ()
12651280
12661281 // Ensure that the entirety of the state snapshot is journaled to disk.
@@ -1562,6 +1577,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
15621577 }
15631578 }
15641579 bc .writeHeadBlock (block )
1580+ bc .indexServers .broadcast (block .Header (), true )
15651581 return nil
15661582}
15671583
@@ -1577,7 +1593,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
15771593 // should be written atomically. BlockBatch is used for containing all components.
15781594 blockBatch := bc .db .NewBatch ()
15791595 rawdb .WriteBlock (blockBatch , block )
1580- rawdb .WriteReceipts (blockBatch , block .Hash (), block .NumberU64 (), receipts )
1596+ blockHash := block .Hash ()
1597+ bc .blockCache .Add (blockHash , block )
1598+ rawdb .WriteReceipts (blockBatch , blockHash , block .NumberU64 (), receipts )
1599+ bc .receiptsCache .Add (blockHash , receipts )
15811600 rawdb .WritePreimages (blockBatch , statedb .Preimages ())
15821601 if err := blockBatch .Write (); err != nil {
15831602 log .Crit ("Failed to write block into disk" , "err" , err )
@@ -1664,6 +1683,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16641683
16651684 // Set new head.
16661685 bc .writeHeadBlock (block )
1686+ bc .indexServers .broadcast (block .Header (), true )
16671687
16681688 bc .chainFeed .Send (ChainEvent {Header : block .Header ()})
16691689 if len (logs ) > 0 {
@@ -1730,10 +1750,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17301750 }
17311751
17321752 if atomic .AddInt32 (& bc .blockProcCounter , 1 ) == 1 {
1753+ bc .indexServers .setSuspended (true )
17331754 bc .blockProcFeed .Send (true )
17341755 }
17351756 defer func () {
17361757 if atomic .AddInt32 (& bc .blockProcCounter , - 1 ) == 0 {
1758+ bc .indexServers .setSuspended (false )
17371759 bc .blockProcFeed .Send (false )
17381760 }
17391761 }()
@@ -2385,6 +2407,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
23852407 return errInvalidNewChain
23862408 }
23872409 }
2410+ bc .indexServers .revert (commonBlock )
23882411 // Ensure the user sees large reorgs
23892412 if len (oldChain ) > 0 && len (newChain ) > 0 {
23902413 logFn := log .Info
@@ -2482,6 +2505,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
24822505 }
24832506 // Update the head block
24842507 bc .writeHeadBlock (block )
2508+ bc .indexServers .broadcast (block .Header (), false )
24852509 }
24862510 if len (rebirthLogs ) > 0 {
24872511 bc .logsFeed .Send (rebirthLogs )
@@ -2557,6 +2581,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25572581 }
25582582 }
25592583 bc .writeHeadBlock (head )
2584+ bc .indexServers .broadcast (head .Header (), true )
25602585
25612586 // Emit events
25622587 logs := bc .collectLogs (head , false )
0 commit comments