Skip to content
30 changes: 24 additions & 6 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/eval"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
Expand Down Expand Up @@ -98,6 +99,9 @@
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

txnGroupTester *eval.TransactionGroupTester
txnGroupTesterMu deadlock.RWMutex

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
Expand All @@ -106,7 +110,6 @@

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
type BlockEvaluator interface {
TestTransactionGroup(txgroup []transactions.SignedTxn) error
Round() basics.Round
PaySetSize() int
TransactionGroup(txads []transactions.SignedTxnWithAD) error
Expand Down Expand Up @@ -394,19 +397,20 @@

// Test performs basic duplicate detection and well-formedness checks
// on a transaction group without storing the group.
// It may be called concurrently.
func (pool *TransactionPool) Test(txgroup []transactions.SignedTxn) error {
if err := pool.checkPendingQueueSize(txgroup); err != nil {
return err
}

pool.mu.Lock()
defer pool.mu.Unlock()
pool.txnGroupTesterMu.RLock()
defer pool.txnGroupTesterMu.RUnlock()

Check warning on line 407 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L406-L407

Added lines #L406 - L407 were not covered by tests

if pool.pendingBlockEvaluator == nil {
return fmt.Errorf("Test: pendingBlockEvaluator is nil")
if pool.txnGroupTester == nil {
return fmt.Errorf("Test: txnGroupTester is nil")

Check warning on line 410 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L409-L410

Added lines #L409 - L410 were not covered by tests
}

return pool.pendingBlockEvaluator.TestTransactionGroup(txgroup)
return pool.txnGroupTester.TestTransactionGroup(txgroup)

Check warning on line 413 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L413

Added line #L413 was not covered by tests
}

type poolIngestParams struct {
Expand Down Expand Up @@ -753,6 +757,20 @@
return
}

nextProto := config.Consensus[next.CurrentProtocol]
pool.txnGroupTesterMu.Lock()
pool.txnGroupTester = eval.NewTransactionGroupTester(
nextProto,
transactions.SpecialAddresses{
FeeSink: next.FeeSink,
RewardsPool: next.RewardsPool,
},
next,
func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
return pool.ledger.CheckDup(nextProto, next.BlockHeader.Round, firstValid, lastValid, txid, txl)
})

Check warning on line 771 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L770-L771

Added lines #L770 - L771 were not covered by tests
pool.txnGroupTesterMu.Unlock()

var asmStats telemetryspec.AssembleBlockMetrics
asmStats.StartCount = len(txgroups)
asmStats.StopReason = telemetryspec.AssembleBlockEmpty
Expand Down
3 changes: 2 additions & 1 deletion data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func (handler *TxHandler) Start() {
{Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)},
})

handler.backlogWg.Add(2)
handler.backlogWg.Add(3)
go handler.backlogWorker()
go handler.backlogWorker()
go handler.backlogGaugeThread()
handler.streamVerifier.Start(handler.ctx)
Expand Down
75 changes: 53 additions & 22 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,21 +2204,46 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxnDead))

txn1 := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
makeTxn := func() transactions.Transaction {
return transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
}
}
txn1 := makeTxn()

// add a round 1 block with oldTxn1 and oldTxn2 in it (for txID and lease checking later)
oldTxn1 := makeTxn()
oldTxn2 := oldTxn1
crypto.RandBytes(oldTxn2.Lease[:])
prev, err := ledger.BlockHdr(ledger.Latest())
require.NoError(t, err)
next := bookkeeping.MakeBlock(prev)
blockEval, err := ledger.StartEvaluator(next.BlockHeader, 0, 0, nil)
require.NoError(t, err)
err = blockEval.Transaction(oldTxn1.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)
err = blockEval.Transaction(oldTxn2.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)

// simulate this transaction was applied
ufblk, err := blockEval.GenerateBlock(nil)
require.NoError(t, err)
block := ledgercore.MakeValidatedBlock(ufblk.UnfinishedBlock(), ufblk.UnfinishedDeltas())
err = ledger.AddValidatedBlock(block, agreement.Certificate{})
require.NoError(t, err)

// trigger hitting pending queue max
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
for i := 0; i <= cfg.TxPoolSize; i++ {
txn := txn1
Expand Down Expand Up @@ -2288,24 +2313,30 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
// trigger TransactionInLedgerError (txid) error
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
wi.rawmsg = &network.IncomingMessage{}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxIDEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxIDEval))
// check transaction committed in round 1 (calls Ledger.CheckDup)
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn1.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxID))

// trigger LeaseInLedgerError (lease) error
txn2 = txn1
crypto.RandBytes(txn2.Lease[:])
txn3 := txn2
txn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{txn2.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
wi.unverifiedTxGroup = []transactions.SignedTxn{txn3.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagLeaseEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLeaseEval))
// check transaction lease conflict with round 1 txn (calls Ledger.CheckDup)
oldTxn3 := oldTxn2
oldTxn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn3.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLease))

// TODO: not sure how to trigger fee error - need to return ErrNoSpace from ledger
// trigger pool fee error
Expand All @@ -2323,7 +2354,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
ledger.RegisterBlockListeners(blockListeners)

// add few blocks: on ci sometimes blockTicker is not fired in time in case of a single block
for i := basics.Round(1); i <= 3; i++ {
for i := basics.Round(2); i <= 4; i++ {
hdr := bookkeeping.BlockHeader{
Round: i,
UpgradeState: bookkeeping.UpgradeState{
Expand Down
34 changes: 30 additions & 4 deletions ledger/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,10 +904,34 @@ func (eval *BlockEvaluator) ResetTxnBytes() {
eval.blockTxBytes = 0
}

// TestTransactionGroup is only called by tests.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then move it to eval_test.go?

return NewTransactionGroupTester(eval.proto, eval.specials, eval.block, eval.state.checkDup).TestTransactionGroup(txgroup)
}

// TransactionGroupTester performs basic transaction checks for well-formedness and duplicate detection.
type TransactionGroupTester struct {
proto config.ConsensusParams
specials transactions.SpecialAddresses
txnContext transactions.TxnContext
checkDup func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error
}

// NewTransactionGroupTester creates a new TransactionGroupTester for use in calling TestTransactionGroup.
func NewTransactionGroupTester(proto config.ConsensusParams, specials transactions.SpecialAddresses, txnContext transactions.TxnContext, checkDup func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error) *TransactionGroupTester {
return &TransactionGroupTester{
proto: proto,
specials: specials,
txnContext: txnContext,
checkDup: checkDup,
}
}

// TestTransactionGroup performs basic duplicate detection and well-formedness checks
// on a transaction group, but does not actually add the transactions to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// It uses a TestEvalContext to access needed recent ledger state.
func (eval TransactionGroupTester) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// Nothing to do if there are no transactions.
if len(txgroup) == 0 {
return nil
Expand Down Expand Up @@ -966,9 +990,9 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx
// TestTransaction performs basic duplicate detection and well-formedness checks
// on a single transaction, but does not actually add the transaction to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {
func (eval TransactionGroupTester) TestTransaction(txn transactions.SignedTxn) error {
// Transaction valid (not expired)?
err := txn.Txn.Alive(eval.block)
err := txn.Txn.Alive(eval.txnContext)
if err != nil {
return err
}
Expand All @@ -981,7 +1005,8 @@ func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {

// Transaction already in the ledger?
txid := txn.ID()
err = eval.state.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
// BlockEvaluator.transaction will check again using cow.checkDup later, if the pool tries to add this transaction to the block.
err = eval.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
}
Expand Down Expand Up @@ -1163,6 +1188,7 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, evalParams *
}

// Transaction already in the ledger?
// this checks against the txns added to this evaluator; testTransaction currently only checks against committed txns.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// this checks against the txns added to this evaluator; testTransaction currently only checks against committed txns.

This is not 100% accurate, checkDup checks the ledger as well, maybe reword to "against the txns added to this evaluator and the ledger" ?

err = cow.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
Expand Down
Loading