Skip to content

Commit 25a5ab3

Browse files
committed
core/filtermaps: added comments and small fixes
1 parent a30c570 commit 25a5ab3

File tree

1 file changed

+74
-21
lines changed

1 file changed

+74
-21
lines changed

core/filtermaps/indexer.go

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ var (
4242
mapWriteTimer = metrics.NewRegisteredTimer("filtermaps/maps/writetime", nil) // time elapsed while writing a batch of finished maps to db
4343
)
4444

45+
// Indexer maintains a search data structure based on a parent blockchain that
46+
// is intended to make log event search more efficient. Once indexed up to the
47+
// chain head, it provides IndexView objects for recent chain heads.
48+
// Indexer implements core.Indexer.
4549
type Indexer struct {
4650
config Config
4751
storage *mapStorage
@@ -58,7 +62,7 @@ type Indexer struct {
5862
headMapsCache *lru.Cache[uint32, *finishedMap]
5963
}
6064

61-
// Config contains the configuration options for NewFilterMaps.
65+
// Config contains the configuration options for Indexer.
6266
type Config struct {
6367
History uint64 // number of historical blocks to index
6468
Disabled bool // disables indexing completely
@@ -72,6 +76,7 @@ type Config struct {
7276
HashScheme bool
7377
}
7478

79+
// NewIndexer creates a new Indexer.
7580
func NewIndexer(db ethdb.KeyValueStore, params Params, config Config) *Indexer {
7681
params.sanitize()
7782
mapDb := newMapDatabase(&params, db, config.HashScheme)
@@ -93,13 +98,29 @@ func NewIndexer(db ethdb.KeyValueStore, params Params, config Config) *Indexer {
9398
return ix
9499
}
95100

101+
// Status returns the current indexer status. The ready flag indicates whether
102+
// the indexer is ready to process new block data. The needBlocks range, if not
103+
// empty, indicates that the indexer requests past blocks in order to complete
104+
// the index. These blocks should be delivered in strictly ascending order.
105+
// Note that if ready is false then needBlocks might still be non-empty, in
106+
// which case the blocks are not expected to be delivered yet but the index
107+
// server might already start pre-fetching them.
108+
// Status implements core.Indexer.
96109
func (ix *Indexer) Status() (bool, common.Range[uint64]) {
97110
if ix.config.Disabled {
98111
return false, common.Range[uint64]{}
99112
}
100113
return ix.storage.isReady(), ix.needBlocks()
101114
}
102115

116+
// AddBlockData delivers block data for new heads and requested historical range.
117+
// It returns the indexer status. Unwanted data is silently ignored. If the
118+
// indexer is not ready to process then the received data is also ignored and
119+
// then requested through the needBlocks response either in the current response
120+
// or later.
121+
// Note that this function also resumes the storage layer background process if
122+
// it was previously suspended.
123+
// AddBlockData implements core.Indexer.
103124
func (ix *Indexer) AddBlockData(headers []*types.Header, receipts []types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
104125
if ix.config.Disabled {
105126
return false, common.Range[uint64]{}
@@ -144,12 +165,20 @@ func (ix *Indexer) AddBlockData(headers []*types.Header, receipts []types.Receip
144165
} else {
145166
// Note that if there is a canonical hash mismatch at the tail epoch then we need to revert the head renderer before this point.
146167
ix.headRenderer = ix.initMapBoundary(max(ix.tailRenderer.renderRange.First(), 1)-1, math.MaxUint32)
168+
ix.tailRenderer = nil
147169
}
148170
}
149171
}
150-
return ix.Status()
172+
return ix.storage.isReady(), ix.needBlocks()
151173
}
152174

175+
// Revert resets the index head to the given block number. Note that the indexer
176+
// might have to discard more data if a snapshot is not available for the given
177+
// block number. In this case it will request previously delivered but discarded
178+
// block data through the needBlocks status response.
179+
// Note that Revert works even if the indexer is in a "not ready" status, thereby
180+
// guaranteeing that all index data is always consistent with the canonical chain.
181+
// Revert implements core.Indexer.
153182
func (ix *Indexer) Revert(blockNumber uint64) {
154183
if ix.config.Disabled {
155184
return
@@ -186,56 +215,77 @@ func (ix *Indexer) Revert(blockNumber uint64) {
186215
ix.updateTailEpoch()
187216
}
188217

218+
// SetFinalized notifies the indexer about the latest finalized block number.
219+
// SetFinalized implements core.Indexer.
189220
func (ix *Indexer) SetFinalized(blockNumber uint64) {
190221
ix.finalized = blockNumber
191222
}
192223

224+
// SetHistoryCutoff notifies the indexer about the latest historical cutoff point.
225+
// The indexer will not request block data earlier than this point.
226+
// SetHistoryCutoff implements core.Indexer.
193227
func (ix *Indexer) SetHistoryCutoff(blockNumber uint64) {
194228
ix.historyCutoff = blockNumber
195229
}
196230

231+
// Suspended suspends the asynchronous storage layer background process during
232+
// block processing. The next AddBlockData call will resume this process.
233+
// Suspended implements core.Indexer.
197234
func (ix *Indexer) Suspended() {
198235
if ix.config.Disabled {
199236
return
200237
}
201238
ix.storage.suspendOrResume(true)
202239
}
203240

204-
func (ix *Indexer) initMapBoundary(nextMap, limitMap uint32) *renderState {
241+
// initMapBoundary initializes a new map renderer at the last suitable map
242+
// boundary before startMap. If this boundary is not right before startMap then
243+
// startMap is lowered to right after the boundary. The returned renderState
244+
// will render maps in the startMap..limitMap-1 range.
245+
// Note that the first requested block typically still starts in the previous
246+
// map and in case of tail renderers with an upper map limit, the last requested
247+
// block typically ends after the upper limit. In this case the maps outside the
248+
// rendered range are not modified, the log values outside the range are ignored.
249+
func (ix *Indexer) initMapBoundary(startMap, limitMap uint32) *renderState {
205250
rs := &renderState{
206-
params: ix.storage.params,
207-
renderRange: common.NewRange[uint32](nextMap, limitMap-nextMap),
251+
params: ix.storage.params,
208252
}
209253
for {
210-
nextMap = ix.storage.lastBoundaryBefore(nextMap)
211-
if nextMap == 0 {
212-
// initialize at genesis
213-
rs.currentMap = rs.params.newMemoryMap()
214-
return rs
254+
startMap = ix.storage.lastBoundaryBefore(startMap)
255+
if startMap == 0 {
256+
break
215257
}
216-
lastNumber, lastHash, err := ix.storage.getLastBlockOfMap(nextMap - 1)
258+
lastNumber, lastHash, err := ix.storage.getLastBlockOfMap(startMap - 1)
217259
if err != nil {
218-
log.Error("Last block of map not found, reverting database", "mapIndex", nextMap-1)
219-
nextMap = ix.storage.lastBoundaryBefore(nextMap - 1)
220-
ix.revertMaps(nextMap)
260+
log.Error("Last block of map not found, reverting database", "mapIndex", startMap-1)
261+
startMap = ix.storage.lastBoundaryBefore(startMap - 1)
262+
ix.revertMaps(startMap)
221263
continue
222264
}
223265
lvPointer, err := ix.storage.getBlockLvPointer(lastNumber)
224266
if err != nil {
225-
log.Error("Block pointer of last block of map not found, reverting database", "mapIndex", nextMap-1, "blockNumber", lastNumber)
226-
nextMap = ix.storage.lastBoundaryBefore(nextMap - 1)
227-
ix.revertMaps(nextMap)
267+
log.Error("Block pointer of last block of map not found, reverting database", "mapIndex", startMap-1, "blockNumber", lastNumber)
268+
startMap = ix.storage.lastBoundaryBefore(startMap - 1)
269+
ix.revertMaps(startMap)
228270
continue
229271
}
230272
rs.lvPointer = lvPointer
231273
rs.mapIndex = uint32(lvPointer >> ix.storage.params.logValuesPerMap)
232274
rs.nextBlock = lastNumber
233275
rs.partialBlock = true
234276
rs.partialBlockHash = lastHash
235-
return rs
277+
break
278+
}
279+
rs.renderRange = common.NewRange[uint32](startMap, limitMap-startMap)
280+
if rs.renderRange.Includes(rs.mapIndex) {
281+
rs.currentMap = rs.params.newMemoryMap()
236282
}
283+
return rs
237284
}
238285

286+
// initSnapshot initializes a new map renderer based on a snapshot. Since this
287+
// method is only used to initialize head renderers, a snapshot initialized
288+
// renderState always has an upper render limit of MaxUint32-1.
239289
func (ix *Indexer) initSnapshot(snapshot *IndexView) *renderState {
240290
mapIndex := ix.storage.lastBoundaryBefore(snapshot.firstMemoryMap)
241291
ix.revertMaps(mapIndex)
@@ -253,8 +303,11 @@ func (ix *Indexer) initSnapshot(snapshot *IndexView) *renderState {
253303
}
254304
}
255305

256-
// Note that revertMaps might be called while headRenderer is nil and might set
257-
// headRenderer to nil.
306+
// revertMaps removes all rendered maps starting from mapIndex. It also removes
307+
// active renderers and snapshots invalidated by the revert and purges the map
308+
// cache.
309+
// Note that while headRenderer generally always exists, revertMaps might be
310+
// called while headRenderer is nil and might set headRenderer to nil.
258311
func (ix *Indexer) revertMaps(mapIndex uint32) {
259312
if mapIndex < ix.storage.lastBoundaryBefore(math.MaxUint32) {
260313
for hash, iv := range ix.snapshots {
@@ -372,7 +425,7 @@ func (ix *Indexer) tryCheckpointInit(number uint64, hash common.Hash) {
372425
// apply matching checkpoint, discard other lists
373426
if err := ix.storage.addKnownEpochs(cpList[:epochs]); err == nil {
374427
ix.checkpoints = []checkpointList{cpList}
375-
ix.headRenderer = ix.initMapBoundary(epochs*ix.storage.params.mapsPerEpoch, math.MaxUint32)
428+
ix.headRenderer = ix.initMapBoundary(ix.storage.params.firstEpochMap(epochs), math.MaxUint32)
376429
return
377430
} else {
378431
log.Error("Error initializing epoch boundaries", "error", err)

0 commit comments

Comments
 (0)