Skip to content

Commit f6525b6

Browse files
committed
core: simplified index server
1 parent 66912c5 commit f6525b6

File tree

6 files changed

+624
-287
lines changed

6 files changed

+624
-287
lines changed

core/blockchain.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,6 +1106,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
11061106
bc.receiptsCache.Purge()
11071107
bc.blockCache.Purge()
11081108
bc.txLookupCache.Purge()
1109+
bc.indexServers.revert(bc.CurrentBlock())
11091110

11101111
// Clear safe block, finalized block if needed
11111112
if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.Number.Uint64() {
@@ -1179,7 +1180,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
11791180
log.Crit("Failed to write genesis block", "err", err)
11801181
}
11811182
bc.writeHeadBlock(genesis)
1182-
bc.indexServers.broadcast(genesis.Header(), true)
1183+
bc.indexServers.broadcast(genesis.Header())
11831184

11841185
// Last update all in-memory chain markers
11851186
bc.genesisBlock = genesis
@@ -1598,7 +1599,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
15981599
}
15991600
}
16001601
bc.writeHeadBlock(block)
1601-
bc.indexServers.broadcast(block.Header(), true)
1602+
bc.indexServers.broadcast(block.Header())
16021603
return nil
16031604
}
16041605

@@ -1712,7 +1713,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
17121713

17131714
// Set new head.
17141715
bc.writeHeadBlock(block)
1715-
bc.indexServers.broadcast(block.Header(), true)
1716+
bc.indexServers.broadcast(block.Header())
17161717

17171718
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
17181719
if len(logs) > 0 {
@@ -1779,11 +1780,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17791780
}
17801781

17811782
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1782-
bc.indexServers.suspended()
1783+
bc.indexServers.setBlockProcessing(true)
17831784
bc.blockProcFeed.Send(true)
17841785
}
17851786
defer func() {
17861787
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
1788+
bc.indexServers.setBlockProcessing(false)
17871789
bc.blockProcFeed.Send(false)
17881790
}
17891791
}()
@@ -2537,7 +2539,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
25372539
}
25382540
// Update the head block
25392541
bc.writeHeadBlock(block)
2540-
bc.indexServers.broadcast(block.Header(), false)
2542+
bc.indexServers.broadcast(block.Header())
25412543
}
25422544
if len(rebirthLogs) > 0 {
25432545
bc.logsFeed.Send(rebirthLogs)
@@ -2613,7 +2615,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
26132615
}
26142616
}
26152617
bc.writeHeadBlock(head)
2616-
bc.indexServers.broadcast(head.Header(), true)
2618+
bc.indexServers.broadcast(head.Header())
26172619

26182620
// Emit events
26192621
logs := bc.collectLogs(head, false)

core/filtermaps/index_view.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (iv *IndexView) GetBlockLvPointer(blockNumber uint64) (uint64, error) {
9595
return iv.headLvPointer, nil
9696
}
9797
if blockNumber > iv.blockRange.AfterLast() {
98-
return 0, errors.New("block number out of range")
98+
return 0, ErrOutOfRange
9999
}
100100
for _, fm := range iv.finishedMaps {
101101
if blockNumber >= fm.firstBlock() && blockNumber <= fm.lastBlock.number {
@@ -123,7 +123,7 @@ func (iv *IndexView) GetLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, er
123123
return lastNumber, lastHash, err
124124
}
125125
if mapIndex > iv.headMapIndex {
126-
return 0, common.Hash{}, errors.New("map index out of range")
126+
return 0, common.Hash{}, ErrOutOfRange
127127
}
128128
if mapIndex == iv.headMapIndex {
129129
return iv.headMap.lastBlock.number, iv.headMap.lastBlock.hash, nil

core/filtermaps/indexer.go

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -142,52 +142,47 @@ func (ix *Indexer) Status() (bool, common.Range[uint64]) {
142142
// Note that this function also resumes the storage layer background process if
143143
// it was previously suspended.
144144
// AddBlockData implements core.Indexer.
145-
func (ix *Indexer) AddBlockData(headers []*types.Header, receipts []types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
145+
func (ix *Indexer) AddBlockData(header *types.Header, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
146146
if ix.config.Disabled {
147147
return false, common.Range[uint64]{}
148148
}
149-
if len(headers) == 0 {
150-
return ix.Status()
151-
}
152149
ix.storage.suspendOrResume(false)
153150
if !ix.storage.isReady() {
154151
return false, ix.needBlocks()
155152
}
156-
ix.headNumber = max(ix.headNumber, headers[len(headers)-1].Number.Uint64())
157-
for i, header := range headers {
158-
number, hash := header.Number.Uint64(), header.Hash()
159-
if number > ix.headRenderer.nextBlock {
160-
ix.tryCheckpointInit(number, hash)
161-
}
162-
if number == ix.headRenderer.nextBlock {
163-
if ix.headRenderer.checkNextHash(hash) {
164-
ix.headRenderer.addReceipts(receipts[i])
165-
firstMapIndex, finishedMaps := ix.headRenderer.addHeader(header)
166-
ix.storeFinishedMaps(firstMapIndex, finishedMaps, i == len(headers)-1, true)
167-
if number+maxCanonicalSnapshots > ix.headNumber {
168-
ix.storeHeadIndexView(number, hash)
169-
}
170-
} else {
171-
ix.headRenderer = ix.initMapBoundary(max(ix.headRenderer.renderRange.First(), 1)-1, math.MaxUint32)
153+
ix.headNumber = max(ix.headNumber, header.Number.Uint64())
154+
number, hash := header.Number.Uint64(), header.Hash()
155+
if number > ix.headRenderer.nextBlock {
156+
ix.tryCheckpointInit(number, hash)
157+
}
158+
if number == ix.headRenderer.nextBlock {
159+
if ix.headRenderer.checkNextHash(hash) {
160+
ix.headRenderer.addReceipts(receipts)
161+
firstMapIndex, finishedMaps := ix.headRenderer.addHeader(header)
162+
ix.storeFinishedMaps(firstMapIndex, finishedMaps, true, true)
163+
if number+maxCanonicalSnapshots > ix.headNumber {
164+
ix.storeHeadIndexView(number, hash)
172165
}
173-
ix.updateTailEpoch()
174-
ix.updateTailState()
166+
} else {
167+
ix.headRenderer = ix.initMapBoundary(max(ix.headRenderer.renderRange.First(), 1)-1, math.MaxUint32)
175168
}
176-
if ix.tailRenderer != nil && number == ix.tailRenderer.nextBlock {
177-
if ix.tailRenderer.checkNextHash(hash) {
178-
ix.tailRenderer.addReceipts(receipts[i])
179-
firstMapIndex, finishedMaps := ix.tailRenderer.addHeader(header)
180-
ix.storeFinishedMaps(firstMapIndex, finishedMaps, false, false)
181-
if ix.tailRenderer.finished() {
182-
ix.tailEpoch--
183-
ix.tailRenderer = nil
184-
ix.updateTailState()
185-
}
186-
} else {
187-
// Note that if there is a canonical hash mismatch at the tail epoch then we need to revert the head renderer before this point.
188-
ix.headRenderer = ix.initMapBoundary(max(ix.tailRenderer.renderRange.First(), 1)-1, math.MaxUint32)
169+
ix.updateTailEpoch()
170+
ix.updateTailState()
171+
}
172+
if ix.tailRenderer != nil && number == ix.tailRenderer.nextBlock {
173+
if ix.tailRenderer.checkNextHash(hash) {
174+
ix.tailRenderer.addReceipts(receipts)
175+
firstMapIndex, finishedMaps := ix.tailRenderer.addHeader(header)
176+
ix.storeFinishedMaps(firstMapIndex, finishedMaps, false, false)
177+
if ix.tailRenderer.finished() {
178+
ix.tailEpoch--
189179
ix.tailRenderer = nil
180+
ix.updateTailState()
190181
}
182+
} else {
183+
// Note that if there is a canonical hash mismatch at the tail epoch then we need to revert the head renderer before this point.
184+
ix.headRenderer = ix.initMapBoundary(max(ix.tailRenderer.renderRange.First(), 1)-1, math.MaxUint32)
185+
ix.tailRenderer = nil
191186
}
192187
}
193188
return ix.storage.isReady(), ix.needBlocks()
@@ -626,7 +621,9 @@ func (ix *Indexer) storeHeadIndexView(number uint64, hash common.Hash) {
626621
func (ix *Indexer) exportCheckpoints() {
627622
finalLvPtr, err := ix.storage.getBlockLvPointer(ix.finalized + 1)
628623
if err != nil {
629-
log.Error("Error fetching log value pointer of finalized block", "block", ix.finalized, "error", err)
624+
if err != ErrOutOfRange {
625+
log.Error("Error fetching log value pointer of finalized block", "block", ix.finalized, "error", err)
626+
}
630627
return
631628
}
632629
epochCount := ix.storage.params.mapEpoch(uint32(finalLvPtr >> ix.storage.params.logValuesPerMap))

core/filtermaps/map_storage.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/ethereum/go-ethereum/log"
2828
)
2929

30+
var ErrOutOfRange = errors.New("pointer out of indexed range")
31+
3032
// mapStorage implements a filter map storage layer over mapDatabase that ensures
3133
// efficient database usage while also providing a low latency interface to the
3234
// indexer. It uses a memory layer over mapDatabase allowing consistently quick
@@ -358,7 +360,7 @@ func (m *mapStorage) getBlockLvPointer(blockNumber uint64) (uint64, error) {
358360
if blockNumber < m.knownEpochBlocks || m.validBlocks.includes(blockNumber) {
359361
return m.mapDb.getBlockLvPointer(blockNumber)
360362
}
361-
return 0, errors.New("block log value pointer not found")
363+
return 0, ErrOutOfRange
362364
}
363365

364366
// getLastBlockOfMap returns the number and hash of the block that generated the
@@ -377,7 +379,7 @@ func (m *mapStorage) getLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, er
377379
if mapIndex < m.params.firstEpochMap(m.knownEpochs) || m.valid.includes(mapIndex) || m.valid.includes(mapIndex+1) {
378380
return m.mapDb.getLastBlockOfMap(mapIndex)
379381
}
380-
return 0, common.Hash{}, errors.New("last block of map not found")
382+
return 0, common.Hash{}, ErrOutOfRange
381383
}
382384

383385
// getFilterMapRows returns a batch of filter maps rows from the same row index,

0 commit comments

Comments
 (0)