Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f246a99
core/txpool/blobpool: adjust slot size for cell proof transactions
MariusVanDerWijden Jun 4, 2025
ae2cd38
core/txpool/blobpool: adjust slot size for cell proof transactions
MariusVanDerWijden Jun 4, 2025
f599be1
core/txpool/blobpool: factor out peerdas slotter
MariusVanDerWijden Jun 4, 2025
56bccbb
core/txpool/blobpool: add migration functionality
MariusVanDerWijden Jun 4, 2025
ea15365
core/txpool/blobpool: fixed tests
MariusVanDerWijden Jun 20, 2025
71c1d7c
core/txpool/blobpool: migrate blobpool
MariusVanDerWijden Jul 7, 2025
1036704
core/txpool/blobpool: migrate blobpool
MariusVanDerWijden Jul 7, 2025
2583c77
core/txpool/blobpool: migrate blobpool
MariusVanDerWijden Jul 7, 2025
72c3a94
go.mod: run go mod tidy
lightclient Jul 8, 2025
fc8d926
Update core/txpool/blobpool/slotter.go
MariusVanDerWijden Jul 9, 2025
0147ee1
core/txpool/blobpool: correctly assign error from os.Stat when checki…
lightclient Jul 11, 2025
b8340ac
core/txpool/blobpool: read slotter version from billy before doing mi…
lightclient Jul 14, 2025
89f21d2
core/txpool/blobpool: fix slotter slot size
lightclient Aug 25, 2025
dd99cf0
core/txpool/blobpool: update blobpool test for new sidecar version
lightclient Aug 28, 2025
0a061be
core/txpool/blobpool: bump per blob overhead
lightclient Sep 4, 2025
30832e2
core/txpool/blobpool: remove check for version byte in normal slotter…
lightclient Sep 4, 2025
38ed51c
core/txpool/blobpool: migrate billy if osaka is scheduled, not just a…
lightclient Sep 11, 2025
681716a
core/txpool/blobpool: fix overhead constant
MariusVanDerWijden Sep 12, 2025
ace873c
core/txpool/blobpool: fix overhead constant
MariusVanDerWijden Sep 12, 2025
e6ea290
core/txpool/blobpool: also migrate limbo
MariusVanDerWijden Sep 12, 2025
2e37dd8
core/txpool/blobpool: correct blobpool datadir
rjl493456442 Sep 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ const (
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
txAvgSize = 4 * 1024

// txBlobOverhead is an approximation of the overhead that an additional blob
// has on transaction size. This is added to the slotter to avoid tiny
// overflows causing all txs to move a shelf higher, wasting disk space. A
// small buffer is added to the proof overhead.
txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64)

// txMaxSize is the maximum size a single transaction can have, outside
// the included blobs. Since blob transactions are pulled instead of pushed,
// and only a small metadata is kept in ram, the rest is on disk, there is
Expand Down Expand Up @@ -83,6 +89,10 @@ const (
// limboedTransactionStore is the subfolder containing the currently included
// but not yet finalized transaction blobs.
limboedTransactionStore = "limbo"

// storeVersion is the current slotter layout used for the billy.Database
// store.
storeVersion = 1
)

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
Expand Down Expand Up @@ -385,14 +395,22 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
}
p.head, p.state = head, state

// Create new slotter for pre-Osaka blob configuration.
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))

// See if we need to migrate the limbo after fusaka.
slotter, err = tryMigrate(p.chain.Config(), slotter, p.config.Datadir)
if err != nil {
return err
}

// Index all transactions on disk and delete anything unprocessable
var fails []uint64
index := func(id uint64, size uint32, blob []byte) {
if p.parseTransaction(id, size, blob) != nil {
fails = append(fails, id)
}
}
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
if err != nil {
return err
Expand Down Expand Up @@ -426,7 +444,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser

// Pool initialized, attach the blob limbo to it to track blobs included
// recently but not yet finalized
p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
p.limbo, err = newLimbo(p.chain.Config(), limbodir)
if err != nil {
p.Close()
return err
Expand Down
109 changes: 109 additions & 0 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,115 @@ func TestChangingSlotterSize(t *testing.T) {
}
}

// TestBillyMigration tests the billy migration from the default slotter to
// the PeerDAS slotter. This tests both the migration of the slotter
// as well as increasing the slotter size of the new slotter.
func TestBillyMigration(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))

// Create a temporary folder for the persistent backend
storage := t.TempDir()

os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
os.MkdirAll(filepath.Join(storage, limboedTransactionStore), 0700)
// Create the billy with the old slotter
oldSlotter := newSlotterEIP7594(6)
store, _ := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, oldSlotter, nil)

// Create transactions from a few accounts.
var (
key1, _ = crypto.GenerateKey()
key2, _ = crypto.GenerateKey()
key3, _ = crypto.GenerateKey()

addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
addr3 = crypto.PubkeyToAddress(key3.PublicKey)

tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)

blob1, _ = rlp.EncodeToBytes(tx1)
blob2, _ = rlp.EncodeToBytes(tx2)
)

// Write the two safely sized txs to store. note: although the store is
// configured for a blob count of 6, it can also support around ~1mb of call
// data - all this to say that we aren't using the the absolute largest shelf
// available.
store.Put(blob1)
store.Put(blob2)
store.Close()

// Mimic a blobpool with max blob count of 6 upgrading to a max blob count of 24.
for _, maxBlobs := range []int{6, 24} {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.AddBalance(addr3, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.Commit(0, true, false)

// Make custom chain config where the max blob count changes based on the loop variable.
zero := uint64(0)
config := &params.ChainConfig{
ChainID: big.NewInt(1),
LondonBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
CancunTime: &zero,
OsakaTime: &zero,
BlobScheduleConfig: &params.BlobScheduleConfig{
Cancun: &params.BlobConfig{
Target: maxBlobs / 2,
Max: maxBlobs,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
Osaka: &params.BlobConfig{
Target: maxBlobs / 2,
Max: maxBlobs,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
},
}
chain := &testBlockChain{
config: config,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}

// Try to add the big blob tx. In the initial iteration it should overflow
// the pool. On the subsequent iteration it should be accepted.
errs := pool.Add([]*types.Transaction{tx3}, true)
if _, ok := pool.index[addr3]; ok && maxBlobs == 6 {
t.Errorf("expected insert of oversized blob tx to fail: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
} else if !ok && maxBlobs == 10 {
t.Errorf("expected insert of oversized blob tx to succeed: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0])
}

// Verify the regular two txs are always available.
if got := pool.Get(tx1.Hash()); got == nil {
t.Errorf("expected tx %s from %s in pool", tx1.Hash(), addr1)
}
if got := pool.Get(tx2.Hash()); got == nil {
t.Errorf("expected tx %s from %s in pool", tx2.Hash(), addr2)
}

// Verify all the calculated pool internals. Interestingly, this is **not**
// a duplication of the above checks, this actually validates the verifier
// using the above already hard coded checks.
//
// Do not remove this, nor alter the above to be generic.
verifyPoolInternals(t, pool)

pool.Close()
}
}

// TestBlobCountLimit tests the blobpool enforced limits on the max blob count.
func TestBlobCountLimit(t *testing.T) {
var (
Expand Down
16 changes: 14 additions & 2 deletions core/txpool/blobpool/limbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"errors"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/billy"
)
Expand All @@ -48,19 +50,29 @@ type limbo struct {
}

// newLimbo opens and indexes a set of limboed blob transactions.
func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) {
func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
l := &limbo{
index: make(map[common.Hash]uint64),
groups: make(map[uint64]map[uint64]common.Hash),
}

// Create new slotter for pre-Osaka blob configuration.
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(config))

// See if we need to migrate the limbo after fusaka.
slotter, err := tryMigrate(config, slotter, datadir)
if err != nil {
return nil, err
}

// Index all limboed blobs on disk and delete anything unprocessable
var fails []uint64
index := func(id uint64, size uint32, data []byte) {
if l.parseBlob(id, data) != nil {
fails = append(fails, id)
}
}
store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, newSlotter(maxBlobsPerTransaction), index)
store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, slotter, index)
if err != nil {
return nil, err
}
Expand Down
84 changes: 83 additions & 1 deletion core/txpool/blobpool/slotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,49 @@

package blobpool

import (
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/billy"
)

// tryMigrate checks if the billy needs to be migrated and migrates if needed.
// Returns a slotter that can be used for the database.
func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) {
// Check if we need to migrate our blob db to the new slotter.
if config.OsakaTime != nil {
// Open the store using the version slotter to see if any version has been
// written.
var version int
index := func(_ uint64, _ uint32, blob []byte) {
version = max(version, parseSlotterVersion(blob))
}
store, err := billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), index)
if err != nil {
return nil, err
}
store.Close()

// If the version found is less than the currently configured store version,
// perform a migration then write the updated version of the store.
if version < storeVersion {
newSlotter := newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config))
if err := billy.Migrate(billy.Options{Path: datadir, Repair: true}, slotter, newSlotter); err != nil {
return nil, err
}
store, err = billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), nil)
if err != nil {
return nil, err
}
writeSlotterVersion(store, storeVersion)
store.Close()
}
// Set the slotter to the format now that the Osaka is active.
slotter = newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config))
}
return slotter, nil
}

// newSlotter creates a helper method for the Billy datastore that returns the
// individual shelf sizes used to store transactions in.
//
Expand All @@ -25,7 +68,7 @@ package blobpool
// The slotter also creates a shelf for 0-blob transactions. Whilst those are not
// allowed in the current protocol, having an empty shelf is not a relevant use
// of resources, but it makes stress testing with junk transactions simpler.
func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) {
func newSlotter(maxBlobsPerTransaction int) billy.SlotSizeFn {
slotsize := uint32(txAvgSize)
slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return

Expand All @@ -36,3 +79,42 @@ func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) {
return slotsize, finished
}
}

// newSlotterEIP7594 creates a different slotter for EIP-7594 transactions.
// EIP-7594 (PeerDAS) changes the average transaction size which means the current
// static 4KB average size is not enough anymore.
// This slotter adds a dynamic overhead component to the slotter, which also
// captures the notion that blob transactions with more blobs are also more likely to
// to have more calldata.
func newSlotterEIP7594(maxBlobsPerTransaction int) billy.SlotSizeFn {
slotsize := uint32(txAvgSize)
slotsize -= uint32(blobSize) + txBlobOverhead // underflows, it's ok, will overflow back in the first return

return func() (size uint32, done bool) {
slotsize += blobSize + txBlobOverhead
finished := slotsize > uint32(maxBlobsPerTransaction)*(blobSize+txBlobOverhead)+txMaxSize

return slotsize, finished
}
}

// newVersionSlotter creates a slotter with a single 8 byte shelf to store
// version metadata in.
func newVersionSlotter() billy.SlotSizeFn {
return func() (size uint32, done bool) {
return 8, true
}
}

// parseSlotterVersion will parse the slotter's version from a given data blob.
func parseSlotterVersion(blob []byte) int {
if len(blob) > 0 {
return int(blob[0])
}
return 0
}

// writeSlotterVersion writes the current slotter version into the store.
func writeSlotterVersion(store billy.Database, version int) {
store.Put([]byte{byte(version)})
}
45 changes: 44 additions & 1 deletion core/txpool/blobpool/slotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package blobpool

import "testing"
import (
"testing"
)

// Tests that the slotter creates the expected database shelves.
func TestNewSlotter(t *testing.T) {
Expand Down Expand Up @@ -58,3 +60,44 @@ func TestNewSlotter(t *testing.T) {
}
}
}

// Tests that the slotter creates the expected database shelves.
func TestNewSlotterEIP7594(t *testing.T) {
// Generate the database shelve sizes
slotter := newSlotterEIP7594(6)

var shelves []uint32
for {
shelf, done := slotter()
shelves = append(shelves, shelf)
if done {
break
}
}
// Compare the database shelves to the expected ones
want := []uint32{
0*blobSize + 0*txBlobOverhead + txAvgSize, // 0 blob + some expected tx infos
1*blobSize + 1*txBlobOverhead + txAvgSize, // 1 blob + some expected tx infos
2*blobSize + 2*txBlobOverhead + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data)
3*blobSize + 3*txBlobOverhead + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data)
4*blobSize + 4*txBlobOverhead + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data)
5*blobSize + 5*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
6*blobSize + 6*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
7*blobSize + 7*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
8*blobSize + 8*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
9*blobSize + 9*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
10*blobSize + 10*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
11*blobSize + 11*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
12*blobSize + 12*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
13*blobSize + 13*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size
14*blobSize + 14*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size
}
if len(shelves) != len(want) {
t.Errorf("shelves count mismatch: have %d, want %d", len(shelves), len(want))
}
for i := 0; i < len(shelves) && i < len(want); i++ {
if shelves[i] != want[i] {
t.Errorf("shelf %d mismatch: have %d, want %d", i, shelves[i], want[i])
}
}
}
4 changes: 2 additions & 2 deletions crypto/kzg4844/kzg4844.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ var (
blobT = reflect.TypeFor[Blob]()
commitmentT = reflect.TypeFor[Commitment]()
proofT = reflect.TypeFor[Proof]()

CellProofsPerBlob = 128
)

const CellProofsPerBlob = 128

// Blob represents a 4844 data blob.
type Blob [131072]byte

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v1.3.0
github.com/hashicorp/go-bexpr v0.1.10
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4
github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db
github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.3.2
github.com/huin/goupnp v1.3.0
Expand Down
Loading