Skip to content

Commit 33f0c97

Browse files
committed
core: simplified index server
1 parent 6dfdbd7 commit 33f0c97

File tree

6 files changed

+608
-281
lines changed

6 files changed

+608
-281
lines changed

core/blockchain.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
11801180
log.Crit("Failed to write genesis block", "err", err)
11811181
}
11821182
bc.writeHeadBlock(genesis)
1183-
bc.indexServers.broadcast(genesis.Header(), true)
1183+
bc.indexServers.broadcast(genesis.Header())
11841184

11851185
// Last update all in-memory chain markers
11861186
bc.genesisBlock = genesis
@@ -1599,7 +1599,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
15991599
}
16001600
}
16011601
bc.writeHeadBlock(block)
1602-
bc.indexServers.broadcast(block.Header(), true)
1602+
bc.indexServers.broadcast(block.Header())
16031603
return nil
16041604
}
16051605

@@ -1713,7 +1713,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
17131713

17141714
// Set new head.
17151715
bc.writeHeadBlock(block)
1716-
bc.indexServers.broadcast(block.Header(), true)
1716+
bc.indexServers.broadcast(block.Header())
17171717

17181718
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
17191719
if len(logs) > 0 {
@@ -1780,7 +1780,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17801780
}
17811781

17821782
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1783-
bc.indexServers.suspended()
1783+
bc.indexServers.suspend()
17841784
bc.blockProcFeed.Send(true)
17851785
}
17861786
defer func() {
@@ -2534,7 +2534,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
25342534
}
25352535
// Update the head block
25362536
bc.writeHeadBlock(block)
2537-
bc.indexServers.broadcast(block.Header(), false)
2537+
bc.indexServers.broadcast(block.Header())
25382538
}
25392539
if len(rebirthLogs) > 0 {
25402540
bc.logsFeed.Send(rebirthLogs)
@@ -2610,7 +2610,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
26102610
}
26112611
}
26122612
bc.writeHeadBlock(head)
2613-
bc.indexServers.broadcast(head.Header(), true)
2613+
bc.indexServers.broadcast(head.Header())
26142614

26152615
// Emit events
26162616
logs := bc.collectLogs(head, false)

core/filtermaps/index_view.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (iv *IndexView) GetBlockLvPointer(blockNumber uint64) (uint64, error) {
9595
return iv.headLvPointer, nil
9696
}
9797
if blockNumber > iv.blockRange.AfterLast() {
98-
return 0, errors.New("block number out of range")
98+
return 0, ErrOutOfRange
9999
}
100100
for _, fm := range iv.finishedMaps {
101101
if blockNumber >= fm.firstBlock() && blockNumber <= fm.lastBlock.number {
@@ -123,7 +123,7 @@ func (iv *IndexView) GetLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, er
123123
return lastNumber, lastHash, err
124124
}
125125
if mapIndex > iv.headMapIndex {
126-
return 0, common.Hash{}, errors.New("map index out of range")
126+
return 0, common.Hash{}, ErrOutOfRange
127127
}
128128
if mapIndex == iv.headMapIndex {
129129
return iv.headMap.lastBlock.number, iv.headMap.lastBlock.hash, nil

core/filtermaps/indexer.go

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -142,52 +142,47 @@ func (ix *Indexer) Status() (bool, common.Range[uint64]) {
142142
// Note that this function also resumes the storage layer background process if
143143
// it was previously suspended.
144144
// AddBlockData implements core.Indexer.
145-
func (ix *Indexer) AddBlockData(headers []*types.Header, receipts []types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
145+
func (ix *Indexer) AddBlockData(header *types.Header, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
146146
if ix.config.Disabled {
147147
return false, common.Range[uint64]{}
148148
}
149-
if len(headers) == 0 {
150-
return ix.Status()
151-
}
152149
ix.storage.suspendOrResume(false)
153150
if !ix.storage.isReady() {
154151
return false, ix.needBlocks()
155152
}
156-
ix.headNumber = max(ix.headNumber, headers[len(headers)-1].Number.Uint64())
157-
for i, header := range headers {
158-
number, hash := header.Number.Uint64(), header.Hash()
159-
if number > ix.headRenderer.nextBlock {
160-
ix.tryCheckpointInit(number, hash)
161-
}
162-
if number == ix.headRenderer.nextBlock {
163-
if ix.headRenderer.checkNextHash(hash) {
164-
ix.headRenderer.addReceipts(receipts[i])
165-
firstMapIndex, finishedMaps := ix.headRenderer.addHeader(header)
166-
ix.storeFinishedMaps(firstMapIndex, finishedMaps, i == len(headers)-1, true)
167-
if number+maxCanonicalSnapshots > ix.headNumber {
168-
ix.storeHeadIndexView(number, hash)
169-
}
170-
} else {
171-
ix.headRenderer = ix.initMapBoundary(max(ix.headRenderer.renderRange.First(), 1)-1, math.MaxUint32)
153+
ix.headNumber = max(ix.headNumber, header.Number.Uint64())
154+
number, hash := header.Number.Uint64(), header.Hash()
155+
if number > ix.headRenderer.nextBlock {
156+
ix.tryCheckpointInit(number, hash)
157+
}
158+
if number == ix.headRenderer.nextBlock {
159+
if ix.headRenderer.checkNextHash(hash) {
160+
ix.headRenderer.addReceipts(receipts)
161+
firstMapIndex, finishedMaps := ix.headRenderer.addHeader(header)
162+
ix.storeFinishedMaps(firstMapIndex, finishedMaps, true, true)
163+
if number+maxCanonicalSnapshots > ix.headNumber {
164+
ix.storeHeadIndexView(number, hash)
172165
}
173-
ix.updateTailEpoch()
174-
ix.updateTailState()
166+
} else {
167+
ix.headRenderer = ix.initMapBoundary(max(ix.headRenderer.renderRange.First(), 1)-1, math.MaxUint32)
175168
}
176-
if ix.tailRenderer != nil && number == ix.tailRenderer.nextBlock {
177-
if ix.tailRenderer.checkNextHash(hash) {
178-
ix.tailRenderer.addReceipts(receipts[i])
179-
firstMapIndex, finishedMaps := ix.tailRenderer.addHeader(header)
180-
ix.storeFinishedMaps(firstMapIndex, finishedMaps, false, false)
181-
if ix.tailRenderer.finished() {
182-
ix.tailEpoch--
183-
ix.tailRenderer = nil
184-
ix.updateTailState()
185-
}
186-
} else {
187-
// Note that if there is a canonical hash mismatch at the tail epoch then we need to revert the head renderer before this point.
188-
ix.headRenderer = ix.initMapBoundary(max(ix.tailRenderer.renderRange.First(), 1)-1, math.MaxUint32)
169+
ix.updateTailEpoch()
170+
ix.updateTailState()
171+
}
172+
if ix.tailRenderer != nil && number == ix.tailRenderer.nextBlock {
173+
if ix.tailRenderer.checkNextHash(hash) {
174+
ix.tailRenderer.addReceipts(receipts)
175+
firstMapIndex, finishedMaps := ix.tailRenderer.addHeader(header)
176+
ix.storeFinishedMaps(firstMapIndex, finishedMaps, false, false)
177+
if ix.tailRenderer.finished() {
178+
ix.tailEpoch--
189179
ix.tailRenderer = nil
180+
ix.updateTailState()
190181
}
182+
} else {
183+
// Note that if there is a canonical hash mismatch at the tail epoch then we need to revert the head renderer before this point.
184+
ix.headRenderer = ix.initMapBoundary(max(ix.tailRenderer.renderRange.First(), 1)-1, math.MaxUint32)
185+
ix.tailRenderer = nil
191186
}
192187
}
193188
return ix.storage.isReady(), ix.needBlocks()
@@ -626,7 +621,9 @@ func (ix *Indexer) storeHeadIndexView(number uint64, hash common.Hash) {
626621
func (ix *Indexer) exportCheckpoints() {
627622
finalLvPtr, err := ix.storage.getBlockLvPointer(ix.finalized + 1)
628623
if err != nil {
629-
log.Error("Error fetching log value pointer of finalized block", "block", ix.finalized, "error", err)
624+
if err != ErrOutOfRange {
625+
log.Error("Error fetching log value pointer of finalized block", "block", ix.finalized, "error", err)
626+
}
630627
return
631628
}
632629
epochCount := ix.storage.params.mapEpoch(uint32(finalLvPtr >> ix.storage.params.logValuesPerMap))

core/filtermaps/map_storage.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/ethereum/go-ethereum/log"
2828
)
2929

30+
var ErrOutOfRange = errors.New("pointer out of indexed range")
31+
3032
// mapStorage implements a filter map storage layer over mapDatabase that ensures
3133
// efficient database usage while also providing a low latency interface to the
3234
// indexer. It uses a memory layer over mapDatabase allowing consistently quick
@@ -358,7 +360,7 @@ func (m *mapStorage) getBlockLvPointer(blockNumber uint64) (uint64, error) {
358360
if blockNumber < m.knownEpochBlocks || m.validBlocks.includes(blockNumber) {
359361
return m.mapDb.getBlockLvPointer(blockNumber)
360362
}
361-
return 0, errors.New("block log value pointer not found")
363+
return 0, ErrOutOfRange
362364
}
363365

364366
// getLastBlockOfMap returns the number and hash of the block that generated the
@@ -377,7 +379,7 @@ func (m *mapStorage) getLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, er
377379
if mapIndex < m.params.firstEpochMap(m.knownEpochs) || m.valid.includes(mapIndex) || m.valid.includes(mapIndex+1) {
378380
return m.mapDb.getLastBlockOfMap(mapIndex)
379381
}
380-
return 0, common.Hash{}, errors.New("last block of map not found")
382+
return 0, common.Hash{}, ErrOutOfRange
381383
}
382384

383385
// getFilterMapRows returns a batch of filter maps rows from the same row index,

0 commit comments

Comments
 (0)