From c5e07af8bf20dfc2f63df22f55e9d1e47a132ffd Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 03:01:04 +0000 Subject: [PATCH 01/13] pathdb: lookup account and storage nodes Signed-off-by: jsvisa --- triedb/pathdb/layertree.go | 16 +++++ triedb/pathdb/lookup.go | 112 ++++++++++++++++++++++++++++++- triedb/pathdb/lookup_test.go | 123 +++++++++++++++++++++++++++++++++++ triedb/pathdb/reader.go | 9 ++- 4 files changed, 256 insertions(+), 4 deletions(-) create mode 100644 triedb/pathdb/lookup_test.go diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go index ec45257db56c..df0f84b3abab 100644 --- a/triedb/pathdb/layertree.go +++ b/triedb/pathdb/layertree.go @@ -338,3 +338,19 @@ func (tree *layerTree) lookupStorage(accountHash common.Hash, slotHash common.Ha } return l, nil } + +func (tree *layerTree) lookupNode(accountHash common.Hash, path string, state common.Hash) (layer, error) { + // Hold the read lock to prevent the unexpected layer changes + tree.lock.RLock() + defer tree.lock.RUnlock() + + tip := tree.lookup.nodeTip(accountHash, path, state, tree.base.root) + if tip == (common.Hash{}) { + return nil, fmt.Errorf("[%#x] %w", state, errSnapshotStale) + } + l := tree.layers[tip] + if l == nil { + return nil, fmt.Errorf("triedb layer [%#x] missing", tip) + } + return l, nil +} diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 8b092730f8fc..73ebfdc060ae 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "golang.org/x/sync/errgroup" ) @@ -48,6 +49,9 @@ type lookup struct { // where the slot was modified, with the order from oldest to newest. storages map[[64]byte][]common.Hash + accountNodes map[string][]common.Hash + storageNodes map[string][]common.Hash + // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. descendant func(state common.Hash, ancestor common.Hash) bool @@ -64,9 +68,11 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha current = current.parentLayer() } l := &lookup{ - accounts: make(map[common.Hash][]common.Hash), - storages: make(map[[64]byte][]common.Hash), - descendant: descendant, + accounts: make(map[common.Hash][]common.Hash), + storages: make(map[[64]byte][]common.Hash), + accountNodes: make(map[string][]common.Hash), + storageNodes: make(map[string][]common.Hash), + descendant: descendant, } // Apply the diff layers from bottom to top for i := len(layers) - 1; i >= 0; i-- { @@ -161,6 +167,32 @@ func (l *lookup) storageTip(accountHash common.Hash, slotHash common.Hash, state return common.Hash{} } +func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Hash, base common.Hash) common.Hash { + var list []common.Hash + if accountHash == (common.Hash{}) { + list = l.accountNodes[path] + } else { + list = l.storageNodes[accountHash.Hex()+path] + } + for i := len(list) - 1; i >= 0; i-- { + // If the current state matches the stateID, or the requested state is a + // descendant of it, return the current state as the most recent one + // containing the modified data. Otherwise, the current state may be ahead + // of the requested one or belong to a different branch. + if list[i] == stateID || l.descendant(stateID, list[i]) { + return list[i] + } + } + // No layer matching the stateID or its descendants was found. Use the + // current disk layer as a fallback. 
+ if base == stateID || l.descendant(stateID, base) { + return base + } + // The layer associated with 'stateID' is not the descendant of the current + // disk layer, it's already stale, return nothing. + return common.Hash{} +} + // addLayer traverses the state data retained in the specified diff layer and // integrates it into the lookup set. // @@ -176,9 +208,12 @@ func (l *lookup) addLayer(diff *diffLayer) { wg sync.WaitGroup state = diff.rootHash() ) + st00 := time.Now() + var st0, st1, st2, st3 time.Duration wg.Add(1) go func() { defer wg.Done() + st := time.Now() for accountHash := range diff.states.accountData { list, exists := l.accounts[accountHash] if !exists { @@ -187,11 +222,13 @@ func (l *lookup) addLayer(diff *diffLayer) { list = append(list, state) l.accounts[accountHash] = list } + st0 = time.Since(st) }() wg.Add(1) go func() { defer wg.Done() + st := time.Now() for accountHash, slots := range diff.states.storageData { for slotHash := range slots { key := storageKey(accountHash, slotHash) @@ -203,8 +240,44 @@ func (l *lookup) addLayer(diff *diffLayer) { l.storages[key] = list } } + st1 = time.Since(st) + }() + + wg.Add(1) + go func() { + defer wg.Done() + st := time.Now() + for path := range diff.nodes.accountNodes { + list, exists := l.accountNodes[path] + if !exists { + list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool + } + list = append(list, state) + l.accountNodes[path] = list + } + st2 = time.Since(st) + }() + + wg.Add(1) + go func() { + defer wg.Done() + st := time.Now() + for accountHash, slots := range diff.nodes.storageNodes { + for path := range slots { + key := accountHash.Hex() + path + list, exists := l.storageNodes[key] + if !exists { + list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool + } + list = append(list, state) + l.storageNodes[key] = list + } + } + st3 = time.Since(st) }() + wg.Wait() + log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", time.Since(st00)) } // removeFromList removes the specified element from the provided list. 
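For intuition, a standalone sketch of the reverse scan that nodeTip performs; the roots r1..r3 and the descendant relation below are made up purely for illustration and are not part of the patch:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	var (
		r1 = common.HexToHash("0x01") // oldest diff layer touching this node
		r2 = common.HexToHash("0x02")
		r3 = common.HexToHash("0x03") // newest diff layer touching this node

		// Toy ancestry: r1 <- r2 <- r3.
		descendant = func(state, ancestor common.Hash) bool {
			switch ancestor {
			case r1:
				return state == r2 || state == r3
			case r2:
				return state == r3
			}
			return false
		}
		list = []common.Hash{r1, r2, r3} // oldest -> newest, as lookup keeps it
	)
	// Querying at state r2: scanning newest to oldest, r3 is skipped (it is
	// ahead of r2), and r2 itself is returned as the layer holding the node.
	stateID := r2
	for i := len(list) - 1; i >= 0; i-- {
		if list[i] == stateID || descendant(stateID, list[i]) {
			fmt.Println("serve node from layer", list[i])
			break
		}
	}
}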
@@ -274,5 +347,38 @@ func (l *lookup) removeLayer(diff *diffLayer) error { } return nil }) + + eg.Go(func() error { + for path := range diff.nodes.accountNodes { + found, list := removeFromList(l.accountNodes[path], state) + if !found { + return fmt.Errorf("account lookup is not found, %x, state: %x", path, state) + } + if len(list) != 0 { + l.accountNodes[path] = list + } else { + delete(l.accountNodes, path) + } + } + return nil + }) + + eg.Go(func() error { + for accountHash, slots := range diff.nodes.storageNodes { + for path := range slots { + key := accountHash.Hex() + path + found, list := removeFromList(l.storageNodes[key], state) + if !found { + return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state) + } + if len(list) != 0 { + l.storageNodes[key] = list + } else { + delete(l.storageNodes, key) + } + } + } + return nil + }) return eg.Wait() } diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go new file mode 100644 index 000000000000..25a658dad986 --- /dev/null +++ b/triedb/pathdb/lookup_test.go @@ -0,0 +1,123 @@ +package pathdb + +import ( + "crypto/rand" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/trie/trienode" +) + +// generateRandomAccountNodes creates a map of random trie nodes +func generateRandomAccountNodes(count int) map[string]*trienode.Node { + nodes := make(map[string]*trienode.Node, count) + + for i := 0; i < count; i++ { + path := make([]byte, 32) + rand.Read(path) + + blob := make([]byte, 64) + rand.Read(blob) + + var hash common.Hash + rand.Read(hash[:]) + + nodes[common.Bytes2Hex(path)] = &trienode.Node{Hash: hash, Blob: blob} + } + + return nodes +} + +// generateRandomStorageNodes creates a map of storage nodes organized by account +func generateRandomStorageNodes(accountCount, nodesPerAccount int) map[common.Hash]map[string]*trienode.Node { + storageNodes := make(map[common.Hash]map[string]*trienode.Node, accountCount) + + for i := 0; i < accountCount; i++ { + var hash common.Hash + rand.Read(hash[:]) + + storageNodes[hash] = generateRandomAccountNodes(nodesPerAccount) + } + + return storageNodes +} + +// addNodes is a helper method for testing that adds nodes to the lookup structure +func (l *lookup) addNodes(stateHash common.Hash, accountNodes map[string]*trienode.Node, storageNodes map[common.Hash]map[string]*trienode.Node) { + // Add account nodes + for path := range accountNodes { + list, exists := l.accountNodes[path] + if !exists { + list = make([]common.Hash, 0, 16) + } + list = append(list, stateHash) + l.accountNodes[path] = list + } + + // Add storage nodes + for accountHash, slots := range storageNodes { + for path := range slots { + key := accountHash.Hex() + path + list, exists := l.storageNodes[key] + if !exists { + list = make([]common.Hash, 0, 16) + } + list = append(list, stateHash) + l.storageNodes[key] = list + } + } +} + +func BenchmarkAddNodes(b *testing.B) { + tests := []struct { + name string + accountNodeCount int + storageAccountCount int + nodesPerAccount int + }{ + { + name: "Small-100-accounts-10-nodes", + accountNodeCount: 100, + storageAccountCount: 100, + nodesPerAccount: 10, + }, + { + name: "Medium-500-accounts-20-nodes", + accountNodeCount: 500, + storageAccountCount: 500, + nodesPerAccount: 20, + }, + { + name: "Large-2000-accounts-40-nodes", + accountNodeCount: 2000, + storageAccountCount: 2000, + nodesPerAccount: 40, + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + accountNodes := 
generateRandomAccountNodes(tc.accountNodeCount) + storageNodes := generateRandomStorageNodes(tc.storageAccountCount, tc.nodesPerAccount) + + lookup := &lookup{ + accountNodes: make(map[string][]common.Hash), + storageNodes: make(map[string][]common.Hash), + } + + var stateHash common.Hash + rand.Read(stateHash[:]) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Clear the nodes maps for each iteration + lookup.accountNodes = make(map[string][]common.Hash) + lookup.storageNodes = make(map[string][]common.Hash) + + lookup.addNodes(stateHash, accountNodes, storageNodes) + } + }) + } +} diff --git a/triedb/pathdb/reader.go b/triedb/pathdb/reader.go index 842ac0972e38..db52844ee674 100644 --- a/triedb/pathdb/reader.go +++ b/triedb/pathdb/reader.go @@ -64,7 +64,14 @@ type reader struct { // node info. Don't modify the returned byte slice since it's not deep-copied // and still be referenced by database. func (r *reader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { - blob, got, loc, err := r.layer.node(owner, path, 0) + l, err := r.db.tree.lookupNode(owner, string(path), r.state) + if err != nil { + return nil, err + } + blob, got, loc, err := l.node(owner, path, 0) + if errors.Is(err, errSnapshotStale) { + blob, got, loc, err = r.layer.node(owner, path, 0) + } if err != nil { return nil, err } From b3af8b54d195cee710ac01ce1efeea0ca3d2c3fa Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 04:08:31 +0000 Subject: [PATCH 02/13] pathdb: use shared map Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 91 +++++++++++++++++++++++++++++++----- triedb/pathdb/lookup_test.go | 23 ++++++--- 2 files changed, 97 insertions(+), 17 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 73ebfdc060ae..098b10a2b54d 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -50,13 +50,36 @@ type lookup struct { storages map[[64]byte][]common.Hash accountNodes map[string][]common.Hash - storageNodes map[string][]common.Hash + + // Optimized: Use sharded storage with fixed array to reduce map size + // Each account uses 16 smaller maps to distribute storage keys + // This reduces map lookup time from O(log n) with large n to O(log n) with small n + storageNodes map[common.Hash][16]map[string][]common.Hash // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. descendant func(state common.Hash, ancestor common.Hash) bool } +// getStorageShardIndex returns the shard index for a given path +func getStorageShardIndex(path string) int { + if len(path) == 0 { + return 0 + } + // Use simple hash of the first few characters to distribute evenly + hash := 0 + for i, r := range path { + hash = hash*31 + int(r) + if i >= 4 { // Only use first 4 characters to avoid expensive computation + break + } + } + if hash < 0 { + hash = -hash + } + return hash % 16 +} + // newLookup initializes the lookup structure. 
func newLookup(head layer, descendant func(state common.Hash, ancestor common.Hash) bool) *lookup { var ( @@ -71,7 +94,7 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha accounts: make(map[common.Hash][]common.Hash), storages: make(map[[64]byte][]common.Hash), accountNodes: make(map[string][]common.Hash), - storageNodes: make(map[string][]common.Hash), + storageNodes: make(map[common.Hash][16]map[string][]common.Hash), descendant: descendant, } // Apply the diff layers from bottom to top @@ -172,7 +195,10 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha if accountHash == (common.Hash{}) { list = l.accountNodes[path] } else { - list = l.storageNodes[accountHash.Hex()+path] + if accountShards, exists := l.storageNodes[accountHash]; exists { + shardIndex := getStorageShardIndex(path) + list = accountShards[shardIndex][path] + } } for i := len(list) - 1; i >= 0; i-- { // If the current state matches the stateID, or the requested state is a @@ -262,22 +288,64 @@ func (l *lookup) addLayer(diff *diffLayer) { go func() { defer wg.Done() st := time.Now() + + count := 0 + var stHash, stKey, stMapGet, stMapNew, stMapApp, stMapSet time.Duration + for accountHash, slots := range diff.nodes.storageNodes { + st00 := time.Now() + st01 := time.Now() + stHash += st01.Sub(st00) + + // Get or create the account-level shard array + accountShards, exists := l.storageNodes[accountHash] + if !exists { + // Initialize all 16 shards for this account + for i := 0; i < 16; i++ { + accountShards[i] = make(map[string][]common.Hash) + } + l.storageNodes[accountHash] = accountShards + } + for path := range slots { - key := accountHash.Hex() + path - list, exists := l.storageNodes[key] + count += 1 + + st00 := time.Now() + // Calculate shard index + shardIndex := getStorageShardIndex(path) + st01 := time.Now() + + // Access the specific shard map + shardMap := accountShards[shardIndex] + list, exists := shardMap[path] + st02 := time.Now() if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool } + st03 := time.Now() list = append(list, state) - l.storageNodes[key] = list + st04 := time.Now() + shardMap[path] = list + st05 := time.Now() + + stKey += st01.Sub(st00) + stMapGet += st02.Sub(st01) + stMapNew += st03.Sub(st02) + stMapApp += st04.Sub(st03) + stMapSet += st05.Sub(st04) } } + st3 = time.Since(st) + if st3 > time.Millisecond { + log.Info("PathDB lookup add storage nodes", "count", count, "accounts", len(diff.nodes.storageNodes), "stHash", stHash, "stKey", stKey, "stMapGet", stMapGet, "stMapNew", stMapNew, "stMapApp", stMapApp, "stMapSet", stMapSet) + } }() wg.Wait() - log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", time.Since(st00)) + if elapsed := time.Since(st00); elapsed > time.Millisecond { + log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", elapsed) + } } // removeFromList removes the specified element from the provided list. 
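One Go detail the two-level [16]map layout above leans on: reading an array value out of l.storageNodes copies the array header, but the sixteen maps inside it are references, so writes through the copied array still reach the shared shard maps, which is why the array only has to be stored back once, when it is first created. A minimal standalone check of that behaviour, with illustrative names only:

package main

import "fmt"

func main() {
	// Outer map value is an array of maps, mirroring the sharded layout.
	outer := map[string][2]map[string]int{}

	var shards [2]map[string]int
	for i := range shards {
		shards[i] = map[string]int{}
	}
	outer["acct"] = shards

	// Reading the array back copies the array itself, but each element is
	// still the same underlying map, so this write is visible via outer.
	got := outer["acct"]
	got[0]["path"] = 42

	fmt.Println(outer["acct"][0]["path"]) // prints 42
}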
@@ -365,16 +433,17 @@ func (l *lookup) removeLayer(diff *diffLayer) error { eg.Go(func() error { for accountHash, slots := range diff.nodes.storageNodes { + accountShards := l.storageNodes[accountHash] for path := range slots { - key := accountHash.Hex() + path - found, list := removeFromList(l.storageNodes[key], state) + shardIndex := getStorageShardIndex(path) + found, list := removeFromList(accountShards[shardIndex][path], state) if !found { return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state) } if len(list) != 0 { - l.storageNodes[key] = list + accountShards[shardIndex][path] = list } else { - delete(l.storageNodes, key) + delete(accountShards[shardIndex], path) } } } diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 25a658dad986..4a6a74dce4dd 100644 --- a/triedb/pathdb/lookup_test.go +++ b/triedb/pathdb/lookup_test.go @@ -54,16 +54,27 @@ func (l *lookup) addNodes(stateHash common.Hash, accountNodes map[string]*trieno l.accountNodes[path] = list } - // Add storage nodes + // Add storage nodes using sharded structure for accountHash, slots := range storageNodes { + // Get or create the account-level shard array + accountShards, exists := l.storageNodes[accountHash] + if !exists { + // Initialize all 16 shards for this account + for i := 0; i < 16; i++ { + accountShards[i] = make(map[string][]common.Hash) + } + l.storageNodes[accountHash] = accountShards + } + for path := range slots { - key := accountHash.Hex() + path - list, exists := l.storageNodes[key] + shardIndex := getStorageShardIndex(path) + shardMap := accountShards[shardIndex] + list, exists := shardMap[path] if !exists { list = make([]common.Hash, 0, 16) } list = append(list, stateHash) - l.storageNodes[key] = list + shardMap[path] = list } } } @@ -102,7 +113,7 @@ func BenchmarkAddNodes(b *testing.B) { lookup := &lookup{ accountNodes: make(map[string][]common.Hash), - storageNodes: make(map[string][]common.Hash), + storageNodes: make(map[common.Hash][16]map[string][]common.Hash), } var stateHash common.Hash @@ -114,7 +125,7 @@ func BenchmarkAddNodes(b *testing.B) { for i := 0; i < b.N; i++ { // Clear the nodes maps for each iteration lookup.accountNodes = make(map[string][]common.Hash) - lookup.storageNodes = make(map[string][]common.Hash) + lookup.storageNodes = make(map[common.Hash][16]map[string][]common.Hash) lookup.addNodes(stateHash, accountNodes, storageNodes) } From 782af1340734ace45b4bcffbbeed529c555a4b80 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 04:09:32 +0000 Subject: [PATCH 03/13] use path to calculate index Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 60 +++++++++++++++++++----------------- triedb/pathdb/lookup_test.go | 33 ++++++++++---------- 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 098b10a2b54d..6ed9634e3f33 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -51,10 +51,10 @@ type lookup struct { accountNodes map[string][]common.Hash - // Optimized: Use sharded storage with fixed array to reduce map size - // Each account uses 16 smaller maps to distribute storage keys - // This reduces map lookup time from O(log n) with large n to O(log n) with small n - storageNodes map[common.Hash][16]map[string][]common.Hash + // Optimized: Use sharded storage with single-level map + // Key is accountHash.Hex() + path, distributed across 16 shards + // This eliminates the need for two-level map lookups + storageNodes 
[16]map[string][]common.Hash // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. @@ -62,15 +62,19 @@ type lookup struct { } // getStorageShardIndex returns the shard index for a given path +// Uses only the path (not the full key) for sharding to achieve better locality: +// - Same paths across different accounts will be in the same shard +// - Avoids expensive string concatenation for shard calculation +// - Provides better cache locality for similar storage operations func getStorageShardIndex(path string) int { if len(path) == 0 { return 0 } - // Use simple hash of the first few characters to distribute evenly + // Use simple hash of the path to distribute evenly hash := 0 for i, r := range path { hash = hash*31 + int(r) - if i >= 4 { // Only use first 4 characters to avoid expensive computation + if i >= 4 { // Use first 4 characters for good distribution break } } @@ -94,9 +98,13 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha accounts: make(map[common.Hash][]common.Hash), storages: make(map[[64]byte][]common.Hash), accountNodes: make(map[string][]common.Hash), - storageNodes: make(map[common.Hash][16]map[string][]common.Hash), descendant: descendant, } + // Initialize all 16 storage node shards + for i := 0; i < 16; i++ { + l.storageNodes[i] = make(map[string][]common.Hash) + } + // Apply the diff layers from bottom to top for i := len(layers) - 1; i >= 0; i-- { switch diff := layers[i].(type) { @@ -195,10 +203,10 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha if accountHash == (common.Hash{}) { list = l.accountNodes[path] } else { - if accountShards, exists := l.storageNodes[accountHash]; exists { - shardIndex := getStorageShardIndex(path) - list = accountShards[shardIndex][path] - } + // Construct the combined key but use only path for shard calculation + key := accountHash.Hex() + path + shardIndex := getStorageShardIndex(path) // Use only path for sharding + list = l.storageNodes[shardIndex][key] } for i := len(list) - 1; i >= 0; i-- { // If the current state matches the stateID, or the requested state is a @@ -294,30 +302,22 @@ func (l *lookup) addLayer(diff *diffLayer) { for accountHash, slots := range diff.nodes.storageNodes { st00 := time.Now() + accountHex := accountHash.Hex() st01 := time.Now() stHash += st01.Sub(st00) - // Get or create the account-level shard array - accountShards, exists := l.storageNodes[accountHash] - if !exists { - // Initialize all 16 shards for this account - for i := 0; i < 16; i++ { - accountShards[i] = make(map[string][]common.Hash) - } - l.storageNodes[accountHash] = accountShards - } - for path := range slots { count += 1 st00 := time.Now() - // Calculate shard index + // Construct the combined key and find the correct shard + key := accountHex + path shardIndex := getStorageShardIndex(path) st01 := time.Now() // Access the specific shard map - shardMap := accountShards[shardIndex] - list, exists := shardMap[path] + shardMap := l.storageNodes[shardIndex] + list, exists := shardMap[key] st02 := time.Now() if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool @@ -325,7 +325,7 @@ func (l *lookup) addLayer(diff *diffLayer) { st03 := time.Now() list = append(list, state) st04 := time.Now() - shardMap[path] = list + shardMap[key] = list st05 := time.Now() stKey += st01.Sub(st00) @@ -433,17 +433,19 @@ func (l *lookup) removeLayer(diff *diffLayer) error { eg.Go(func() error { 
for accountHash, slots := range diff.nodes.storageNodes { - accountShards := l.storageNodes[accountHash] + accountHex := accountHash.Hex() for path := range slots { + // Construct the combined key and find the correct shard + key := accountHex + path shardIndex := getStorageShardIndex(path) - found, list := removeFromList(accountShards[shardIndex][path], state) + found, list := removeFromList(l.storageNodes[shardIndex][key], state) if !found { return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state) } if len(list) != 0 { - accountShards[shardIndex][path] = list + l.storageNodes[shardIndex][key] = list } else { - delete(accountShards[shardIndex], path) + delete(l.storageNodes[shardIndex], key) } } } diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 4a6a74dce4dd..3d9a2be3a768 100644 --- a/triedb/pathdb/lookup_test.go +++ b/triedb/pathdb/lookup_test.go @@ -54,27 +54,22 @@ func (l *lookup) addNodes(stateHash common.Hash, accountNodes map[string]*trieno l.accountNodes[path] = list } - // Add storage nodes using sharded structure + // Add storage nodes using single-level sharded structure for accountHash, slots := range storageNodes { - // Get or create the account-level shard array - accountShards, exists := l.storageNodes[accountHash] - if !exists { - // Initialize all 16 shards for this account - for i := 0; i < 16; i++ { - accountShards[i] = make(map[string][]common.Hash) - } - l.storageNodes[accountHash] = accountShards - } + accountHex := accountHash.Hex() for path := range slots { - shardIndex := getStorageShardIndex(path) - shardMap := accountShards[shardIndex] - list, exists := shardMap[path] + // Construct the combined key but use only path for shard calculation + key := accountHex + path + shardIndex := getStorageShardIndex(path) // Use only path for sharding + shardMap := l.storageNodes[shardIndex] + + list, exists := shardMap[key] if !exists { list = make([]common.Hash, 0, 16) } list = append(list, stateHash) - shardMap[path] = list + shardMap[key] = list } } } @@ -113,7 +108,10 @@ func BenchmarkAddNodes(b *testing.B) { lookup := &lookup{ accountNodes: make(map[string][]common.Hash), - storageNodes: make(map[common.Hash][16]map[string][]common.Hash), + } + // Initialize all 16 storage node shards + for i := 0; i < 16; i++ { + lookup.storageNodes[i] = make(map[string][]common.Hash) } var stateHash common.Hash @@ -125,7 +123,10 @@ func BenchmarkAddNodes(b *testing.B) { for i := 0; i < b.N; i++ { // Clear the nodes maps for each iteration lookup.accountNodes = make(map[string][]common.Hash) - lookup.storageNodes = make(map[common.Hash][16]map[string][]common.Hash) + // Reinitialize all 16 storage node shards + for j := 0; j < 16; j++ { + lookup.storageNodes[j] = make(map[string][]common.Hash) + } lookup.addNodes(stateHash, accountNodes, storageNodes) } From f269e94b6ffc4e6e073a04d1beef36ce4a08155b Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 05:14:46 +0000 Subject: [PATCH 04/13] use 16 concurrent workers Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 94 +++++++++++++++++---------- triedb/pathdb/lookup_test.go | 121 +++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 35 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 6ed9634e3f33..50fce47a16c1 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -74,7 +74,7 @@ func getStorageShardIndex(path string) int { hash := 0 for i, r := range path { hash = hash*31 + int(r) - if i >= 4 { // Use 
first 4 characters for good distribution + if i >= 8 { // Use first 8 characters for better distribution break } } @@ -292,55 +292,79 @@ func (l *lookup) addLayer(diff *diffLayer) { st2 = time.Since(st) }() - wg.Add(1) - go func() { - defer wg.Done() - st := time.Now() + // Use concurrent workers for storage nodes updates, one per shard + var storageWg sync.WaitGroup + storageWg.Add(16) - count := 0 - var stHash, stKey, stMapGet, stMapNew, stMapApp, stMapSet time.Duration + // Create channels to distribute work to workers + type storageWork struct { + accountHash common.Hash + accountHex string + path string + } - for accountHash, slots := range diff.nodes.storageNodes { - st00 := time.Now() - accountHex := accountHash.Hex() - st01 := time.Now() - stHash += st01.Sub(st00) + workChannels := make([]chan storageWork, 16) + for i := 0; i < 16; i++ { + workChannels[i] = make(chan storageWork, 1000) // Buffer to avoid blocking + } - for path := range slots { - count += 1 + // Start 16 workers, each handling its own shard + for shardIndex := 0; shardIndex < 16; shardIndex++ { + go func(shardIdx int) { + defer storageWg.Done() + st := time.Now() + count := 0 - st00 := time.Now() - // Construct the combined key and find the correct shard - key := accountHex + path - shardIndex := getStorageShardIndex(path) - st01 := time.Now() + for work := range workChannels[shardIdx] { + // Construct the combined key + key := work.accountHex + work.path - // Access the specific shard map - shardMap := l.storageNodes[shardIndex] + // Access the specific shard map (no lock needed as each worker owns its shard) + shardMap := l.storageNodes[shardIdx] list, exists := shardMap[key] - st02 := time.Now() if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool } - st03 := time.Now() list = append(list, state) - st04 := time.Now() shardMap[key] = list - st05 := time.Now() + count++ + } - stKey += st01.Sub(st00) - stMapGet += st02.Sub(st01) - stMapNew += st03.Sub(st02) - stMapApp += st04.Sub(st03) - stMapSet += st05.Sub(st04) + st3 := time.Since(st) + if st3 > time.Millisecond { + log.Info("PathDB lookup add storage nodes worker", "shard", shardIdx, "count", count, "elapsed", st3) } - } + }(shardIndex) + } - st3 = time.Since(st) - if st3 > time.Millisecond { - log.Info("PathDB lookup add storage nodes", "count", count, "accounts", len(diff.nodes.storageNodes), "stHash", stHash, "stKey", stKey, "stMapGet", stMapGet, "stMapNew", stMapNew, "stMapApp", stMapApp, "stMapSet", stMapSet) + // Distribute work to workers based on shard index + distributeStart := time.Now() + totalCount := 0 + for accountHash, slots := range diff.nodes.storageNodes { + accountHex := accountHash.Hex() + for path := range slots { + shardIndex := getStorageShardIndex(path) + workChannels[shardIndex] <- storageWork{ + accountHash: accountHash, + accountHex: accountHex, + path: path, + } + totalCount++ } - }() + } + + // Close all channels to signal workers to finish + for i := 0; i < 16; i++ { + close(workChannels[i]) + } + + // Wait for all storage workers to complete + storageWg.Wait() + st3 = time.Since(distributeStart) + + if st3 > time.Millisecond { + log.Info("PathDB lookup add storage nodes", "total_count", totalCount, "accounts", len(diff.nodes.storageNodes), "elapsed", st3) + } wg.Wait() if elapsed := time.Since(st00); elapsed > time.Millisecond { diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 3d9a2be3a768..827f479763a6 100644 --- a/triedb/pathdb/lookup_test.go +++ 
b/triedb/pathdb/lookup_test.go @@ -2,6 +2,7 @@ package pathdb import ( "crypto/rand" + "fmt" "testing" "github.com/ethereum/go-ethereum/common" @@ -133,3 +134,123 @@ func BenchmarkAddNodes(b *testing.B) { }) } } + +func TestConcurrentStorageNodesUpdate(b *testing.T) { + // Create a lookup instance + lookup := &lookup{ + accountNodes: make(map[string][]common.Hash), + } + // Initialize all 16 storage node shards + for i := 0; i < 16; i++ { + lookup.storageNodes[i] = make(map[string][]common.Hash) + } + + // Create test data with known shard distribution + testData := map[common.Hash]map[string]*trienode.Node{} + + // Create accounts that will distribute across different shards + for i := 0; i < 100; i++ { + var accountHash common.Hash + accountHash[0] = byte(i) + + testData[accountHash] = make(map[string]*trienode.Node) + + // Create paths that will hash to different shards + for j := 0; j < 10; j++ { + path := fmt.Sprintf("path_%d_%d", i, j) + var nodeHash common.Hash + nodeHash[0] = byte(j) + + testData[accountHash][path] = &trienode.Node{Hash: nodeHash} + } + } + + // Add nodes using the concurrent method + var stateHash common.Hash + stateHash[0] = 0x42 + lookup.addNodes(stateHash, nil, testData) + + // Verify that all nodes were added correctly + totalNodes := 0 + for accountHash, slots := range testData { + accountHex := accountHash.Hex() + for path := range slots { + key := accountHex + path + shardIndex := getStorageShardIndex(path) + + list, exists := lookup.storageNodes[shardIndex][key] + if !exists { + b.Errorf("Node not found: account=%x, path=%s, shard=%d", accountHash, path, shardIndex) + continue + } + + if len(list) != 1 { + b.Errorf("Expected 1 state hash, got %d: account=%x, path=%s", len(list), accountHash, path) + continue + } + + if list[0] != stateHash { + b.Errorf("Expected state hash %x, got %x: account=%x, path=%s", stateHash, list[0], accountHash, path) + continue + } + + totalNodes++ + } + } + + expectedTotal := 100 * 10 // 100 accounts * 10 nodes each + if totalNodes != expectedTotal { + b.Errorf("Expected %d total nodes, got %d", expectedTotal, totalNodes) + } + + // Verify shard distribution + for i := 0; i < 16; i++ { + shardSize := len(lookup.storageNodes[i]) + if shardSize == 0 { + b.Logf("Shard %d is empty", i) + } else { + b.Logf("Shard %d has %d entries", i, shardSize) + } + } +} + +func TestShardDistribution(b *testing.T) { + // Test shard distribution with different path patterns + paths := []string{ + "path_0_0", "path_0_1", "path_0_2", "path_0_3", + "path_1_0", "path_1_1", "path_1_2", "path_1_3", + "path_2_0", "path_2_1", "path_2_2", "path_2_3", + "path_3_0", "path_3_1", "path_3_2", "path_3_3", + "path_4_0", "path_4_1", "path_4_2", "path_4_3", + "path_5_0", "path_5_1", "path_5_2", "path_5_3", + "path_6_0", "path_6_1", "path_6_2", "path_6_3", + "path_7_0", "path_7_1", "path_7_2", "path_7_3", + "path_8_0", "path_8_1", "path_8_2", "path_8_3", + "path_9_0", "path_9_1", "path_9_2", "path_9_3", + } + + shardCounts := make(map[int]int) + for _, path := range paths { + shardIndex := getStorageShardIndex(path) + shardCounts[shardIndex]++ + b.Logf("Path: %s -> Shard: %d", path, shardIndex) + } + + b.Logf("Shard distribution:") + for i := 0; i < 16; i++ { + count := shardCounts[i] + b.Logf(" Shard %d: %d paths", i, count) + } + + // Check if we have a reasonable distribution + usedShards := 0 + for _, count := range shardCounts { + if count > 0 { + usedShards++ + } + } + + if usedShards < 4 { + b.Logf("Warning: Only %d shards are being used out of 16", 
usedShards) + } +} From 7b86aa8775659fb658379b3a14f745bb528ddb72 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 07:03:53 +0000 Subject: [PATCH 05/13] use path[0] as shardid Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 140 ++++++++++++++--------------------- triedb/pathdb/lookup_test.go | 47 +----------- 2 files changed, 60 insertions(+), 127 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 50fce47a16c1..6e26e1f9aea3 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -26,6 +26,8 @@ import ( "golang.org/x/sync/errgroup" ) +const storageNodesShardCount = 16 + // storageKey returns a key for uniquely identifying the storage slot. func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte { var key [64]byte @@ -51,10 +53,9 @@ type lookup struct { accountNodes map[string][]common.Hash - // Optimized: Use sharded storage with single-level map // Key is accountHash.Hex() + path, distributed across 16 shards // This eliminates the need for two-level map lookups - storageNodes [16]map[string][]common.Hash + storageNodes [storageNodesShardCount]map[string][]common.Hash // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. @@ -62,26 +63,12 @@ type lookup struct { } // getStorageShardIndex returns the shard index for a given path -// Uses only the path (not the full key) for sharding to achieve better locality: -// - Same paths across different accounts will be in the same shard -// - Avoids expensive string concatenation for shard calculation -// - Provides better cache locality for similar storage operations func getStorageShardIndex(path string) int { if len(path) == 0 { return 0 } - // Use simple hash of the path to distribute evenly - hash := 0 - for i, r := range path { - hash = hash*31 + int(r) - if i >= 8 { // Use first 8 characters for better distribution - break - } - } - if hash < 0 { - hash = -hash - } - return hash % 16 + // use the first char of the path to determine the shard index + return int(path[0]) % storageNodesShardCount } // newLookup initializes the lookup structure. 
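The path[0] shard index above appears to rely on trie node paths being nibble-encoded (every byte already in 0..15) as they are kept in the diff layer's node sets; if that assumption holds, the first nibble alone spreads keys over all 16 shards, with the empty root path pinned to shard 0. A toy check of the mapping, using made-up paths:

package main

import "fmt"

func main() {
	const shards = 16

	// Toy nibble-encoded paths (every byte in 0..15), standing in for the
	// real trie node paths carried by a diff layer.
	paths := []string{
		string([]byte{0x0, 0x3}),
		string([]byte{0x7}),
		string([]byte{0xa, 0x1, 0xf}),
		string([]byte{0xf, 0xf}),
	}
	for _, p := range paths {
		idx := 0
		if len(p) > 0 {
			idx = int(p[0]) % shards // equals the first nibble when bytes are 0..15
		}
		fmt.Printf("path %x -> shard %d\n", p, idx)
	}
}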
@@ -101,7 +88,7 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha descendant: descendant, } // Initialize all 16 storage node shards - for i := 0; i < 16; i++ { + for i := 0; i < storageNodesShardCount; i++ { l.storageNodes[i] = make(map[string][]common.Hash) } @@ -292,84 +279,71 @@ func (l *lookup) addLayer(diff *diffLayer) { st2 = time.Since(st) }() - // Use concurrent workers for storage nodes updates, one per shard - var storageWg sync.WaitGroup - storageWg.Add(16) - - // Create channels to distribute work to workers - type storageWork struct { - accountHash common.Hash - accountHex string - path string - } - - workChannels := make([]chan storageWork, 16) - for i := 0; i < 16; i++ { - workChannels[i] = make(chan storageWork, 1000) // Buffer to avoid blocking - } + wg.Add(1) + go func() { + defer wg.Done() - // Start 16 workers, each handling its own shard - for shardIndex := 0; shardIndex < 16; shardIndex++ { - go func(shardIdx int) { - defer storageWg.Done() - st := time.Now() - count := 0 + // Use concurrent workers for storage nodes updates, one per shard + var storageWg sync.WaitGroup + storageWg.Add(storageNodesShardCount) - for work := range workChannels[shardIdx] { - // Construct the combined key - key := work.accountHex + work.path + workChannels := make([]chan string, storageNodesShardCount) + for i := 0; i < storageNodesShardCount; i++ { + workChannels[i] = make(chan string, 10) // Buffer to avoid blocking + } - // Access the specific shard map (no lock needed as each worker owns its shard) - shardMap := l.storageNodes[shardIdx] - list, exists := shardMap[key] - if !exists { - list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool + // Start 16 workers, each handling its own shard + for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { + go func(shardIdx int) { + defer storageWg.Done() + st := time.Now() + count := 0 + + shard := l.storageNodes[shardIdx] + for key := range workChannels[shardIdx] { + // Access the specific shard map (no lock needed as each worker owns its shard) + list, exists := shard[key] + if !exists { + list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool + } + list = append(list, state) + shard[key] = list + count++ } - list = append(list, state) - shardMap[key] = list - count++ - } - st3 := time.Since(st) - if st3 > time.Millisecond { - log.Info("PathDB lookup add storage nodes worker", "shard", shardIdx, "count", count, "elapsed", st3) - } - }(shardIndex) - } + st3 := time.Since(st) + if st3 > time.Millisecond { + log.Info("PathDB lookup add storage nodes worker", "shard", shardIdx, "count", count, "elapsed", st3) + } + }(shardIndex) + } - // Distribute work to workers based on shard index - distributeStart := time.Now() - totalCount := 0 - for accountHash, slots := range diff.nodes.storageNodes { - accountHex := accountHash.Hex() - for path := range slots { - shardIndex := getStorageShardIndex(path) - workChannels[shardIndex] <- storageWork{ - accountHash: accountHash, - accountHex: accountHex, - path: path, + // Distribute work to workers based on shard index + distributeStart := time.Now() + totalCount := 0 + for accountHash, slots := range diff.nodes.storageNodes { + accountHex := accountHash.Hex() + for path := range slots { + shardIndex := getStorageShardIndex(path) + workChannels[shardIndex] <- accountHex + path + totalCount++ } - totalCount++ } - } - // Close all channels to signal workers to finish - for i := 0; i < 16; i++ { - close(workChannels[i]) - } + // 
Close all channels to signal workers to finish + for i := 0; i < storageNodesShardCount; i++ { + close(workChannels[i]) + } - // Wait for all storage workers to complete - storageWg.Wait() - st3 = time.Since(distributeStart) + // Wait for all storage workers to complete + storageWg.Wait() + st3 = time.Since(distributeStart) - if st3 > time.Millisecond { log.Info("PathDB lookup add storage nodes", "total_count", totalCount, "accounts", len(diff.nodes.storageNodes), "elapsed", st3) - } + }() wg.Wait() - if elapsed := time.Since(st00); elapsed > time.Millisecond { - log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", elapsed) - } + log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", time.Since(st00)) } // removeFromList removes the specified element from the provided list. diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 827f479763a6..271ad8d8d61f 100644 --- a/triedb/pathdb/lookup_test.go +++ b/triedb/pathdb/lookup_test.go @@ -140,8 +140,8 @@ func TestConcurrentStorageNodesUpdate(b *testing.T) { lookup := &lookup{ accountNodes: make(map[string][]common.Hash), } - // Initialize all 16 storage node shards - for i := 0; i < 16; i++ { + // Initialize all storage node shards + for i := 0; i < storageNodesShardCount; i++ { lookup.storageNodes[i] = make(map[string][]common.Hash) } @@ -204,7 +204,7 @@ func TestConcurrentStorageNodesUpdate(b *testing.T) { } // Verify shard distribution - for i := 0; i < 16; i++ { + for i := 0; i < storageNodesShardCount; i++ { shardSize := len(lookup.storageNodes[i]) if shardSize == 0 { b.Logf("Shard %d is empty", i) @@ -213,44 +213,3 @@ func TestConcurrentStorageNodesUpdate(b *testing.T) { } } } - -func TestShardDistribution(b *testing.T) { - // Test shard distribution with different path patterns - paths := []string{ - "path_0_0", "path_0_1", "path_0_2", "path_0_3", - "path_1_0", "path_1_1", "path_1_2", "path_1_3", - "path_2_0", "path_2_1", "path_2_2", "path_2_3", - "path_3_0", "path_3_1", "path_3_2", "path_3_3", - "path_4_0", "path_4_1", "path_4_2", "path_4_3", - "path_5_0", "path_5_1", "path_5_2", "path_5_3", - "path_6_0", "path_6_1", "path_6_2", "path_6_3", - "path_7_0", "path_7_1", "path_7_2", "path_7_3", - "path_8_0", "path_8_1", "path_8_2", "path_8_3", - "path_9_0", "path_9_1", "path_9_2", "path_9_3", - } - - shardCounts := make(map[int]int) - for _, path := range paths { - shardIndex := getStorageShardIndex(path) - shardCounts[shardIndex]++ - b.Logf("Path: %s -> Shard: %d", path, shardIndex) - } - - b.Logf("Shard distribution:") - for i := 0; i < 16; i++ { - count := shardCounts[i] - b.Logf(" Shard %d: %d paths", i, count) - } - - // Check if we have a reasonable distribution - usedShards := 0 - for _, count := range shardCounts { - if count > 0 { - usedShards++ - } - } - - if usedShards < 4 { - b.Logf("Warning: Only %d shards are being used out of 16", usedShards) - } -} From 8fb247a9b4c22f9a1e83686f72aeea88f0c18198 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 07:14:05 +0000 Subject: [PATCH 06/13] remove log Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 6e26e1f9aea3..61bdf19c508a 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -223,18 +223,16 @@ func (l *lookup) nodeTip(accountHash common.Hash, path 
string, stateID common.Ha func (l *lookup) addLayer(diff *diffLayer) { defer func(now time.Time) { lookupAddLayerTimer.UpdateSince(now) + log.Debug("PathDB lookup add layer", "id", diff.id, "block", diff.block, "elapsed", time.Since(now)) }(time.Now()) var ( wg sync.WaitGroup state = diff.rootHash() ) - st00 := time.Now() - var st0, st1, st2, st3 time.Duration wg.Add(1) go func() { defer wg.Done() - st := time.Now() for accountHash := range diff.states.accountData { list, exists := l.accounts[accountHash] if !exists { @@ -243,13 +241,11 @@ func (l *lookup) addLayer(diff *diffLayer) { list = append(list, state) l.accounts[accountHash] = list } - st0 = time.Since(st) }() wg.Add(1) go func() { defer wg.Done() - st := time.Now() for accountHash, slots := range diff.states.storageData { for slotHash := range slots { key := storageKey(accountHash, slotHash) @@ -261,13 +257,11 @@ func (l *lookup) addLayer(diff *diffLayer) { l.storages[key] = list } } - st1 = time.Since(st) }() wg.Add(1) go func() { defer wg.Done() - st := time.Now() for path := range diff.nodes.accountNodes { list, exists := l.accountNodes[path] if !exists { @@ -276,7 +270,6 @@ func (l *lookup) addLayer(diff *diffLayer) { list = append(list, state) l.accountNodes[path] = list } - st2 = time.Since(st) }() wg.Add(1) @@ -292,41 +285,29 @@ func (l *lookup) addLayer(diff *diffLayer) { workChannels[i] = make(chan string, 10) // Buffer to avoid blocking } - // Start 16 workers, each handling its own shard + // Start all workers, each handling its own shard for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { go func(shardIdx int) { defer storageWg.Done() - st := time.Now() - count := 0 shard := l.storageNodes[shardIdx] for key := range workChannels[shardIdx] { - // Access the specific shard map (no lock needed as each worker owns its shard) list, exists := shard[key] if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool } list = append(list, state) shard[key] = list - count++ - } - - st3 := time.Since(st) - if st3 > time.Millisecond { - log.Info("PathDB lookup add storage nodes worker", "shard", shardIdx, "count", count, "elapsed", st3) } }(shardIndex) } // Distribute work to workers based on shard index - distributeStart := time.Now() - totalCount := 0 for accountHash, slots := range diff.nodes.storageNodes { accountHex := accountHash.Hex() for path := range slots { shardIndex := getStorageShardIndex(path) workChannels[shardIndex] <- accountHex + path - totalCount++ } } @@ -337,13 +318,9 @@ func (l *lookup) addLayer(diff *diffLayer) { // Wait for all storage workers to complete storageWg.Wait() - st3 = time.Since(distributeStart) - - log.Info("PathDB lookup add storage nodes", "total_count", totalCount, "accounts", len(diff.nodes.storageNodes), "elapsed", st3) }() wg.Wait() - log.Info("PathDB lookup", "id", diff.id, "block", diff.block, "st0", st0, "st1", st1, "st2", st2, "st3", st3, "elapsed", time.Since(st00)) } // removeFromList removes the specified element from the provided list. 
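Because every worker above drains only its own shard's channel, each shard map has exactly one writer and no mutex is needed. The pattern in isolation, with a reduced shard count and toy keys for illustration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const shardCount = 4
	var (
		shards [shardCount]map[string]int
		tasks  = make([]chan string, shardCount)
		wg     sync.WaitGroup
	)
	for i := 0; i < shardCount; i++ {
		shards[i] = make(map[string]int)
		tasks[i] = make(chan string, 10)
	}
	wg.Add(shardCount)
	for i := 0; i < shardCount; i++ {
		go func(idx int) {
			defer wg.Done()
			// Only this goroutine ever touches shards[idx], so plain map
			// writes are safe without locking.
			for key := range tasks[idx] {
				shards[idx][key]++
			}
		}(i)
	}
	keys := []string{"a", "b", "c", "d", "aa", "bb"}
	for _, k := range keys {
		tasks[int(k[0])%shardCount] <- k
	}
	for i := 0; i < shardCount; i++ {
		close(tasks[i])
	}
	wg.Wait()
	fmt.Println(shards)
}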
@@ -375,6 +352,7 @@ func removeFromList(list []common.Hash, element common.Hash) (bool, []common.Has func (l *lookup) removeLayer(diff *diffLayer) error { defer func(now time.Time) { lookupRemoveLayerTimer.UpdateSince(now) + log.Debug("PathDB lookup remove layer", "id", diff.id, "block", diff.block, "elapsed", time.Since(now)) }(time.Now()) var ( From f5f0129f856835de68a8af373c4e568c0317f5f2 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 07:36:58 +0000 Subject: [PATCH 07/13] pathdb: remove in concurrent Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 151 ++++++++++++++++++++++++++++++---------- 1 file changed, 116 insertions(+), 35 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 61bdf19c508a..80771cd7b85f 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie/trienode" "golang.org/x/sync/errgroup" ) @@ -275,50 +276,74 @@ func (l *lookup) addLayer(diff *diffLayer) { wg.Add(1) go func() { defer wg.Done() + l.addStorageNodes(state, diff.nodes.storageNodes) + }() - // Use concurrent workers for storage nodes updates, one per shard - var storageWg sync.WaitGroup - storageWg.Add(storageNodesShardCount) - - workChannels := make([]chan string, storageNodesShardCount) - for i := 0; i < storageNodesShardCount; i++ { - workChannels[i] = make(chan string, 10) // Buffer to avoid blocking - } + wg.Wait() +} - // Start all workers, each handling its own shard - for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { - go func(shardIdx int) { - defer storageWg.Done() - - shard := l.storageNodes[shardIdx] - for key := range workChannels[shardIdx] { - list, exists := shard[key] - if !exists { - list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool - } - list = append(list, state) - shard[key] = list - } - }(shardIndex) - } +func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) { + count := 0 + for _, slots := range nodes { + count += len(slots) + } - // Distribute work to workers based on shard index - for accountHash, slots := range diff.nodes.storageNodes { + // If the number of storage nodes is small, use a single-threaded approach + if count <= 1000 { + for accountHash, slots := range nodes { accountHex := accountHash.Hex() for path := range slots { + key := accountHex + path shardIndex := getStorageShardIndex(path) - workChannels[shardIndex] <- accountHex + path + list, exists := l.storageNodes[shardIndex][key] + if !exists { + list = make([]common.Hash, 0, 16) + } + list = append(list, state) + l.storageNodes[shardIndex][key] = list } } + return + } + + // Use concurrent workers for storage nodes updates, one per shard + var wg sync.WaitGroup + wg.Add(storageNodesShardCount) - // Close all channels to signal workers to finish - for i := 0; i < storageNodesShardCount; i++ { - close(workChannels[i]) + workChannels := make([]chan string, storageNodesShardCount) + for i := 0; i < storageNodesShardCount; i++ { + workChannels[i] = make(chan string, 10) // Buffer to avoid blocking + } + + // Start all workers, each handling its own shard + for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { + go func(shardIdx int) { + defer wg.Done() + + shard := l.storageNodes[shardIdx] + for key := range workChannels[shardIdx] { + list, exists := shard[key] + if !exists { + list = make([]common.Hash, 0, 16) // 
TODO(rjl493456442) use sync pool + } + list = append(list, state) + shard[key] = list + } + }(shardIndex) + } + + for accountHash, slots := range nodes { + accountHex := accountHash.Hex() + for path := range slots { + shardIndex := getStorageShardIndex(path) + workChannels[shardIndex] <- accountHex + path } + } - // Wait for all storage workers to complete - storageWg.Wait() - }() + // Close all channels to signal workers to finish + for i := 0; i < storageNodesShardCount; i++ { + close(workChannels[i]) + } wg.Wait() } @@ -408,7 +433,19 @@ func (l *lookup) removeLayer(diff *diffLayer) error { }) eg.Go(func() error { - for accountHash, slots := range diff.nodes.storageNodes { + return l.removeStorageNodes(state, diff.nodes.storageNodes) + }) + return eg.Wait() +} + +func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) error { + count := 0 + for _, slots := range nodes { + count += len(slots) + } + + if count <= 1000 { + for accountHash, slots := range nodes { accountHex := accountHash.Hex() for path := range slots { // Construct the combined key and find the correct shard @@ -426,6 +463,50 @@ func (l *lookup) removeLayer(diff *diffLayer) error { } } return nil - }) + } + + // Use concurrent workers for storage nodes removal, one per shard + var eg errgroup.Group + + // Create work channels for each shard + workChannels := make([]chan string, storageNodesShardCount) + + for i := 0; i < storageNodesShardCount; i++ { + workChannels[i] = make(chan string, 10) // Buffer to avoid blocking + } + + // Start all workers, each handling its own shard + for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { + shardIdx := shardIndex // Capture the variable + eg.Go(func() error { + shard := l.storageNodes[shardIdx] + for key := range workChannels[shardIdx] { + found, list := removeFromList(shard[key], state) + if !found { + return fmt.Errorf("storage lookup is not found, key: %s, state: %x", key, state) + } + if len(list) != 0 { + shard[key] = list + } else { + delete(shard, key) + } + } + return nil + }) + } + + for accountHash, slots := range nodes { + accountHex := accountHash.Hex() + for path := range slots { + key := accountHex + path + shardIndex := getStorageShardIndex(path) + workChannels[shardIndex] <- key + } + } + + for i := 0; i < storageNodesShardCount; i++ { + close(workChannels[i]) + } + return eg.Wait() } From 0281e226bf2950f041276daf1e8f95e40ef6ac72 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 7 Jul 2025 11:27:36 +0000 Subject: [PATCH 08/13] doc Signed-off-by: jsvisa --- triedb/pathdb/layertree.go | 2 + triedb/pathdb/lookup.go | 32 +++++-- triedb/pathdb/lookup_test.go | 158 +++++------------------------------ 3 files changed, 47 insertions(+), 145 deletions(-) diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go index df0f84b3abab..221fdda844d5 100644 --- a/triedb/pathdb/layertree.go +++ b/triedb/pathdb/layertree.go @@ -339,6 +339,8 @@ func (tree *layerTree) lookupStorage(accountHash common.Hash, slotHash common.Ha return l, nil } +// lookupNode returns the layer that is guaranteed to contain the trie node +// data corresponding to the specified state root being queried. 
func (tree *layerTree) lookupNode(accountHash common.Hash, path string, state common.Hash) (layer, error) { // Hold the read lock to prevent the unexpected layer changes tree.lock.RLock() diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 80771cd7b85f..576fa81e395c 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -27,6 +27,7 @@ import ( "golang.org/x/sync/errgroup" ) +// storageNodesShardCount is the number of shards used for storage nodes. const storageNodesShardCount = 16 // storageKey returns a key for uniquely identifying the storage slot. @@ -52,10 +53,17 @@ type lookup struct { // where the slot was modified, with the order from oldest to newest. storages map[[64]byte][]common.Hash + // accountNodes represents the mutation history for specific account + // trie nodes. The key is the trie path of the node, and the value is a slice + // of **diff layer** IDs indicating where the account was modified, + // with the order from oldest to newest. accountNodes map[string][]common.Hash - // Key is accountHash.Hex() + path, distributed across 16 shards - // This eliminates the need for two-level map lookups + // storageNodes represents the mutation history for specific storage + // slot trie nodes, distributed across 16 shards for efficiency. + // The key is the account address hash and the trie path of the node, + // the value is a slice of **diff layer** IDs indicating where the + // slot was modified, with the order from oldest to newest. storageNodes [storageNodesShardCount]map[string][]common.Hash // descendant is the callback indicating whether the layer with @@ -186,6 +194,17 @@ func (l *lookup) storageTip(accountHash common.Hash, slotHash common.Hash, state return common.Hash{} } +// nodeTip traverses the layer list associated with the given account and path +// in reverse order to locate the first entry that either matches +// the specified stateID or is a descendant of it. +// +// If found, the trie node data corresponding to the supplied stateID resides +// in that layer. Otherwise, two scenarios are possible: +// +// (a) the trie node remains unmodified from the current disk layer up to +// the state layer specified by the stateID: fallback to the disk layer for +// data retrieval, (b) or the layer specified by the stateID is stale: reject +// the data retrieval. 
func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Hash, base common.Hash) common.Hash { var list []common.Hash if accountHash == (common.Hash{}) { @@ -448,17 +467,16 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map for accountHash, slots := range nodes { accountHex := accountHash.Hex() for path := range slots { - // Construct the combined key and find the correct shard + shard := l.storageNodes[getStorageShardIndex(path)] key := accountHex + path - shardIndex := getStorageShardIndex(path) - found, list := removeFromList(l.storageNodes[shardIndex][key], state) + found, list := removeFromList(shard[key], state) if !found { return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state) } if len(list) != 0 { - l.storageNodes[shardIndex][key] = list + shard[key] = list } else { - delete(l.storageNodes[shardIndex], key) + delete(shard, key) } } } diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 271ad8d8d61f..aac7df95b13d 100644 --- a/triedb/pathdb/lookup_test.go +++ b/triedb/pathdb/lookup_test.go @@ -2,7 +2,6 @@ package pathdb import ( "crypto/rand" - "fmt" "testing" "github.com/ethereum/go-ethereum/common" @@ -43,173 +42,56 @@ func generateRandomStorageNodes(accountCount, nodesPerAccount int) map[common.Ha return storageNodes } -// addNodes is a helper method for testing that adds nodes to the lookup structure -func (l *lookup) addNodes(stateHash common.Hash, accountNodes map[string]*trienode.Node, storageNodes map[common.Hash]map[string]*trienode.Node) { - // Add account nodes - for path := range accountNodes { - list, exists := l.accountNodes[path] - if !exists { - list = make([]common.Hash, 0, 16) - } - list = append(list, stateHash) - l.accountNodes[path] = list - } - - // Add storage nodes using single-level sharded structure - for accountHash, slots := range storageNodes { - accountHex := accountHash.Hex() - - for path := range slots { - // Construct the combined key but use only path for shard calculation - key := accountHex + path - shardIndex := getStorageShardIndex(path) // Use only path for sharding - shardMap := l.storageNodes[shardIndex] - - list, exists := shardMap[key] - if !exists { - list = make([]common.Hash, 0, 16) - } - list = append(list, stateHash) - shardMap[key] = list - } - } -} - func BenchmarkAddNodes(b *testing.B) { tests := []struct { - name string - accountNodeCount int - storageAccountCount int - nodesPerAccount int + name string + accountNodeCount int + nodesPerAccount int }{ { - name: "Small-100-accounts-10-nodes", - accountNodeCount: 100, - storageAccountCount: 100, - nodesPerAccount: 10, + name: "Small-100-accounts-10-nodes", + accountNodeCount: 100, + nodesPerAccount: 10, }, { - name: "Medium-500-accounts-20-nodes", - accountNodeCount: 500, - storageAccountCount: 500, - nodesPerAccount: 20, + name: "Medium-500-accounts-20-nodes", + accountNodeCount: 500, + nodesPerAccount: 20, }, { - name: "Large-2000-accounts-40-nodes", - accountNodeCount: 2000, - storageAccountCount: 2000, - nodesPerAccount: 40, + name: "Large-2000-accounts-40-nodes", + accountNodeCount: 2000, + nodesPerAccount: 40, }, } for _, tc := range tests { b.Run(tc.name, func(b *testing.B) { - accountNodes := generateRandomAccountNodes(tc.accountNodeCount) - storageNodes := generateRandomStorageNodes(tc.storageAccountCount, tc.nodesPerAccount) + storageNodes := generateRandomStorageNodes(tc.accountNodeCount, tc.nodesPerAccount) lookup := &lookup{ accountNodes: 
make(map[string][]common.Hash), } + // Initialize all 16 storage node shards - for i := 0; i < 16; i++ { + for i := 0; i < storageNodesShardCount; i++ { lookup.storageNodes[i] = make(map[string][]common.Hash) } - var stateHash common.Hash - rand.Read(stateHash[:]) + var state common.Hash + rand.Read(state[:]) b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - // Clear the nodes maps for each iteration - lookup.accountNodes = make(map[string][]common.Hash) - // Reinitialize all 16 storage node shards - for j := 0; j < 16; j++ { + // Reset the lookup instance for each benchmark iteration + for j := 0; j < storageNodesShardCount; j++ { lookup.storageNodes[j] = make(map[string][]common.Hash) } - lookup.addNodes(stateHash, accountNodes, storageNodes) + lookup.addStorageNodes(state, storageNodes) } }) } } - -func TestConcurrentStorageNodesUpdate(b *testing.T) { - // Create a lookup instance - lookup := &lookup{ - accountNodes: make(map[string][]common.Hash), - } - // Initialize all storage node shards - for i := 0; i < storageNodesShardCount; i++ { - lookup.storageNodes[i] = make(map[string][]common.Hash) - } - - // Create test data with known shard distribution - testData := map[common.Hash]map[string]*trienode.Node{} - - // Create accounts that will distribute across different shards - for i := 0; i < 100; i++ { - var accountHash common.Hash - accountHash[0] = byte(i) - - testData[accountHash] = make(map[string]*trienode.Node) - - // Create paths that will hash to different shards - for j := 0; j < 10; j++ { - path := fmt.Sprintf("path_%d_%d", i, j) - var nodeHash common.Hash - nodeHash[0] = byte(j) - - testData[accountHash][path] = &trienode.Node{Hash: nodeHash} - } - } - - // Add nodes using the concurrent method - var stateHash common.Hash - stateHash[0] = 0x42 - lookup.addNodes(stateHash, nil, testData) - - // Verify that all nodes were added correctly - totalNodes := 0 - for accountHash, slots := range testData { - accountHex := accountHash.Hex() - for path := range slots { - key := accountHex + path - shardIndex := getStorageShardIndex(path) - - list, exists := lookup.storageNodes[shardIndex][key] - if !exists { - b.Errorf("Node not found: account=%x, path=%s, shard=%d", accountHash, path, shardIndex) - continue - } - - if len(list) != 1 { - b.Errorf("Expected 1 state hash, got %d: account=%x, path=%s", len(list), accountHash, path) - continue - } - - if list[0] != stateHash { - b.Errorf("Expected state hash %x, got %x: account=%x, path=%s", stateHash, list[0], accountHash, path) - continue - } - - totalNodes++ - } - } - - expectedTotal := 100 * 10 // 100 accounts * 10 nodes each - if totalNodes != expectedTotal { - b.Errorf("Expected %d total nodes, got %d", expectedTotal, totalNodes) - } - - // Verify shard distribution - for i := 0; i < storageNodesShardCount; i++ { - shardSize := len(lookup.storageNodes[i]) - if shardSize == 0 { - b.Logf("Shard %d is empty", i) - } else { - b.Logf("Shard %d has %d entries", i, shardSize) - } - } -} From 88c4ac66ab5c1e1251be25d721ccc8fc2076a98c Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 15 Jul 2025 14:15:02 +0800 Subject: [PATCH 09/13] triedb/pathdb: polish the code Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 104 +++++++++++---------------------------- triedb/pathdb/metrics.go | 6 ++- 2 files changed, 32 insertions(+), 78 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 576fa81e395c..e4239a55e123 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -38,6 +38,11 @@ func 
storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte { return key } +// trienodeKey returns a key for uniquely identifying the trie node. +func trienodeKey(accountHash common.Hash, path string) string { + return accountHash.Hex() + path +} + // lookup is an internal structure used to efficiently determine the layer in // which a state entry resides. type lookup struct { @@ -210,10 +215,8 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha if accountHash == (common.Hash{}) { list = l.accountNodes[path] } else { - // Construct the combined key but use only path for shard calculation - key := accountHash.Hex() + path shardIndex := getStorageShardIndex(path) // Use only path for sharding - list = l.storageNodes[shardIndex][key] + list = l.storageNodes[shardIndex][trienodeKey(accountHash, path)] } for i := len(list) - 1; i >= 0; i-- { // If the current state matches the stateID, or the requested state is a @@ -302,45 +305,25 @@ func (l *lookup) addLayer(diff *diffLayer) { } func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) { - count := 0 - for _, slots := range nodes { - count += len(slots) - } - - // If the number of storage nodes is small, use a single-threaded approach - if count <= 1000 { - for accountHash, slots := range nodes { - accountHex := accountHash.Hex() - for path := range slots { - key := accountHex + path - shardIndex := getStorageShardIndex(path) - list, exists := l.storageNodes[shardIndex][key] - if !exists { - list = make([]common.Hash, 0, 16) - } - list = append(list, state) - l.storageNodes[shardIndex][key] = list - } - } - return - } + defer func(start time.Time) { + lookupAddTrienodeLayerTimer.UpdateSince(start) + }(time.Now()) - // Use concurrent workers for storage nodes updates, one per shard - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + tasks = make([]chan string, storageNodesShardCount) + ) wg.Add(storageNodesShardCount) - - workChannels := make([]chan string, storageNodesShardCount) for i := 0; i < storageNodesShardCount; i++ { - workChannels[i] = make(chan string, 10) // Buffer to avoid blocking + tasks[i] = make(chan string, 10) // Buffer to avoid blocking } - // Start all workers, each handling its own shard for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { go func(shardIdx int) { defer wg.Done() shard := l.storageNodes[shardIdx] - for key := range workChannels[shardIdx] { + for key := range tasks[shardIdx] { list, exists := shard[key] if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool @@ -352,18 +335,15 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st } for accountHash, slots := range nodes { - accountHex := accountHash.Hex() for path := range slots { shardIndex := getStorageShardIndex(path) - workChannels[shardIndex] <- accountHex + path + tasks[shardIndex] <- trienodeKey(accountHash, path) } } - // Close all channels to signal workers to finish for i := 0; i < storageNodesShardCount; i++ { - close(workChannels[i]) + close(tasks[i]) } - wg.Wait() } @@ -458,47 +438,23 @@ func (l *lookup) removeLayer(diff *diffLayer) error { } func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map[string]*trienode.Node) error { - count := 0 - for _, slots := range nodes { - count += len(slots) - } - - if count <= 1000 { - for accountHash, slots := range nodes { - accountHex := accountHash.Hex() - for path := range slots { - shard := 
l.storageNodes[getStorageShardIndex(path)] - key := accountHex + path - found, list := removeFromList(shard[key], state) - if !found { - return fmt.Errorf("storage lookup is not found, %x %x, state: %x", accountHash, path, state) - } - if len(list) != 0 { - shard[key] = list - } else { - delete(shard, key) - } - } - } - return nil - } - - // Use concurrent workers for storage nodes removal, one per shard - var eg errgroup.Group - - // Create work channels for each shard - workChannels := make([]chan string, storageNodesShardCount) + defer func(start time.Time) { + lookupRemoveTrienodeLayerTimer.UpdateSince(start) + }(time.Now()) + var ( + eg errgroup.Group + tasks = make([]chan string, storageNodesShardCount) + ) for i := 0; i < storageNodesShardCount; i++ { - workChannels[i] = make(chan string, 10) // Buffer to avoid blocking + tasks[i] = make(chan string, 10) // Buffer to avoid blocking } - // Start all workers, each handling its own shard for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { shardIdx := shardIndex // Capture the variable eg.Go(func() error { shard := l.storageNodes[shardIdx] - for key := range workChannels[shardIdx] { + for key := range tasks[shardIdx] { found, list := removeFromList(shard[key], state) if !found { return fmt.Errorf("storage lookup is not found, key: %s, state: %x", key, state) @@ -514,17 +470,13 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map } for accountHash, slots := range nodes { - accountHex := accountHash.Hex() for path := range slots { - key := accountHex + path shardIndex := getStorageShardIndex(path) - workChannels[shardIndex] <- key + tasks[shardIndex] <- trienodeKey(accountHash, path) } } - for i := 0; i < storageNodesShardCount; i++ { - close(workChannels[i]) + close(tasks[i]) } - return eg.Wait() } diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 31c40053fc26..0ee0d84748de 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -85,8 +85,10 @@ var ( trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil) trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil) - lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil) - lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil) + lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil) + lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil) + lookupAddTrienodeLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/add/time", nil) + lookupRemoveTrienodeLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/remove/time", nil) historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil) historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil) From 25d3b5d88b1fc25ee1acfa7b0437fcfa8d78a40e Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 16 Jul 2025 09:05:37 +0800 Subject: [PATCH 10/13] triedb/pathdb: track state count Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 12 ++++++++++++ triedb/pathdb/metrics.go | 2 ++ 2 files changed, 14 insertions(+) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index e4239a55e123..65b74b1e38d5 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -301,6 +301,18 @@ func (l 
*lookup) addLayer(diff *diffLayer) { l.addStorageNodes(state, diff.nodes.storageNodes) }() + states := len(diff.states.accountData) + for _, slots := range diff.states.storageData { + states += len(slots) + } + lookupStateMeter.Mark(int64(states)) + + trienodes := len(diff.nodes.accountNodes) + for _, nodes := range diff.nodes.storageNodes { + trienodes += len(nodes) + } + lookupTrienodeMeter.Mark(int64(trienodes)) + wg.Wait() } diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 0ee0d84748de..98381832ec82 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -89,6 +89,8 @@ var ( lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil) lookupAddTrienodeLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/add/time", nil) lookupRemoveTrienodeLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/trienode/remove/time", nil) + lookupStateMeter = metrics.NewRegisteredMeter("pathdb/lookup/state/count", nil) + lookupTrienodeMeter = metrics.NewRegisteredMeter("pathdb/lookup/trienode/count", nil) historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil) historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil) From b45b0419d733578064d052ca7a051da5cc062d43 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Mon, 18 Aug 2025 12:01:53 +0800 Subject: [PATCH 11/13] use batch instead of channel Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 104 ++++++++++++++++++++++------------- triedb/pathdb/lookup_test.go | 9 ++- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 65b74b1e38d5..fee2573a3560 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -38,9 +38,21 @@ func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte { return key } -// trienodeKey returns a key for uniquely identifying the trie node. -func trienodeKey(accountHash common.Hash, path string) string { - return accountHash.Hex() + path +// trienodeKey uses a fixed-size byte array instead of string to avoid string allocations. +type trienodeKey [96]byte // 32 bytes for hash + up to 64 bytes for path + +// makeTrienodeKey returns a key for uniquely identifying the trie node. +func makeTrienodeKey(accountHash common.Hash, path string) trienodeKey { + var key trienodeKey + copy(key[:32], accountHash[:]) + copy(key[32:], path) + return key +} + +// shardTask used to batch task by shard to minimize lock contention +type shardTask struct { + accountHash common.Hash + path string } // lookup is an internal structure used to efficiently determine the layer in @@ -69,7 +81,7 @@ type lookup struct { // The key is the account address hash and the trie path of the node, // the value is a slice of **diff layer** IDs indicating where the // slot was modified, with the order from oldest to newest. - storageNodes [storageNodesShardCount]map[string][]common.Hash + storageNodes [storageNodesShardCount]map[trienodeKey][]common.Hash // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. 
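The fixed-size key and path-based shard routing introduced in this hunk can be exercised on their own. The sketch below uses simplified stand-ins (`nodeKey`, `makeNodeKey`, `shardIndex`) for the patched helpers and assumes 16 shards and 32-byte account hashes; it is an illustration, not code from the patch:

```
package main

import (
	"encoding/hex"
	"fmt"
)

const trienodeShardCount = 16 // mirrors the shard count used by the patch

// nodeKey is a fixed-size stand-in for the patched trienodeKey type:
// 32 bytes of account hash followed by up to 64 bytes of node path.
type nodeKey [96]byte

// makeNodeKey packs the account hash and path into one array, avoiding
// the accountHash.Hex()+path string concatenation on every lookup.
func makeNodeKey(accountHash [32]byte, path string) nodeKey {
	var key nodeKey
	copy(key[:32], accountHash[:])
	copy(key[32:], path)
	return key
}

// shardIndex routes a node to a shard using only the first byte of its
// trie path, so the account hash never needs to be hashed or formatted.
func shardIndex(path string) int {
	if len(path) == 0 {
		return 0
	}
	return int(path[0]) % trienodeShardCount
}

func main() {
	var account [32]byte
	account[0] = 0xde

	key := makeNodeKey(account, "\x01\x02\x03")
	fmt.Println(hex.EncodeToString(key[:8]), shardIndex("\x01\x02\x03")) // de00000000000000 1
}
```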
@@ -103,7 +115,7 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha } // Initialize all 16 storage node shards for i := 0; i < storageNodesShardCount; i++ { - l.storageNodes[i] = make(map[string][]common.Hash) + l.storageNodes[i] = make(map[trienodeKey][]common.Hash) } // Apply the diff layers from bottom to top @@ -216,7 +228,7 @@ func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Ha list = l.accountNodes[path] } else { shardIndex := getStorageShardIndex(path) // Use only path for sharding - list = l.storageNodes[shardIndex][trienodeKey(accountHash, path)] + list = l.storageNodes[shardIndex][makeTrienodeKey(accountHash, path)] } for i := len(list) - 1; i >= 0; i-- { // If the current state matches the stateID, or the requested state is a @@ -323,19 +335,38 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st var ( wg sync.WaitGroup - tasks = make([]chan string, storageNodesShardCount) + locks [storageNodesShardCount]sync.Mutex + tasks = make([][]shardTask, storageNodesShardCount) ) - wg.Add(storageNodesShardCount) - for i := 0; i < storageNodesShardCount; i++ { - tasks[i] = make(chan string, 10) // Buffer to avoid blocking + + // Pre-allocate work lists + for accountHash, slots := range nodes { + for path := range slots { + shardIndex := getStorageShardIndex(path) + tasks[shardIndex] = append(tasks[shardIndex], shardTask{ + accountHash: accountHash, + path: path, + }) + } } + // Start all workers, each handling its own shard + wg.Add(storageNodesShardCount) for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { go func(shardIdx int) { defer wg.Done() + taskList := tasks[shardIdx] + if len(taskList) == 0 { + return + } + + locks[shardIdx].Lock() + defer locks[shardIdx].Unlock() + shard := l.storageNodes[shardIdx] - for key := range tasks[shardIdx] { + for _, task := range taskList { + key := makeTrienodeKey(task.accountHash, task.path) list, exists := shard[key] if !exists { list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool @@ -343,18 +374,8 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st list = append(list, state) shard[key] = list } - }(shardIndex) - } - for accountHash, slots := range nodes { - for path := range slots { - shardIndex := getStorageShardIndex(path) - tasks[shardIndex] <- trienodeKey(accountHash, path) - } - } - // Close all channels to signal workers to finish - for i := 0; i < storageNodesShardCount; i++ { - close(tasks[i]) + }(shardIndex) } wg.Wait() } @@ -456,20 +477,39 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map var ( eg errgroup.Group - tasks = make([]chan string, storageNodesShardCount) + locks [storageNodesShardCount]sync.Mutex + tasks = make([][]shardTask, storageNodesShardCount) ) - for i := 0; i < storageNodesShardCount; i++ { - tasks[i] = make(chan string, 10) // Buffer to avoid blocking + + // Pre-allocate work lists + for accountHash, slots := range nodes { + for path := range slots { + shardIndex := getStorageShardIndex(path) + tasks[shardIndex] = append(tasks[shardIndex], shardTask{ + accountHash: accountHash, + path: path, + }) + } } + // Start all workers, each handling its own shard for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { shardIdx := shardIndex // Capture the variable eg.Go(func() error { + taskList := tasks[shardIdx] + if len(taskList) == 0 { + return nil + } + + locks[shardIdx].Lock() + defer locks[shardIdx].Unlock() + 
shard := l.storageNodes[shardIdx] - for key := range tasks[shardIdx] { + for _, task := range taskList { + key := makeTrienodeKey(task.accountHash, task.path) found, list := removeFromList(shard[key], state) if !found { - return fmt.Errorf("storage lookup is not found, key: %s, state: %x", key, state) + return fmt.Errorf("storage lookup is not found, key: %x, state: %x", key, state) } if len(list) != 0 { shard[key] = list @@ -480,15 +520,5 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map return nil }) } - - for accountHash, slots := range nodes { - for path := range slots { - shardIndex := getStorageShardIndex(path) - tasks[shardIndex] <- trienodeKey(accountHash, path) - } - } - for i := 0; i < storageNodesShardCount; i++ { - close(tasks[i]) - } return eg.Wait() } diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index aac7df95b13d..0e690da9879d 100644 --- a/triedb/pathdb/lookup_test.go +++ b/triedb/pathdb/lookup_test.go @@ -63,6 +63,11 @@ func BenchmarkAddNodes(b *testing.B) { accountNodeCount: 2000, nodesPerAccount: 40, }, + { + name: "XLarge-5000-accounts-50-nodes", + accountNodeCount: 5000, + nodesPerAccount: 50, + }, } for _, tc := range tests { @@ -75,7 +80,7 @@ func BenchmarkAddNodes(b *testing.B) { // Initialize all 16 storage node shards for i := 0; i < storageNodesShardCount; i++ { - lookup.storageNodes[i] = make(map[string][]common.Hash) + lookup.storageNodes[i] = make(map[trienodeKey][]common.Hash) } var state common.Hash @@ -87,7 +92,7 @@ func BenchmarkAddNodes(b *testing.B) { for i := 0; i < b.N; i++ { // Reset the lookup instance for each benchmark iteration for j := 0; j < storageNodesShardCount; j++ { - lookup.storageNodes[j] = make(map[string][]common.Hash) + lookup.storageNodes[j] = make(map[trienodeKey][]common.Hash) } lookup.addStorageNodes(state, storageNodes) From c9cd48a8fb4f73ad0cd64f29a591dd3b47a5f7be Mon Sep 17 00:00:00 2001 From: jsvisa Date: Sat, 25 Oct 2025 03:19:20 +0000 Subject: [PATCH 12/13] triedb/pathdb: no need lock Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index fee2573a3560..37d90c4ce3e1 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -335,7 +335,6 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st var ( wg sync.WaitGroup - locks [storageNodesShardCount]sync.Mutex tasks = make([][]shardTask, storageNodesShardCount) ) @@ -361,9 +360,6 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st return } - locks[shardIdx].Lock() - defer locks[shardIdx].Unlock() - shard := l.storageNodes[shardIdx] for _, task := range taskList { key := makeTrienodeKey(task.accountHash, task.path) @@ -477,7 +473,6 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map var ( eg errgroup.Group - locks [storageNodesShardCount]sync.Mutex tasks = make([][]shardTask, storageNodesShardCount) ) @@ -501,9 +496,6 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map return nil } - locks[shardIdx].Lock() - defer locks[shardIdx].Unlock() - shard := l.storageNodes[shardIdx] for _, task := range taskList { key := makeTrienodeKey(task.accountHash, task.path) From 2a0b2f0511abbef0d73423242b4adda4d0e08bd7 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Sat, 25 Oct 2025 03:22:23 +0000 Subject: [PATCH 13/13] triedb/pathdb: account shared ``` go test ./triedb/pathdb/ -bench BenchmarkAddNodes 
``` > old ``` goos: linux goarch: amd64 pkg: github.com/ethereum/go-ethereum/triedb/pathdb cpu: AMD Ryzen 7 5700G with Radeon Graphics BenchmarkAddNodes/Small-100-accounts-10-nodes-16 3624 308151 ns/op 1138262 B/op 1236 allocs/op BenchmarkAddNodes/Medium-500-accounts-20-nodes-16 738 2216662 ns/op 10793841 B/op 10352 allocs/op BenchmarkAddNodes/Large-2000-accounts-40-nodes-16 44 27227664 ns/op 92272295 B/op 80897 allocs/op BenchmarkAddNodes/XLarge-5000-accounts-50-nodes-16 13 92061992 ns/op 328729686 B/op 252497 allocs/op PASS ok github.com/ethereum/go-ethereum/triedb/pathdb 20.772s ``` > new ``` goos: linux goarch: amd64 pkg: github.com/ethereum/go-ethereum/triedb/pathdb cpu: AMD Ryzen 7 5700G with Radeon Graphics BenchmarkAddNodes/Small-100-accounts-10-nodes-16 6706 232594 ns/op 673550 B/op 1215 allocs/op BenchmarkAddNodes/Medium-500-accounts-20-nodes-16 804 1292732 ns/op 5910825 B/op 10328 allocs/op BenchmarkAddNodes/Large-2000-accounts-40-nodes-16 60 20907603 ns/op 53871128 B/op 80873 allocs/op BenchmarkAddNodes/XLarge-5000-accounts-50-nodes-16 13 78489762 ns/op 208748856 B/op 252475 allocs/op PASS ok github.com/ethereum/go-ethereum/triedb/pathdb 19.420s ``` Signed-off-by: jsvisa --- triedb/pathdb/lookup.go | 173 +++++++++++++++++++++-------------- triedb/pathdb/lookup_test.go | 25 ++++- 2 files changed, 123 insertions(+), 75 deletions(-) diff --git a/triedb/pathdb/lookup.go b/triedb/pathdb/lookup.go index 37d90c4ce3e1..73dca9710094 100644 --- a/triedb/pathdb/lookup.go +++ b/triedb/pathdb/lookup.go @@ -27,8 +27,8 @@ import ( "golang.org/x/sync/errgroup" ) -// storageNodesShardCount is the number of shards used for storage nodes. -const storageNodesShardCount = 16 +// trienodeShardCount is the number of shards used for trie nodes. +const trienodeShardCount = 16 // storageKey returns a key for uniquely identifying the storage slot. func storageKey(accountHash common.Hash, slotHash common.Hash) [64]byte { @@ -71,30 +71,31 @@ type lookup struct { storages map[[64]byte][]common.Hash // accountNodes represents the mutation history for specific account - // trie nodes. The key is the trie path of the node, and the value is a slice + // trie nodes, distributed across 16 shards for efficiency. + // The key is the trie path of the node, and the value is a slice // of **diff layer** IDs indicating where the account was modified, // with the order from oldest to newest. - accountNodes map[string][]common.Hash + accountNodes [trienodeShardCount]map[string][]common.Hash // storageNodes represents the mutation history for specific storage // slot trie nodes, distributed across 16 shards for efficiency. // The key is the account address hash and the trie path of the node, // the value is a slice of **diff layer** IDs indicating where the // slot was modified, with the order from oldest to newest. - storageNodes [storageNodesShardCount]map[trienodeKey][]common.Hash + storageNodes [trienodeShardCount]map[trienodeKey][]common.Hash // descendant is the callback indicating whether the layer with // given root is a descendant of the one specified by `ancestor`. 
descendant func(state common.Hash, ancestor common.Hash) bool } -// getStorageShardIndex returns the shard index for a given path -func getStorageShardIndex(path string) int { +// getNodeShardIndex returns the shard index for a given path +func getNodeShardIndex(path string) int { if len(path) == 0 { return 0 } // use the first char of the path to determine the shard index - return int(path[0]) % storageNodesShardCount + return int(path[0]) % trienodeShardCount } // newLookup initializes the lookup structure. @@ -108,14 +109,14 @@ func newLookup(head layer, descendant func(state common.Hash, ancestor common.Ha current = current.parentLayer() } l := &lookup{ - accounts: make(map[common.Hash][]common.Hash), - storages: make(map[[64]byte][]common.Hash), - accountNodes: make(map[string][]common.Hash), - descendant: descendant, + accounts: make(map[common.Hash][]common.Hash), + storages: make(map[[64]byte][]common.Hash), + descendant: descendant, } // Initialize all 16 storage node shards - for i := 0; i < storageNodesShardCount; i++ { + for i := 0; i < trienodeShardCount; i++ { l.storageNodes[i] = make(map[trienodeKey][]common.Hash) + l.accountNodes[i] = make(map[string][]common.Hash) } // Apply the diff layers from bottom to top @@ -225,9 +226,10 @@ func (l *lookup) storageTip(accountHash common.Hash, slotHash common.Hash, state func (l *lookup) nodeTip(accountHash common.Hash, path string, stateID common.Hash, base common.Hash) common.Hash { var list []common.Hash if accountHash == (common.Hash{}) { - list = l.accountNodes[path] + shardIndex := getNodeShardIndex(path) + list = l.accountNodes[shardIndex][path] } else { - shardIndex := getStorageShardIndex(path) // Use only path for sharding + shardIndex := getNodeShardIndex(path) // Use only path for sharding list = l.storageNodes[shardIndex][makeTrienodeKey(accountHash, path)] } for i := len(list) - 1; i >= 0; i-- { @@ -297,14 +299,7 @@ func (l *lookup) addLayer(diff *diffLayer) { wg.Add(1) go func() { defer wg.Done() - for path := range diff.nodes.accountNodes { - list, exists := l.accountNodes[path] - if !exists { - list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool - } - list = append(list, state) - l.accountNodes[path] = list - } + l.addAccountNodes(state, diff.nodes.accountNodes) }() wg.Add(1) @@ -335,43 +330,61 @@ func (l *lookup) addStorageNodes(state common.Hash, nodes map[common.Hash]map[st var ( wg sync.WaitGroup - tasks = make([][]shardTask, storageNodesShardCount) + tasks = make([][]shardTask, trienodeShardCount) ) - - // Pre-allocate work lists for accountHash, slots := range nodes { for path := range slots { - shardIndex := getStorageShardIndex(path) + shardIndex := getNodeShardIndex(path) tasks[shardIndex] = append(tasks[shardIndex], shardTask{ accountHash: accountHash, path: path, }) } } - - // Start all workers, each handling its own shard - wg.Add(storageNodesShardCount) - for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { - go func(shardIdx int) { + for shardIdx := 0; shardIdx < trienodeShardCount; shardIdx++ { + taskList := tasks[shardIdx] + if len(taskList) == 0 { + continue + } + wg.Add(1) + go func() { defer wg.Done() - - taskList := tasks[shardIdx] - if len(taskList) == 0 { - return - } - shard := l.storageNodes[shardIdx] for _, task := range taskList { key := makeTrienodeKey(task.accountHash, task.path) - list, exists := shard[key] - if !exists { - list = make([]common.Hash, 0, 16) // TODO(rjl493456442) use sync pool - } - list = append(list, state) - shard[key] = list + 
shard[key] = append(shard[key], state) } + }() + } + wg.Wait() +} - }(shardIndex) +func (l *lookup) addAccountNodes(state common.Hash, nodes map[string]*trienode.Node) { + defer func(start time.Time) { + lookupAddTrienodeLayerTimer.UpdateSince(start) + }(time.Now()) + + var ( + wg sync.WaitGroup + tasks = make([][]string, trienodeShardCount) + ) + for path := range nodes { + shardIndex := getNodeShardIndex(path) + tasks[shardIndex] = append(tasks[shardIndex], path) + } + for shardIdx := 0; shardIdx < trienodeShardCount; shardIdx++ { + taskList := tasks[shardIdx] + if len(taskList) == 0 { + continue + } + wg.Add(1) + go func() { + defer wg.Done() + shard := l.accountNodes[shardIdx] + for _, path := range taskList { + shard[path] = append(shard[path], state) + } + }() } wg.Wait() } @@ -446,18 +459,7 @@ func (l *lookup) removeLayer(diff *diffLayer) error { }) eg.Go(func() error { - for path := range diff.nodes.accountNodes { - found, list := removeFromList(l.accountNodes[path], state) - if !found { - return fmt.Errorf("account lookup is not found, %x, state: %x", path, state) - } - if len(list) != 0 { - l.accountNodes[path] = list - } else { - delete(l.accountNodes, path) - } - } - return nil + return l.removeAccountNodes(state, diff.nodes.accountNodes) }) eg.Go(func() error { @@ -473,29 +475,23 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map var ( eg errgroup.Group - tasks = make([][]shardTask, storageNodesShardCount) + tasks = make([][]shardTask, trienodeShardCount) ) - - // Pre-allocate work lists for accountHash, slots := range nodes { for path := range slots { - shardIndex := getStorageShardIndex(path) + shardIndex := getNodeShardIndex(path) tasks[shardIndex] = append(tasks[shardIndex], shardTask{ accountHash: accountHash, path: path, }) } } - - // Start all workers, each handling its own shard - for shardIndex := 0; shardIndex < storageNodesShardCount; shardIndex++ { - shardIdx := shardIndex // Capture the variable + for shardIdx := 0; shardIdx < trienodeShardCount; shardIdx++ { + taskList := tasks[shardIdx] + if len(taskList) == 0 { + continue + } eg.Go(func() error { - taskList := tasks[shardIdx] - if len(taskList) == 0 { - return nil - } - shard := l.storageNodes[shardIdx] for _, task := range taskList { key := makeTrienodeKey(task.accountHash, task.path) @@ -514,3 +510,40 @@ func (l *lookup) removeStorageNodes(state common.Hash, nodes map[common.Hash]map } return eg.Wait() } + +func (l *lookup) removeAccountNodes(state common.Hash, nodes map[string]*trienode.Node) error { + defer func(start time.Time) { + lookupRemoveTrienodeLayerTimer.UpdateSince(start) + }(time.Now()) + + var ( + eg errgroup.Group + tasks = make([][]string, trienodeShardCount) + ) + for path := range nodes { + shardIndex := getNodeShardIndex(path) + tasks[shardIndex] = append(tasks[shardIndex], path) + } + for shardIdx := 0; shardIdx < trienodeShardCount; shardIdx++ { + taskList := tasks[shardIdx] + if len(taskList) == 0 { + continue + } + eg.Go(func() error { + shard := l.accountNodes[shardIdx] + for _, path := range taskList { + found, list := removeFromList(shard[path], state) + if !found { + return fmt.Errorf("account lookup is not found, %x, state: %x", path, state) + } + if len(list) != 0 { + shard[path] = list + } else { + delete(shard, path) + } + } + return nil + }) + } + return eg.Wait() +} diff --git a/triedb/pathdb/lookup_test.go b/triedb/pathdb/lookup_test.go index 0e690da9879d..5c3e77938df7 100644 --- a/triedb/pathdb/lookup_test.go +++ 
b/triedb/pathdb/lookup_test.go @@ -1,3 +1,19 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + package pathdb import ( @@ -74,13 +90,12 @@ func BenchmarkAddNodes(b *testing.B) { b.Run(tc.name, func(b *testing.B) { storageNodes := generateRandomStorageNodes(tc.accountNodeCount, tc.nodesPerAccount) - lookup := &lookup{ - accountNodes: make(map[string][]common.Hash), - } + lookup := &lookup{} // Initialize all 16 storage node shards - for i := 0; i < storageNodesShardCount; i++ { + for i := 0; i < trienodeShardCount; i++ { lookup.storageNodes[i] = make(map[trienodeKey][]common.Hash) + lookup.accountNodes[i] = make(map[string][]common.Hash) } var state common.Hash @@ -91,7 +106,7 @@ func BenchmarkAddNodes(b *testing.B) { for i := 0; i < b.N; i++ { // Reset the lookup instance for each benchmark iteration - for j := 0; j < storageNodesShardCount; j++ { + for j := 0; j < trienodeShardCount; j++ { lookup.storageNodes[j] = make(map[trienodeKey][]common.Hash) }
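Taken together, patches 11–13 replace the channel-based fan-out with per-shard batches applied by one goroutine per shard and no per-shard locking, which is what the benchmark numbers above measure. A self-contained sketch of that pattern, using illustrative stand-ins (`shards`, `tasks`, an integer `state`) rather than the pathdb types:

```
package main

import (
	"fmt"
	"sync"
)

const shardCount = 16 // illustrative; the patch uses trienodeShardCount

// shardOf routes a trie path to a shard by its first byte, mirroring the
// patched getNodeShardIndex helper.
func shardOf(path string) int {
	if len(path) == 0 {
		return 0
	}
	return int(path[0]) % shardCount
}

func main() {
	// Each shard owns its own map, so one goroutine per shard can append
	// without any locking.
	var shards [shardCount]map[string][]int
	for i := range shards {
		shards[i] = make(map[string][]int)
	}

	paths := []string{"\x00a", "\x01b", "\x01c", "\x0fd"}
	state := 42 // stand-in for the diff-layer identifier being added

	// Phase 1: bucket the work per shard up front instead of streaming it
	// through channels.
	var tasks [shardCount][]string
	for _, p := range paths {
		idx := shardOf(p)
		tasks[idx] = append(tasks[idx], p)
	}

	// Phase 2: one goroutine per non-empty shard applies its own batch.
	var wg sync.WaitGroup
	for i := 0; i < shardCount; i++ {
		if len(tasks[i]) == 0 {
			continue
		}
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			for _, p := range tasks[idx] {
				shards[idx][p] = append(shards[idx][p], state)
			}
		}(i)
	}
	wg.Wait()

	fmt.Println(len(shards[1])) // prints 2: both "\x01..." paths landed in shard 1
}
```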