
Commit d703cab

triedb/pathdb: polish the code
1 parent 35532a2 commit d703cab

2 files changed: +32 -78 lines

triedb/pathdb/lookup.go

Lines changed: 28 additions & 76 deletions
@@ -38,6 +38,11 @@ func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte {
 	return key
 }
 
+// trienodeKey returns a key for uniquely identifying the trie node.
+func trienodeKey(accountHash common.Hash, path string) string {
+	return accountHash.Hex() + path
+}
+
 // lookup is an internal structure used to efficiently determine the layer in
 // which a state entry resides.
 type lookup struct {
@@ -210,10 +215,8 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha
 	if accountHash == (common.Hash{}) {
 		list = l.accountNodes[path]
 	} else {
-		// Construct the combined key but use only path for shard calculation
-		key := accountHash.Hex() + path
 		shardIndex := getStorageShardIndex(path) // Use only path for sharding
-		list = l.storageNodes[shardIndex][key]
+		list = l.storageNodes[shardIndex][trienodeKey(accountHash, path)]
 	}
 	for i := len(list) - 1; i >= 0; i-- {
 		// If the current state matches the stateID, or the requested state is a
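The new trienodeKey helper keys storage trie nodes by account hash plus node path, while the shard index is still derived from the path alone. Below is a minimal sketch of how those two pieces fit together; getStorageShardIndex is not part of this diff, so the FNV-based stand-in (and the shard count) are assumptions for illustration only, not the pathdb implementation.

package main

import (
	"fmt"
	"hash/fnv"

	"github.com/ethereum/go-ethereum/common"
)

// Assumed shard count, for illustration only; the real storageNodesShardCount
// constant is defined elsewhere in triedb/pathdb.
const storageNodesShardCount = 16

// trienodeKey mirrors the helper introduced in this commit: account hash (hex)
// concatenated with the trie node path.
func trienodeKey(accountHash common.Hash, path string) string {
	return accountHash.Hex() + path
}

// getStorageShardIndex is not shown in this diff; this FNV-based stand-in only
// illustrates the property the lookup relies on: the shard index is computed
// from the path alone, so producers and shard workers always agree on which
// shard owns a key, even though the key itself also embeds the account hash.
func getStorageShardIndex(path string) int {
	h := fnv.New32a()
	h.Write([]byte(path))
	return int(h.Sum32() % storageNodesShardCount)
}

func main() {
	account := common.HexToHash("0x01")
	path := string([]byte{0x02, 0x07})
	fmt.Printf("key=%q shard=%d\n", trienodeKey(account, path), getStorageShardIndex(path))
}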
@@ -302,45 +305,25 @@ func (l *lookup) addLayer(diff *diffLayer) {
 }
 
 func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) {
-	count := 0
-	for _, slots := range nodes {
-		count += len(slots)
-	}
-
-	// If the number of storage nodes is small, use a single-threaded approach
-	if count <= 1000 {
-		for accountHash, slots := range nodes {
-			accountHex := accountHash.Hex()
-			for path := range slots {
-				key := accountHex + path
-				shardIndex := getStorageShardIndex(path)
-				list, exists := l.storageNodes[shardIndex][key]
-				if !exists {
-					list = make([]common.Hash, 0, 16)
-				}
-				list = append(list, state)
-				l.storageNodes[shardIndex][key] = list
-			}
-		}
-		return
-	}
+	defer func(start time.Time) {
+		lookupAddTrienodeLayerTimer.UpdateSince(start)
+	}(time.Now())
 
-	// Use concurrent workers for storage nodes updates, one per shard
-	var wg sync.WaitGroup
+	var (
+		wg    sync.WaitGroup
+		tasks = make([]chan string, storageNodesShardCount)
+	)
 	wg.Add(storageNodesShardCount)
-
-	workChannels := make([]chan string, storageNodesShardCount)
 	for i := 0; i < storageNodesShardCount; i++ {
-		workChannels[i] = make(chan string, 10) // Buffer to avoid blocking
+		tasks[i] = make(chan string, 10) // Buffer to avoid blocking
 	}
-
 	// Start all workers, each handling its own shard
 	for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
		go func(shardIdx int) {
 			defer wg.Done()
 
 			shard := l.storageNodes[shardIdx]
-			for key := range workChannels[shardIdx] {
+			for key := range tasks[shardIdx] {
 				list, exists := shard[key]
 				if !exists {
 					list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool

@@ -352,18 +335,15 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st
 	}
 
 	for accountHash, slots := range nodes {
-		accountHex := accountHash.Hex()
 		for path := range slots {
 			shardIndex := getStorageShardIndex(path)
-			workChannels[shardIndex] <- accountHex + path
+			tasks[shardIndex] <- trienodeKey(accountHash, path)
 		}
 	}
-
 	// Close all channels to signal workers to finish
 	for i := 0; i < storageNodesShardCount; i++ {
-		close(workChannels[i])
+		close(tasks[i])
 	}
-
 	wg.Wait()
 }
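The small-batch single-threaded fallback is dropped: addStorageNodes now always fans the keys out to one goroutine per shard over buffered channels, and each goroutine mutates only its own shard map, so the maps need no locking. A reduced, self-contained sketch of that shape follows; the shard count, buffer size and map layout are simplified stand-ins, not the pathdb types.

package main

import (
	"fmt"
	"sync"
)

const shardCount = 4 // stand-in for storageNodesShardCount

func main() {
	var (
		wg     sync.WaitGroup
		shards = make([]map[string]int, shardCount) // stand-in for the per-shard lookup maps
		tasks  = make([]chan string, shardCount)
	)
	for i := range tasks {
		shards[i] = make(map[string]int)
		tasks[i] = make(chan string, 10) // buffered so the producer rarely blocks
	}
	// One worker per shard: each goroutine owns exactly one shard map.
	wg.Add(shardCount)
	for i := 0; i < shardCount; i++ {
		go func(idx int) {
			defer wg.Done()
			for key := range tasks[idx] {
				shards[idx][key]++
			}
		}(i)
	}
	// Producer: route every key to the shard that owns it, then close the
	// channels so the workers drain their queues and exit.
	for _, key := range []string{"a", "b", "c", "d", "e"} {
		tasks[int(key[0])%shardCount] <- key
	}
	for i := range tasks {
		close(tasks[i])
	}
	wg.Wait()
	fmt.Println(shards)
}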

@@ -458,47 +438,23 @@ func (l *lookup) removeLayer(diff *diffLayer) error {
 }
 
 func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) error {
-	count := 0
-	for _, slots := range nodes {
-		count += len(slots)
-	}
-
-	if count <= 1000 {
-		for accountHash, slots := range nodes {
-			accountHex := accountHash.Hex()
-			for path := range slots {
-				shard := l.storageNodes[getStorageShardIndex(path)]
-				key := accountHex + path
-				found, list := removeFromList(shard[key], state)
-				if !found {
-					return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state)
-				}
-				if len(list) != 0 {
-					shard[key] = list
-				} else {
-					delete(shard, key)
-				}
-			}
-		}
-		return nil
-	}
-
-	// Use concurrent workers for storage nodes removal, one per shard
-	var eg errgroup.Group
-
-	// Create work channels for each shard
-	workChannels := make([]chan string, storageNodesShardCount)
+	defer func(start time.Time) {
+		lookupRemoveTrienodeLayerTimer.UpdateSince(start)
+	}(time.Now())
 
+	var (
+		eg    errgroup.Group
+		tasks = make([]chan string, storageNodesShardCount)
+	)
 	for i := 0; i < storageNodesShardCount; i++ {
-		workChannels[i] = make(chan string, 10) // Buffer to avoid blocking
+		tasks[i] = make(chan string, 10) // Buffer to avoid blocking
 	}
-
 	// Start all workers, each handling its own shard
 	for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ {
 		shardIdx := shardIndex // Capture the variable
 		eg.Go(func() error {
 			shard := l.storageNodes[shardIdx]
-			for key := range workChannels[shardIdx] {
+			for key := range tasks[shardIdx] {
 				found, list := removeFromList(shard[key], state)
 				if !found {
 					return fmt.Errorf("storage lookup is not found, key: %s, state: %x", key, state)
@@ -514,17 +470,13 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map
 	}
 
 	for accountHash, slots := range nodes {
-		accountHex := accountHash.Hex()
 		for path := range slots {
-			key := accountHex + path
 			shardIndex := getStorageShardIndex(path)
-			workChannels[shardIndex] <- key
+			tasks[shardIndex] <- trienodeKey(accountHash, path)
 		}
 	}
-
 	for i := 0; i < storageNodesShardCount; i++ {
-		close(workChannels[i])
+		close(tasks[i])
 	}
-
 	return eg.Wait()
 }
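removeStorageNodes keeps the same per-shard fan-out, but uses errgroup.Group instead of a plain WaitGroup so that a missing lookup entry inside any shard worker surfaces as an error from eg.Wait(). A reduced sketch of that error-propagating variant follows, again with simplified stand-in data rather than the pathdb types.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

const shardCount = 4 // stand-in for storageNodesShardCount

func main() {
	var (
		eg     errgroup.Group
		shards = make([]map[string]bool, shardCount) // stand-in for the per-shard lookup maps
		tasks  = make([]chan string, shardCount)
	)
	for i := range tasks {
		shards[i] = map[string]bool{"a": true, "b": true}
		tasks[i] = make(chan string, 10)
	}
	for i := 0; i < shardCount; i++ {
		idx := i // capture the loop variable, as the diff does with shardIdx
		eg.Go(func() error {
			for key := range tasks[idx] {
				if !shards[idx][key] {
					// Mirrors the "storage lookup is not found" failure path.
					return fmt.Errorf("lookup not found: %s", key)
				}
				delete(shards[idx], key)
			}
			return nil
		})
	}
	for _, key := range []string{"a", "b"} {
		tasks[int(key[0])%shardCount] <- key
	}
	for i := range tasks {
		close(tasks[i])
	}
	// Wait blocks until every shard worker returns and reports the first error.
	if err := eg.Wait(); err != nil {
		fmt.Println("remove failed:", err)
		return
	}
	fmt.Println("remove ok")
}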

triedb/pathdb/metrics.go

Lines changed: 4 additions & 2 deletions
@@ -76,8 +76,10 @@ var (
 	indexHistoryTimer   = metrics.NewRegisteredResettingTimer("pathdb/history/index/time", nil)
 	unindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/unindex/time", nil)
 
-	lookupAddLayerTimer    = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
-	lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)
+	lookupAddLayerTimer            = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
+	lookupRemoveLayerTimer         = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)
+	lookupAddTrienodeLayerTimer    = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/add/time", nil)
+	lookupRemoveTrienodeLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/remove/time", nil)
 
 	historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil)
 	historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil)
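The two new resetting timers are fed by the defer pattern added to addStorageNodes and removeStorageNodes: the start time is captured when the deferred closure's argument is evaluated, and the elapsed time is recorded on return. A minimal sketch of that pattern is below; the timer name used here is hypothetical, only the registration call and UpdateSince usage come from the diff.

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// A hypothetical timer for illustration; the commit registers its timers under
// "pathdb/lookup/trienode/add/time" and "pathdb/lookup/trienode/remove/time".
var demoTimer = metrics.NewRegisteredResettingTimer("pathdb/demo/time", nil)

func timedWork() {
	// time.Now() is evaluated immediately when the defer statement runs, so
	// the closure measures the full duration of timedWork once it returns.
	defer func(start time.Time) {
		demoTimer.UpdateSince(start)
	}(time.Now())

	time.Sleep(10 * time.Millisecond) // stand-in for the real work
}

func main() {
	timedWork()
}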
