Skip to content

Commit d54270a

Browse files
committed
1
1 parent b6e887a commit d54270a

File tree

4 files changed

+74
-19
lines changed

4 files changed

+74
-19
lines changed

core/filtermaps/indexer.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package filtermaps
1818

1919
import (
20+
"fmt"
2021
"math"
2122
"sync"
2223

@@ -75,6 +76,7 @@ type Config struct {
7576
// TODO blockId vs blockHash?
7677
// TODO disable, export, history, finalized
7778
func NewIndexer(db ethdb.KeyValueStore, params *Params, config Config) *Indexer {
79+
params.sanitize()
7880
mapDb := newMapDatabase(params, db, config.HashScheme)
7981
ix := &Indexer{
8082
config: config,
@@ -88,17 +90,25 @@ func NewIndexer(db ethdb.KeyValueStore, params *Params, config Config) *Indexer
8890
ix.updateTargetTailEpoch()
8991
ix.updateActiveViewTailEpoch()
9092
ix.updateTailState()
93+
fmt.Println("init tail epoch", ix.tailEpoch, "tail target", ix.targetTailEpoch, "head number", ix.headNumber)
9194
return ix
9295
}
9396

9497
func (ix *Indexer) initMapBoundary(nextMap, limitMap uint32) *renderState {
98+
fmt.Println("initMapBoundary", nextMap, limitMap)
9599
rs := &renderState{
96100
params: ix.storage.params,
97101
renderRange: common.NewRange[uint32](nextMap, limitMap-nextMap),
98102
currentMap: ix.storage.params.newMemoryMap(),
99103
}
100-
for nextMap > 0 {
104+
for {
101105
nextMap = ix.storage.lastBoundaryBefore(nextMap)
106+
fmt.Println(" lbb", nextMap)
107+
if nextMap == 0 {
108+
// initialize at genesis
109+
fmt.Println(" genesis")
110+
return rs
111+
}
102112
lastNumber, lastHash, err := ix.storage.getLastBlockOfMap(nextMap - 1)
103113
if err != nil {
104114
log.Error("Last block of map not found, reverting database", "mapIndex", nextMap)
@@ -118,10 +128,9 @@ func (ix *Indexer) initMapBoundary(nextMap, limitMap uint32) *renderState {
118128
rs.nextBlock = lastNumber
119129
rs.partialBlock = true
120130
rs.partialBlockHash = lastHash
131+
fmt.Println(" nextBlock", rs.nextBlock, "mapIndex", rs.mapIndex)
121132
return rs
122133
}
123-
// initialize at genesis
124-
return rs
125134
}
126135

127136
func (ix *Indexer) initSnapshot(snapshot *IndexView) *renderState {
@@ -132,6 +141,7 @@ func (ix *Indexer) initSnapshot(snapshot *IndexView) *renderState {
132141
return nil
133142
}
134143

144+
fmt.Println("initSnapshot", snapshot.headBlockHash)
135145
return &renderState{
136146
params: ix.storage.params,
137147
renderRange: common.NewRange[uint32](snapshot.headMapIndex, math.MaxUint32-snapshot.headMapIndex),
@@ -142,6 +152,7 @@ func (ix *Indexer) initSnapshot(snapshot *IndexView) *renderState {
142152
}
143153

144154
func (ix *Indexer) revertMaps(mapIndex uint32) {
155+
fmt.Println("revertMaps", mapIndex)
145156
if mapIndex < ix.storage.lastBoundaryBefore(math.MaxUint32) {
146157
for hash, iv := range ix.snapshots {
147158
if iv.firstMemoryMap > mapIndex {
@@ -157,7 +168,7 @@ func (ix *Indexer) revertMaps(mapIndex uint32) {
157168
if mapIndex <= ix.headRenderer.mapIndex {
158169
ix.headRenderer = nil
159170
}
160-
if mapIndex <= ix.tailRenderer.mapIndex {
171+
if ix.tailRenderer != nil && mapIndex <= ix.tailRenderer.mapIndex {
161172
ix.tailRenderer = nil
162173
}
163174
}
@@ -276,26 +287,38 @@ func (ix *Indexer) AddBlockData(headers []*types.Header, receipts []types.Receip
276287
return ix.Status()
277288
}
278289

290+
// epochsUntilBlock returns the numer of epochs in the checkpoint list whose
291+
// last block number is less than or equal to the specified number.
279292
func (cpList checkpointList) epochsUntilBlock(number uint64) uint32 {
280-
first, afterLast := uint32(0), uint32(len(cpList))
281-
for first+1 < afterLast {
282-
mid := (first + afterLast) / 2
293+
fmt.Println("epochsUntilBlock", number)
294+
first, last := uint32(0), uint32(len(cpList))
295+
for first < last {
296+
fmt.Println(" *", first, last)
297+
mid := (first + last) / 2
283298
if cpList[mid].BlockNumber > number {
284-
afterLast = mid
299+
last = mid
285300
} else {
286-
first = mid
301+
first = mid + 1
287302
}
288303
}
304+
fmt.Println(" **", first)
289305
return first
290306
}
291307

292308
func (ix *Indexer) tryCheckpointInit(number uint64, id common.Hash) {
309+
fmt.Println("tryCheckpointInit", number, id)
293310
var ci int
294311
for ci < len(ix.checkpoints) {
295312
cpList := ix.checkpoints[ci]
296313
epochs := cpList.epochsUntilBlock(number)
314+
fmt.Println(" cpList", len(cpList), epochs)
297315
if epochs == 0 || cpList[epochs-1].BlockNumber != number {
298-
// no matching block number, skip list (a relevant block might match later)
316+
if epochs == 0 {
317+
fmt.Println(" skip *", number)
318+
} else {
319+
fmt.Println(" skip", cpList[epochs-1].BlockNumber, number)
320+
}
321+
// block number does not match, skip list (a relevant block might match later)
299322
ci++
300323
continue
301324
}
@@ -304,6 +327,7 @@ func (ix *Indexer) tryCheckpointInit(number uint64, id common.Hash) {
304327
if err := ix.storage.addKnownEpochs(cpList[:epochs]); err == nil {
305328
ix.checkpoints = []checkpointList{cpList}
306329
ix.headRenderer = ix.initMapBoundary(epochs*ix.storage.params.mapsPerEpoch, math.MaxUint32)
330+
fmt.Println(" success")
307331
return
308332
} else {
309333
log.Error("Error initializing epoch boundaries", "error", err)
@@ -313,6 +337,7 @@ func (ix *Indexer) tryCheckpointInit(number uint64, id common.Hash) {
313337
ix.checkpoints[ci] = ix.checkpoints[len(ix.checkpoints)-1]
314338
ix.checkpoints = ix.checkpoints[:len(ix.checkpoints)-1]
315339
}
340+
fmt.Println(" no match")
316341
}
317342

318343
func (ix *Indexer) SetFinalized(blockNumber uint64) {
@@ -361,23 +386,29 @@ func (ix *Indexer) Status() (bool, common.Range[uint64]) {
361386
}
362387

363388
func (ix *Indexer) needBlocks() common.Range[uint64] {
389+
fmt.Println("needBlocks", ix.finalized, ix.headRenderer.nextBlock, ix.tailRenderer != nil)
364390
if ix.finalized > ix.headRenderer.nextBlock {
365391
// request potential checkpoint in this range if available
366392
for _, cpList := range ix.checkpoints {
393+
fmt.Println("cpList", len(cpList))
367394
if epochs := cpList.epochsUntilBlock(ix.headNumber); epochs > 0 {
368395
blockNumber := cpList[epochs-1].BlockNumber
396+
fmt.Println("epochs", epochs, "blockNumber", blockNumber)
369397
if ix.storage.lastBoundaryBefore(math.MaxUint32) >= epochs*ix.storage.params.mapsPerEpoch ||
370398
blockNumber <= ix.headRenderer.nextBlock || blockNumber < ix.historyCutoff {
399+
fmt.Println(" cont", ix.storage.lastBoundaryBefore(math.MaxUint32), ix.historyCutoff, ix.storage.params.mapsPerEpoch)
371400
continue
372401
}
402+
fmt.Println(" chk", blockNumber)
373403
return common.NewRange[uint64](blockNumber, 1)
374404
}
375405
}
376406
}
377407
if ix.headRenderer.nextBlock <= ix.headNumber && ix.headRenderer.nextBlock > ix.historyCutoff {
378408
return common.NewRange[uint64](ix.headRenderer.nextBlock, ix.headNumber+1-ix.headRenderer.nextBlock)
379409
}
380-
if ix.tailRenderer.nextBlock <= ix.tailRenderLast && ix.tailRenderer.nextBlock > ix.historyCutoff {
410+
if ix.tailRenderer != nil &&
411+
ix.tailRenderer.nextBlock <= ix.tailRenderLast && ix.tailRenderer.nextBlock > ix.historyCutoff {
381412
return common.NewRange[uint64](ix.tailRenderer.nextBlock, ix.tailRenderLast+1-ix.tailRenderer.nextBlock)
382413
}
383414
return common.Range[uint64]{}

core/filtermaps/map_database.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (m *mapDatabase) getLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, e
137137
return lastBlock.number, lastBlock.id, nil
138138
}
139139
number, id, err := rawdb.ReadFilterMapLastBlock(m.db, mapIndex)
140+
fmt.Println(" read lbm", mapIndex, number, err)
140141
if err != nil {
141142
return 0, common.Hash{}, fmt.Errorf("failed to retrieve last block of map %d: %v", mapIndex, err)
142143
}
@@ -149,6 +150,7 @@ func (m *mapDatabase) getLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, e
149150
func (m *mapDatabase) storeLastBlockOfMap(mapIndex uint32, number uint64, id common.Hash) {
150151
m.lastBlockCache.Add(mapIndex, lastBlockOfMap{number: number, id: id})
151152
rawdb.WriteFilterMapLastBlock(m.db, mapIndex, number, id)
153+
fmt.Println(" store lbm", mapIndex, number)
152154
}
153155

154156
// deleteLastBlockOfMap deletes the number of the block that generated the last
@@ -338,6 +340,7 @@ type writePatterItem struct {
338340

339341
// dirty: before setting write range to dirty
340342
func (m *mapDatabase) writeMaps(writeMaps, valid, dirty common.Range[uint32], maps map[uint32]*finishedMap, stopCallback func() bool) (bool, error) {
343+
fmt.Println("writeMaps", writeMaps, valid, dirty)
341344
writePattern := m.makeWritePattern(writeMaps, valid, dirty)
342345
batch := m.db.NewBatch()
343346
rowsPerBatch := uint32(max(maxWritesPerBatch/len(writePattern), 1))
@@ -384,6 +387,7 @@ func (m *mapDatabase) writeMaps(writeMaps, valid, dirty common.Range[uint32], ma
384387
if err := batch.Write(); err != nil {
385388
return false, err
386389
}
390+
fmt.Println("writeMaps success")
387391
return true, nil
388392
}
389393

core/filtermaps/map_storage.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type mapStorage struct {
3535

3636
lock sync.RWMutex
3737
initialized bool
38-
knownEpochs uint32 // epochs initialized with last map block pointer and corresponding reverse block lv pointer
38+
knownEpochs uint32 // epochs initialized with last map block pointer and corresponding reverse block lv pointer
39+
knownEpochBlocks uint64
3940
valid, dirty rangeSet[uint32] // valid and dirty maps in database
4041
overlay rangeSet[uint32] // memory maps
4142
overlayCount uint32
@@ -96,9 +97,9 @@ func (m *mapStorage) lastBoundaryBefore(mapIndex uint32) uint32 {
9697
if mapIndex == 0 {
9798
return 0
9899
}
99-
var lastBoundary uint32
100+
lastBoundary := m.params.firstEpochMap(min(m.params.mapEpoch(mapIndex), m.knownEpochs))
100101
if m, ok := m.valid.closestLte(mapIndex - 1); ok {
101-
lastBoundary = m + 1
102+
lastBoundary = max(lastBoundary, m+1)
102103
}
103104
if m, ok := m.overlay.closestLte(mapIndex - 1); ok {
104105
lastBoundary = max(lastBoundary, m+1)
@@ -133,6 +134,7 @@ func (m *mapStorage) addKnownEpochs(cpList checkpointList) error {
133134
m.lock.Lock()
134135
defer m.lock.Unlock()
135136

137+
fmt.Println("addKnownEpochs", m.knownEpochs, len(cpList))
136138
if uint32(len(cpList)) <= m.knownEpochs {
137139
return errors.New("checkpoint init list has no new epochs")
138140
}
@@ -151,10 +153,12 @@ func (m *mapStorage) addKnownEpochs(cpList checkpointList) error {
151153
}
152154

153155
for epoch := m.knownEpochs; epoch < uint32(len(cpList)); epoch++ {
156+
fmt.Println(" store", epoch, cpList[epoch].BlockNumber)
154157
m.mapDb.storeLastBlockOfMap(m.params.lastEpochMap(epoch), cpList[epoch].BlockNumber, cpList[epoch].BlockId)
155158
m.mapDb.storeBlockLvPointer(cpList[epoch].BlockNumber, cpList[epoch].FirstIndex)
156159
}
157160
m.knownEpochs = uint32(len(cpList))
161+
m.knownEpochBlocks = cpList[len(cpList)-1].BlockNumber + 1
158162
m.mapDb.storeMapRange(m.valid, m.dirty, m.knownEpochs)
159163
return nil
160164
}
@@ -163,6 +167,7 @@ func (m *mapStorage) addMap(mapIndex uint32, fm *finishedMap, forceCommit bool)
163167
m.lock.Lock()
164168
defer m.lock.Unlock()
165169

170+
fmt.Println("addMap", mapIndex, forceCommit, m.overlay.count())
166171
if m.valid.includes(mapIndex) || m.overlay.includes(mapIndex) {
167172
panic("addMap to non-empty map index")
168173
}
@@ -175,6 +180,7 @@ func (m *mapStorage) addMap(mapIndex uint32, fm *finishedMap, forceCommit bool)
175180
m.overlayUpdated()
176181
if epoch >= m.knownEpochs && mapIndex == m.params.lastEpochMap(epoch) {
177182
m.knownEpochs = epoch + 1
183+
m.knownEpochBlocks = fm.lastBlock.number + 1
178184
}
179185
m.maps[mapIndex] = fm
180186
if forceCommit || (mapIndex+1)%m.params.rowGroupSize[0] == 0 {
@@ -187,6 +193,7 @@ func (m *mapStorage) deleteMaps(maps common.Range[uint32]) {
187193
m.lock.Lock()
188194
defer m.lock.Unlock()
189195

196+
fmt.Println("deleteMaps", maps)
190197
dr := rangeSet[uint32]{maps}
191198
for i := range dr.intersection(m.overlay).iter() {
192199
delete(m.maps, i)
@@ -196,7 +203,20 @@ func (m *mapStorage) deleteMaps(maps common.Range[uint32]) {
196203
m.dirty = m.dirty.union(dr.intersection(m.valid))
197204
m.valid = m.valid.exclude(dr)
198205
if m.params.mapEpoch(maps.AfterLast()) >= m.knownEpochs {
199-
m.knownEpochs = min(m.knownEpochs, m.params.mapEpoch(maps.First()))
206+
if epochs := m.params.mapEpoch(maps.First()); epochs < m.knownEpochs {
207+
m.knownEpochs = epochs
208+
if epochs > 0 {
209+
last, _, err := m.mapDb.getLastBlockOfMap(m.params.lastEpochMap(epochs - 1))
210+
if err != nil {
211+
m.resetWithError(fmt.Sprintf("could not revert valid block range: %v", err))
212+
m.trigger()
213+
return
214+
}
215+
m.knownEpochBlocks = last + 1
216+
} else {
217+
m.knownEpochBlocks = 0
218+
}
219+
}
200220
}
201221
if err := m.validUpdated(); err != nil {
202222
m.resetWithError(fmt.Sprintf("could not revert valid block range: %v", err))
@@ -291,7 +311,7 @@ func (m *mapStorage) getBlockLvPointer(blockNumber uint64) (uint64, error) {
291311
}
292312
return 0, errors.New("memory overlay block pointer not found")
293313
}
294-
if m.validBlocks.includes(blockNumber) {
314+
if blockNumber < m.knownEpochBlocks || m.validBlocks.includes(blockNumber) {
295315
return m.mapDb.getBlockLvPointer(blockNumber)
296316
}
297317
return 0, errors.New("block log value pointer not found")
@@ -308,7 +328,7 @@ func (m *mapStorage) getLastBlockOfMap(mapIndex uint32) (uint64, common.Hash, er
308328
}
309329
return fm.lastBlock.number, fm.lastBlock.id, nil
310330
}
311-
if m.valid.includes(mapIndex) || m.valid.includes(mapIndex+1) {
331+
if mapIndex < m.params.firstEpochMap(m.knownEpochs) || m.valid.includes(mapIndex) || m.valid.includes(mapIndex+1) {
312332
return m.mapDb.getLastBlockOfMap(mapIndex)
313333
}
314334
return 0, common.Hash{}, errors.New("last block of map not found")

core/index_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,13 @@ func (s *indexServer) sendHeadBlockData(headers []*types.Header, receipts []type
352352
if len(headers) == 0 {
353353
return
354354
}
355-
lastHash := s.lastHead.Hash()
355+
/*lastHash := s.lastHead.Hash()
356356
for _, header := range headers {
357357
if header.ParentHash != lastHash {
358358
panic("non-continuous head header chain sent to indexer")
359359
}
360360
lastHash = header.Hash()
361-
}
361+
}*/
362362
s.ready, s.needBlocks = s.indexer.AddBlockData(headers, receipts)
363363
s.suspendCh = nil
364364
s.lastHead = headers[len(headers)-1]

0 commit comments

Comments
 (0)