Skip to content

Commit df0bd89

Browse files
MariusVanDerWijdenlightclientrjl493456442
authored
core/txpool/blobpool: migrate billy to new slot size (#31966)
Implements a migration path for the blobpool slotter --------- Co-authored-by: lightclient <[email protected]> Co-authored-by: lightclient <[email protected]> Co-authored-by: Gary Rong <[email protected]>
1 parent 4824942 commit df0bd89

File tree

8 files changed

+274
-11
lines changed

8 files changed

+274
-11
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ const (
5555
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
5656
txAvgSize = 4 * 1024
5757

58+
// txBlobOverhead is an approximation of the overhead that an additional blob
59+
// has on transaction size. This is added to the slotter to avoid tiny
60+
// overflows causing all txs to move a shelf higher, wasting disk space. A
61+
// small buffer is added to the proof overhead.
62+
txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64)
63+
5864
// txMaxSize is the maximum size a single transaction can have, outside
5965
// the included blobs. Since blob transactions are pulled instead of pushed,
6066
// and only a small metadata is kept in ram, the rest is on disk, there is
@@ -83,6 +89,10 @@ const (
8389
// limboedTransactionStore is the subfolder containing the currently included
8490
// but not yet finalized transaction blobs.
8591
limboedTransactionStore = "limbo"
92+
93+
// storeVersion is the current slotter layout used for the billy.Database
94+
// store.
95+
storeVersion = 1
8696
)
8797

8898
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@@ -392,14 +402,21 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
392402
}
393403
p.head, p.state = head, state
394404

405+
// Create new slotter for pre-Osaka blob configuration.
406+
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
407+
408+
// See if we need to migrate the queue blob store after fusaka
409+
slotter, err = tryMigrate(p.chain.Config(), slotter, queuedir)
410+
if err != nil {
411+
return err
412+
}
395413
// Index all transactions on disk and delete anything unprocessable
396414
var fails []uint64
397415
index := func(id uint64, size uint32, blob []byte) {
398416
if p.parseTransaction(id, size, blob) != nil {
399417
fails = append(fails, id)
400418
}
401419
}
402-
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
403420
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
404421
if err != nil {
405422
return err
@@ -433,7 +450,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
433450

434451
// Pool initialized, attach the blob limbo to it to track blobs included
435452
// recently but not yet finalized
436-
p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
453+
p.limbo, err = newLimbo(p.chain.Config(), limbodir)
437454
if err != nil {
438455
p.Close()
439456
return err

core/txpool/blobpool/blobpool_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,115 @@ func TestChangingSlotterSize(t *testing.T) {
11651165
}
11661166
}
11671167

1168+
// TestBillyMigration tests the billy migration from the default slotter to
1169+
// the PeerDAS slotter. This tests both the migration of the slotter
1170+
// as well as increasing the slotter size of the new slotter.
1171+
func TestBillyMigration(t *testing.T) {
1172+
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
1173+
1174+
// Create a temporary folder for the persistent backend
1175+
storage := t.TempDir()
1176+
1177+
os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
1178+
os.MkdirAll(filepath.Join(storage, limboedTransactionStore), 0700)
1179+
// Create the billy with the old slotter
1180+
oldSlotter := newSlotterEIP7594(6)
1181+
store, _ := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, oldSlotter, nil)
1182+
1183+
// Create transactions from a few accounts.
1184+
var (
1185+
key1, _ = crypto.GenerateKey()
1186+
key2, _ = crypto.GenerateKey()
1187+
key3, _ = crypto.GenerateKey()
1188+
1189+
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
1190+
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
1191+
addr3 = crypto.PubkeyToAddress(key3.PublicKey)
1192+
1193+
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
1194+
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
1195+
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
1196+
1197+
blob1, _ = rlp.EncodeToBytes(tx1)
1198+
blob2, _ = rlp.EncodeToBytes(tx2)
1199+
)
1200+
1201+
// Write the two safely sized txs to store. note: although the store is
1202+
// configured for a blob count of 6, it can also support around ~1mb of call
1203+
// data - all this to say that we aren't using the the absolute largest shelf
1204+
// available.
1205+
store.Put(blob1)
1206+
store.Put(blob2)
1207+
store.Close()
1208+
1209+
// Mimic a blobpool with max blob count of 6 upgrading to a max blob count of 24.
1210+
for _, maxBlobs := range []int{6, 24} {
1211+
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
1212+
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
1213+
statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
1214+
statedb.AddBalance(addr3, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
1215+
statedb.Commit(0, true, false)
1216+
1217+
// Make custom chain config where the max blob count changes based on the loop variable.
1218+
zero := uint64(0)
1219+
config := &params.ChainConfig{
1220+
ChainID: big.NewInt(1),
1221+
LondonBlock: big.NewInt(0),
1222+
BerlinBlock: big.NewInt(0),
1223+
CancunTime: &zero,
1224+
OsakaTime: &zero,
1225+
BlobScheduleConfig: &params.BlobScheduleConfig{
1226+
Cancun: &params.BlobConfig{
1227+
Target: maxBlobs / 2,
1228+
Max: maxBlobs,
1229+
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
1230+
},
1231+
Osaka: &params.BlobConfig{
1232+
Target: maxBlobs / 2,
1233+
Max: maxBlobs,
1234+
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
1235+
},
1236+
},
1237+
}
1238+
chain := &testBlockChain{
1239+
config: config,
1240+
basefee: uint256.NewInt(1050),
1241+
blobfee: uint256.NewInt(105),
1242+
statedb: statedb,
1243+
}
1244+
pool := New(Config{Datadir: storage}, chain, nil)
1245+
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
1246+
t.Fatalf("failed to create blob pool: %v", err)
1247+
}
1248+
1249+
// Try to add the big blob tx. In the initial iteration it should overflow
1250+
// the pool. On the subsequent iteration it should be accepted.
1251+
errs := pool.Add([]*types.Transaction{tx3}, true)
1252+
if _, ok := pool.index[addr3]; ok && maxBlobs == 6 {
1253+
t.Errorf("expected insert of oversized blob tx to fail: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
1254+
} else if !ok && maxBlobs == 10 {
1255+
t.Errorf("expected insert of oversized blob tx to succeed: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
1256+
}
1257+
1258+
// Verify the regular two txs are always available.
1259+
if got := pool.Get(tx1.Hash()); got == nil {
1260+
t.Errorf("expected tx %s from %s in pool", tx1.Hash(), addr1)
1261+
}
1262+
if got := pool.Get(tx2.Hash()); got == nil {
1263+
t.Errorf("expected tx %s from %s in pool", tx2.Hash(), addr2)
1264+
}
1265+
1266+
// Verify all the calculated pool internals. Interestingly, this is **not**
1267+
// a duplication of the above checks, this actually validates the verifier
1268+
// using the above already hard coded checks.
1269+
//
1270+
// Do not remove this, nor alter the above to be generic.
1271+
verifyPoolInternals(t, pool)
1272+
1273+
pool.Close()
1274+
}
1275+
}
1276+
11681277
// TestBlobCountLimit tests the blobpool enforced limits on the max blob count.
11691278
func TestBlobCountLimit(t *testing.T) {
11701279
var (

core/txpool/blobpool/limbo.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ import (
2020
"errors"
2121

2222
"github.com/ethereum/go-ethereum/common"
23+
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
2324
"github.com/ethereum/go-ethereum/core/types"
2425
"github.com/ethereum/go-ethereum/log"
26+
"github.com/ethereum/go-ethereum/params"
2527
"github.com/ethereum/go-ethereum/rlp"
2628
"github.com/holiman/billy"
2729
)
@@ -48,19 +50,29 @@ type limbo struct {
4850
}
4951

5052
// newLimbo opens and indexes a set of limboed blob transactions.
51-
func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) {
53+
func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
5254
l := &limbo{
5355
index: make(map[common.Hash]uint64),
5456
groups: make(map[uint64]map[uint64]common.Hash),
5557
}
58+
59+
// Create new slotter for pre-Osaka blob configuration.
60+
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(config))
61+
62+
// See if we need to migrate the limbo after fusaka.
63+
slotter, err := tryMigrate(config, slotter, datadir)
64+
if err != nil {
65+
return nil, err
66+
}
67+
5668
// Index all limboed blobs on disk and delete anything unprocessable
5769
var fails []uint64
5870
index := func(id uint64, size uint32, data []byte) {
5971
if l.parseBlob(id, data) != nil {
6072
fails = append(fails, id)
6173
}
6274
}
63-
store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, newSlotter(maxBlobsPerTransaction), index)
75+
store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, slotter, index)
6476
if err != nil {
6577
return nil, err
6678
}

core/txpool/blobpool/slotter.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,49 @@
1616

1717
package blobpool
1818

19+
import (
20+
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
21+
"github.com/ethereum/go-ethereum/params"
22+
"github.com/holiman/billy"
23+
)
24+
25+
// tryMigrate checks if the billy needs to be migrated and migrates if needed.
26+
// Returns a slotter that can be used for the database.
27+
func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) {
28+
// Check if we need to migrate our blob db to the new slotter.
29+
if config.OsakaTime != nil {
30+
// Open the store using the version slotter to see if any version has been
31+
// written.
32+
var version int
33+
index := func(_ uint64, _ uint32, blob []byte) {
34+
version = max(version, parseSlotterVersion(blob))
35+
}
36+
store, err := billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), index)
37+
if err != nil {
38+
return nil, err
39+
}
40+
store.Close()
41+
42+
// If the version found is less than the currently configured store version,
43+
// perform a migration then write the updated version of the store.
44+
if version < storeVersion {
45+
newSlotter := newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config))
46+
if err := billy.Migrate(billy.Options{Path: datadir, Repair: true}, slotter, newSlotter); err != nil {
47+
return nil, err
48+
}
49+
store, err = billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), nil)
50+
if err != nil {
51+
return nil, err
52+
}
53+
writeSlotterVersion(store, storeVersion)
54+
store.Close()
55+
}
56+
// Set the slotter to the format now that the Osaka is active.
57+
slotter = newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config))
58+
}
59+
return slotter, nil
60+
}
61+
1962
// newSlotter creates a helper method for the Billy datastore that returns the
2063
// individual shelf sizes used to store transactions in.
2164
//
@@ -25,7 +68,7 @@ package blobpool
2568
// The slotter also creates a shelf for 0-blob transactions. Whilst those are not
2669
// allowed in the current protocol, having an empty shelf is not a relevant use
2770
// of resources, but it makes stress testing with junk transactions simpler.
28-
func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) {
71+
func newSlotter(maxBlobsPerTransaction int) billy.SlotSizeFn {
2972
slotsize := uint32(txAvgSize)
3073
slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return
3174

@@ -36,3 +79,42 @@ func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) {
3679
return slotsize, finished
3780
}
3881
}
82+
83+
// newSlotterEIP7594 creates a different slotter for EIP-7594 transactions.
84+
// EIP-7594 (PeerDAS) changes the average transaction size which means the current
85+
// static 4KB average size is not enough anymore.
86+
// This slotter adds a dynamic overhead component to the slotter, which also
87+
// captures the notion that blob transactions with more blobs are also more likely to
88+
// to have more calldata.
89+
func newSlotterEIP7594(maxBlobsPerTransaction int) billy.SlotSizeFn {
90+
slotsize := uint32(txAvgSize)
91+
slotsize -= uint32(blobSize) + txBlobOverhead // underflows, it's ok, will overflow back in the first return
92+
93+
return func() (size uint32, done bool) {
94+
slotsize += blobSize + txBlobOverhead
95+
finished := slotsize > uint32(maxBlobsPerTransaction)*(blobSize+txBlobOverhead)+txMaxSize
96+
97+
return slotsize, finished
98+
}
99+
}
100+
101+
// newVersionSlotter creates a slotter with a single 8 byte shelf to store
102+
// version metadata in.
103+
func newVersionSlotter() billy.SlotSizeFn {
104+
return func() (size uint32, done bool) {
105+
return 8, true
106+
}
107+
}
108+
109+
// parseSlotterVersion will parse the slotter's version from a given data blob.
110+
func parseSlotterVersion(blob []byte) int {
111+
if len(blob) > 0 {
112+
return int(blob[0])
113+
}
114+
return 0
115+
}
116+
117+
// writeSlotterVersion writes the current slotter version into the store.
118+
func writeSlotterVersion(store billy.Database, version int) {
119+
store.Put([]byte{byte(version)})
120+
}

core/txpool/blobpool/slotter_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package blobpool
1818

19-
import "testing"
19+
import (
20+
"testing"
21+
)
2022

2123
// Tests that the slotter creates the expected database shelves.
2224
func TestNewSlotter(t *testing.T) {
@@ -58,3 +60,44 @@ func TestNewSlotter(t *testing.T) {
5860
}
5961
}
6062
}
63+
64+
// Tests that the slotter creates the expected database shelves.
65+
func TestNewSlotterEIP7594(t *testing.T) {
66+
// Generate the database shelve sizes
67+
slotter := newSlotterEIP7594(6)
68+
69+
var shelves []uint32
70+
for {
71+
shelf, done := slotter()
72+
shelves = append(shelves, shelf)
73+
if done {
74+
break
75+
}
76+
}
77+
// Compare the database shelves to the expected ones
78+
want := []uint32{
79+
0*blobSize + 0*txBlobOverhead + txAvgSize, // 0 blob + some expected tx infos
80+
1*blobSize + 1*txBlobOverhead + txAvgSize, // 1 blob + some expected tx infos
81+
2*blobSize + 2*txBlobOverhead + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data)
82+
3*blobSize + 3*txBlobOverhead + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data)
83+
4*blobSize + 4*txBlobOverhead + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data)
84+
5*blobSize + 5*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
85+
6*blobSize + 6*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
86+
7*blobSize + 7*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
87+
8*blobSize + 8*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
88+
9*blobSize + 9*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
89+
10*blobSize + 10*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
90+
11*blobSize + 11*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
91+
12*blobSize + 12*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
92+
13*blobSize + 13*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
93+
14*blobSize + 14*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size
94+
}
95+
if len(shelves) != len(want) {
96+
t.Errorf("shelves count mismatch: have %d, want %d", len(shelves), len(want))
97+
}
98+
for i := 0; i < len(shelves) && i < len(want); i++ {
99+
if shelves[i] != want[i] {
100+
t.Errorf("shelf %d mismatch: have %d, want %d", i, shelves[i], want[i])
101+
}
102+
}
103+
}

crypto/kzg4844/kzg4844.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ var (
3434
blobT = reflect.TypeFor[Blob]()
3535
commitmentT = reflect.TypeFor[Commitment]()
3636
proofT = reflect.TypeFor[Proof]()
37-
38-
CellProofsPerBlob = 128
3937
)
4038

39+
const CellProofsPerBlob = 128
40+
4141
// Blob represents a 4844 data blob.
4242
type Blob [131072]byte
4343

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ require (
3737
github.com/gorilla/websocket v1.4.2
3838
github.com/graph-gophers/graphql-go v1.3.0
3939
github.com/hashicorp/go-bexpr v0.1.10
40-
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4
40+
github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db
4141
github.com/holiman/bloomfilter/v2 v2.0.3
4242
github.com/holiman/uint256 v1.3.2
4343
github.com/huin/goupnp v1.3.0

0 commit comments

Comments
 (0)