From cc6daee901fd699edbb42f99eb7b8d528d1c0223 Mon Sep 17 00:00:00 2001
From: Jacek Sieka
Date: Fri, 26 Sep 2025 19:34:20 +0200
Subject: [PATCH] Streamline block routing from gossip to clearance

In order to avoid unnecessary `Forky`/`Forked` transitions and reduce the
risk of block/blob/column/fork confusion, this PR simplifies the block
processor to work predominantly with `Forky` types and sidecars typed
accordingly.

To achieve this, the `AsyncQueue` is replaced with an `AsyncLock`, which
amounts to much the same thing for this use case - both maintain a list of
pending work. The advantage of `AsyncLock` is that we no longer make extra
copies of the block: instead, the in-flight future (and its closure context,
where the block lives) is queued inside the lock.

Apart from the performance and memory-usage benefits of avoiding so many
copies along the call chain, this also reduces the risk of sidecar type
contamination across forks.
---
 .../blob_quarantine.nim                    |   3 +-
 .../gossip_processing/block_processor.nim  | 432 +++++++-----------
 .../gossip_processing/eth2_processor.nim   | 108 ++---
 beacon_chain/nimbus_beacon_node.nim        |  75 +--
 beacon_chain/validators/message_router.nim |  18 +-
 tests/test_block_processor.nim             |  18 +-
 6 files changed, 267 insertions(+), 387 deletions(-)

diff --git a/beacon_chain/consensus_object_pools/blob_quarantine.nim b/beacon_chain/consensus_object_pools/blob_quarantine.nim
index a00cd68a78..24721b8e59 100644
--- a/beacon_chain/consensus_object_pools/blob_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/blob_quarantine.nim
@@ -492,8 +492,7 @@ func hasSidecars*(
 proc popSidecars*(
     quarantine: var BlobQuarantine,
     blockRoot: Eth2Digest,
-    blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock |
-      fulu.SignedBeaconBlock | gloas.SignedBeaconBlock
+    blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock
 ): Opt[seq[ref BlobSidecar]] =
   ## Function returns sequence of blob sidecars for block root ``blockRoot`` and
   ## block ``blck``.
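For reference, the queueing pattern described in the commit message boils down
to serializing async tasks on a lock rather than copying work items into a
queue. Below is a minimal chronos sketch of that pattern, assuming only
`AsyncLock` as used in this patch; `Processor`, `processOne` and `doWork` are
illustrative names, not identifiers from nimbus-eth2:

import chronos

type Processor = ref object
  storeLock: AsyncLock # serializes processing, like BlockProcessor.storeLock
  pendingStores: int   # tasks currently waiting for or holding the lock

proc doWork(x: int) {.async: (raises: [CancelledError]).} =
  await sleepAsync(10.milliseconds) # stand-in for the actual block processing

proc processOne(p: Processor, x: int) {.async: (raises: [CancelledError]).} =
  # Each pending caller keeps the data alive in its own closure while waiting -
  # the lock, not a queue of copies, provides the ordering.
  p.pendingStores += 1
  try:
    await p.storeLock.acquire() # callers line up here, in arrival order
    try:
      await doWork(x)
    finally:
      try:
        p.storeLock.release()
      except AsyncLockError:
        raiseAssert "release is paired with a successful acquire"
  finally:
    p.pendingStores -= 1

when isMainModule:
  let p = Processor(storeLock: newAsyncLock())
  for i in 0 ..< 3:
    discard p.processOne(i) # fire-and-forget, cf. enqueueBlock
  waitFor sleepAsync(100.milliseconds) # let the queued tasks drain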
diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index a9e14cf377..42ea3ed2cf 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -54,17 +54,6 @@ const ## Number of slots from wall time that we start processing every payload type - BlockEntry = object - blck*: ForkedSignedBeaconBlock - blobs*: Opt[BlobSidecars] - columns*: Opt[DataColumnSidecars] - maybeFinalized*: bool - ## The block source claims the block has been finalized already - resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) - queueTick*: Moment # Moment when block was enqueued - validationDur*: Duration # Time it took to perform gossip validation - src*: MsgSource - BlockProcessor* = object ## This manages the processing of blocks from different sources ## Blocks and attestations are enqueued in a gossip-validated state @@ -92,7 +81,10 @@ type # Producers # ---------------------------------------------------------------- - blockQueue: AsyncQueue[BlockEntry] + storeLock: AsyncLock + ## storeLock ensures that storeBlock is only called by one async task at + ## a time, queueing the others for processing in order + pendingStores: int # Consumer # ---------------------------------------------------------------- @@ -110,9 +102,11 @@ type ## The slot at which we sent a payload to the execution client the last ## time - NoSidecars = typeof(()) + NoSidecars* = typeof(()) SomeOptSidecars = NoSidecars | Opt[BlobSidecars] | Opt[DatacolumnSidecars] +const noSidecars* = default(NoSidecars) + # Initialization # ------------------------------------------------------------------------------ @@ -135,7 +129,7 @@ proc new*(T: type BlockProcessor, dumpDirInvalid: dumpDirInvalid, dumpDirIncoming: dumpDirIncoming, invalidBlockRoots: invalidBlockRoots, - blockQueue: newAsyncQueue[BlockEntry](), + storeLock: newAsyncLock(), consensusManager: consensusManager, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, @@ -148,7 +142,7 @@ proc new*(T: type BlockProcessor, # ------------------------------------------------------------------------------ func hasBlocks*(self: BlockProcessor): bool = - self.blockQueue.len() > 0 + self.pendingStores > 0 # Storage # ------------------------------------------------------------------------------ @@ -158,10 +152,10 @@ proc dumpInvalidBlock*( if self.dumpEnabled: dump(self.dumpDirInvalid, signedBlock) -proc dumpBlock[T]( +proc dumpBlock( self: BlockProcessor, signedBlock: ForkySignedBeaconBlock, - res: Result[T, VerifierError]) = + res: Result[void, VerifierError]) = if self.dumpEnabled and res.isErr: case res.error of VerifierError.Invalid: @@ -174,25 +168,6 @@ proc dumpBlock[T]( from ../consensus_object_pools/block_clearance import addBackfillBlock, addHeadBlockWithParent, checkHeadBlock -template selectSidecars( - consensusFork: static ConsensusFork, - blobsOpt: Opt[BlobSidecars], - columnsOpt: Opt[DataColumnSidecars], -): untyped = - # The when jungle here must be kept consistent with `verifySidecars` - when consensusFork in ConsensusFork.Fulu .. ConsensusFork.Gloas: - doAssert blobsOpt.isNone(), "No blobs in " & $consensusFork - columnsOpt - elif consensusFork in ConsensusFork.Deneb .. ConsensusFork.Electra: - doAssert columnsOpt.isNone(), "No columns in " & $consensusFork - blobsOpt - elif consensusFork in ConsensusFork.Phase0 .. 
ConsensusFork.Capella: - doAssert blobsOpt.isNone and columnsOpt.isNone(), - "No blobs/columns in " & $consensusFork - default(NoSidecars) - else: - {.error: "Unkown fork " & $consensusFork.} - proc verifySidecars( signedBlock: ForkySignedBeaconBlock, sidecarsOpt: SomeOptSidecars, @@ -377,33 +352,39 @@ proc checkBlobOrColumnlessSignature( return err("checkBlobOrColumnlessSignature: Invalid proposer signature") ok() +proc addBlock*( + self: ref BlockProcessor, + src: MsgSource, + blck: ForkySignedBeaconBlock, + sidecarsOpt: SomeOptSidecars, + maybeFinalized = false, + validationDur = Duration(), +): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + proc enqueueBlock*( - self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], - resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, + self: ref BlockProcessor, + src: MsgSource, + blck: ForkySignedBeaconBlock, + sidecarsOpt: SomeOptSidecars, maybeFinalized = false, - validationDur = Duration()) = - withBlck(blck): - if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot: - # let backfill blocks skip the queue - these are always "fast" to process - # because there are no state rewinds to deal with - let sidecars = selectSidecars(consensusFork, blobs, data_columns) - resfut.complete(self.storeBackfillBlock(forkyBlck, sidecars)) - return - - try: - self.blockQueue.addLastNoWait(BlockEntry( - blck: blck, - blobs: blobs, - columns: data_columns, - maybeFinalized: maybeFinalized, - resfut: resfut, queueTick: Moment.now(), - validationDur: validationDur, - src: src)) - except AsyncQueueFullError: - raiseAssert "unbounded queue" - -proc enqueueQuarantine(self: var BlockProcessor, root: Eth2Digest) = + validationDur = Duration(), +) = + if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot: + # let backfill blocks skip the queue - these are always "fast" to process + # because there are no state rewinds to deal with + discard self[].storeBackfillBlock(blck, sidecarsOpt) + return + + # `discard` here means that the `async` task will continue running even though + # this function returns, similar to `asyncSpawn` (which we cannot use because + # of the return type) - therefore, processing of the block cannot be cancelled + # and its result is lost - this is fine however: callers of `enqueueBlock` + # don't care. However, because this acts as an unbounded queue, they have to + # be careful not to enqueue too many blocks or we'll run out of memory - + # `addBlock` should be used where managing backpressure is appropriate. 
+ discard self.addBlock(src, blck, sidecarsOpt, maybeFinalized, validationDur) + +proc enqueueQuarantine(self: ref BlockProcessor, root: Eth2Digest) = ## Enqueue blocks whose parent is `root` - ie when `root` has been added to ## the blockchain dag, its direct descendants are now candidates for ## processing @@ -415,61 +396,39 @@ proc enqueueQuarantine(self: var BlockProcessor, root: Eth2Digest) = withBlck(quarantined): when consensusFork == ConsensusFork.Gloas: debugGloasComment "" - self.enqueueBlock( - MsgSource.gossip, - quarantined, - Opt.none(BlobSidecars), - Opt.none(DataColumnSidecars), - ) + self.enqueueBlock(MsgSource.gossip, forkyBlck, Opt.none(DataColumnSidecars)) elif consensusFork == ConsensusFork.Fulu: if len(forkyBlck.message.body.blob_kzg_commitments) == 0: self.enqueueBlock( - MsgSource.gossip, - quarantined, - Opt.none(BlobSidecars), - Opt.some(DataColumnSidecars @[]), + MsgSource.gossip, forkyBlck, Opt.some(DataColumnSidecars @[]) ) else: - if (let res = checkBlobOrColumnlessSignature(self, forkyBlck); res.isErr): + if (let res = checkBlobOrColumnlessSignature(self[], forkyBlck); res.isErr): warn "Failed to verify signature of unorphaned blobless block", blck = shortLog(forkyBlck), error = res.error() continue let cres = self.dataColumnQuarantine[].popSidecars(forkyBlck.root, forkyBlck) if cres.isSome: - self.enqueueBlock( - MsgSource.gossip, quarantined, Opt.none(BlobSidecars), cres - ) + self.enqueueBlock(MsgSource.gossip, forkyBlck, cres) else: discard self.consensusManager.quarantine[].addSidecarless( self.consensusManager[].dag.finalizedHead.slot, forkyBlck ) elif consensusFork in ConsensusFork.Deneb .. ConsensusFork.Electra: if len(forkyBlck.message.body.blob_kzg_commitments) == 0: - self.enqueueBlock( - MsgSource.gossip, - quarantined, - Opt.some(BlobSidecars @[]), - Opt.none(DataColumnSidecars), - ) + self.enqueueBlock(MsgSource.gossip, forkyBlck, Opt.some(BlobSidecars @[])) else: - if (let res = checkBlobOrColumnlessSignature(self, forkyBlck); res.isErr): + if (let res = checkBlobOrColumnlessSignature(self[], forkyBlck); res.isErr): warn "Failed to verify signature of unorphaned columnless block", blck = shortLog(forkyBlck), error = res.error() continue let bres = self.blobQuarantine[].popSidecars(forkyBlck.root, forkyBlck) if bres.isSome(): - self.enqueueBlock( - MsgSource.gossip, quarantined, bres, Opt.none(DataColumnSidecars) - ) + self.enqueueBlock(MsgSource.gossip, forkyBlck, bres) else: self.consensusManager.quarantine[].addSidecarless(forkyBlck) elif consensusFork in ConsensusFork.Phase0 .. ConsensusFork.Capella: - self.enqueueBlock( - MsgSource.gossip, - quarantined, - Opt.none(BlobSidecars), - Opt.none(DataColumnSidecars), - ) + self.enqueueBlock(MsgSource.gossip, forkyBlck, noSidecars) else: {.error: "Unknown consensus fork " & $consensusFork.} @@ -559,6 +518,46 @@ proc verifyPayload( else: ok OptimisticStatus.valid +proc enqueueFromDb(self: ref BlockProcessor, root: Eth2Digest) = + # TODO This logic can be removed if the database schema is extended + # to store non-canonical heads on top of the canonical head and learns to keep + # track of non-canonical forks - it was added during a time when there were + # many forks and the client needed frequent restarting leading to a database + # that contained semi-downloaded branches that couldn't be added via BlockRef. 
+ let + dag = self.consensusManager.dag + blck = dag.getForkedBlock(root).valueOr: + return + + withBlck(blck): + var sidecarsOk = true + + let sidecarsOpt = + when consensusFork >= ConsensusFork.Fulu: + var data_column_sidecars: fulu.DataColumnSidecars + for i in self.dataColumnQuarantine[].custodyColumns: + let data_column = fulu.DataColumnSidecar.new() + if not dag.db.getDataColumnSidecar(root, i, data_column[]): + sidecarsOk = false # Pruned, or inconsistent DB + break + data_column_sidecars.add data_column + Opt.some data_column_sidecars + elif consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: + var blob_sidecars: BlobSidecars + for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: + let blob = BlobSidecar.new() + if not dag.db.getBlobSidecar(root, i.BlobIndex, blob[]): + sidecarsOk = false # Pruned, or inconsistent DB + break + blob_sidecars.add blob + Opt.some blob_sidecars + else: + noSidecars + + if sidecarsOk: + debug "Loaded block from storage", root + self.enqueueBlock(MsgSource.gossip, forkyBlck.asSigned(), sidecarsOpt) + proc storeBlock( self: ref BlockProcessor, src: MsgSource, @@ -568,7 +567,7 @@ proc storeBlock( maybeFinalized: bool, queueTick: Moment, validationDur: Duration, -): Future[Result[BlockRef, VerifierError]] {.async: (raises: [CancelledError]).} = +): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = ## storeBlock is the main entry point for unvalidated blocks - all untrusted ## blocks, regardless of origin, pass through here. When storing a block, ## we will add it to the dag and pass it to all block consumers that need @@ -605,10 +604,6 @@ proc storeBlock( # be invalidated (ie a block could be added while we wait for EL response # here) let parent = dag.checkHeadBlock(signedBlock).valueOr: - # TODO This logic can be removed if the database schema is extended - # to store non-canonical heads on top of the canonical head! - # If that is done, the database no longer contains extra blocks - # that have not yet been assigned a `BlockRef` if error == VerifierError.MissingParent: # This indicates that no `BlockRef` is available for the `parent_root`. # However, the block may still be available in local storage. On startup, @@ -620,60 +615,7 @@ proc storeBlock( # lot of time, especially when a non-canonical branch has non-trivial # depth. Note that if it turns out that a non-canonical branch eventually # becomes canonical, it is vital to import it as quickly as possible. 
- let - parent_root = signedBlock.message.parent_root - parentBlck = dag.getForkedBlock(parent_root) - if parentBlck.isSome(): - var columnsOk = true - let columns = - withBlck(parentBlck.get()): - when consensusFork >= ConsensusFork.Fulu: - var data_column_sidecars: fulu.DataColumnSidecars - for i in self.dataColumnQuarantine[].custodyColumns: - let data_column = fulu.DataColumnSidecar.new() - if not dag.db.getDataColumnSidecar(parent_root, i.ColumnIndex, data_column[]): - columnsOk = false - break - data_column_sidecars.add data_column - Opt.some data_column_sidecars - else: - Opt.none fulu.DataColumnSidecars - - var blobsOk = true - let blobs = - withBlck(parentBlck.get()): - when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: - var blob_sidecars: BlobSidecars - for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: - let blob = BlobSidecar.new() - if not dag.db.getBlobSidecar(parent_root, i.BlobIndex, blob[]): - blobsOk = false # Pruned, or inconsistent DB - break - blob_sidecars.add blob - Opt.some blob_sidecars - else: - Opt.none BlobSidecars - # Blobs and columns can never co-exist in the same block - # Block has neither blob sidecar nor data column sidecar - if blobs.isNone and columns.isNone: - debug "Loaded parent block from storage", parent_root - self[].enqueueBlock( - MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), - Opt.none(DataColumnSidecars)) - # Block has blob sidecars associated and NO data column sidecars - # as they cannot co-exist. - if blobsOk and blobs.isSome: - debug "Loaded parent block from storage", parent_root - self[].enqueueBlock( - MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs, - Opt.none(DataColumnSidecars)) - # Block has data column sidecars associated and NO blob sidecars - # as they cannot co-exist. - if columnsOk and columns.isSome: - debug "Loaded parent block from storage", parent_root - self[].enqueueBlock( - MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), - columns) + self.enqueueFromDb(signedBlock.message.parent_root) return err(error) @@ -778,57 +720,16 @@ proc storeBlock( blck = shortLog(blck), validationDur, queueDur, newPayloadDur, addHeadBlockDur, updateHeadDur - ok blck - -# Enqueue -# ------------------------------------------------------------------------------ + ok() -# Beacon block with no blobs and no data columns. proc addBlock*( - self: var BlockProcessor, src: MsgSource, - blck: ForkedSignedBeaconBlock, + self: ref BlockProcessor, + src: MsgSource, + blck: ForkySignedBeaconBlock, + sidecarsOpt: SomeOptSidecars, maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - ## Enqueue a Gossip-validated block for consensus verification - # Backpressure: - # There is no backpressure here - producers must wait for `resfut` to - # constrain their own processing - # Producers: - # - Gossip (when synced) - # - SyncManager (during sync) - # - RequestManager (missing ancestor blocks) - # - API - let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") - enqueueBlock(self, src, blck, Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), - resfut, maybeFinalized, validationDur) - resfut - -# Post-Deneb and pre-Fulu block which MAY have blobs. 
-proc addBlock*( - self: var BlockProcessor, src: MsgSource, - blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - ## Enqueue a Gossip-validated block for consensus verification - # Backpressure: - # There is no backpressure here - producers must wait for `resfut` to - # constrain their own processing - # Producers: - # - Gossip (when synced) - # - SyncManager (during sync) - # - RequestManager (missing ancestor blocks) - # - API - let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") - enqueueBlock(self, src, blck, blobs, Opt.none(DataColumnSidecars), - resfut, maybeFinalized, validationDur) - resfut - -# Post-Fulu block which MAY have data columns. -proc addBlock*( - self: var BlockProcessor, src: MsgSource, - blck: ForkedSignedBeaconBlock, - data_columns: Opt[DataColumnSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + validationDur = Duration(), +): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: # There is no backpressure here - producers must wait for `resfut` to @@ -838,93 +739,98 @@ proc addBlock*( # - SyncManager (during sync) # - RequestManager (missing ancestor blocks) # - API - let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") - enqueueBlock(self, src, blck, Opt.none(BlobSidecars), data_columns, - resfut, maybeFinalized, validationDur) - resfut + let blockRoot = blck.root -# Event Loop -# ------------------------------------------------------------------------------ - -proc processBlock( - self: ref BlockProcessor, entry: BlockEntry) {.async: (raises: [CancelledError]).} = logScope: - blockRoot = shortLog(entry.blck.root) + blockRoot = shortLog(blockRoot) + + if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot: + # let backfill blocks skip the queue - these are always "fast" to process + # because there are no state rewinds to deal with + return self[].storeBackfillBlock(blck, sidecarsOpt) + + let queueTick = Moment.now() + let res = + try: + # If the lock is acquired already, the current block will be put on hold + # meaning that we'll form an unbounded queue of blocks to be processed + # waiting for the lock - this is similar to using an `AsyncQueue` but + # without the copying and transition to/from `Forked`. + # The lock is important to ensure that we don't process blocks out-of-order + # which both would upset the `storeBlock` logic and cause unnecessary + # quarantine traffic. + self.pendingStores += 1 + await self.storeLock.acquire() - let - wallTime = self.getBeaconTime() - (afterGenesis, _) = wallTime.toSlot() + # Cooperative concurrency: one block per loop iteration - because + # we run both networking and CPU-heavy things like block processing + # on the same thread, we need to make sure that there is steady progress + # on the networking side or we get long lockups that lead to timeouts. + const + # We cap waiting for an idle slot in case there's a lot of network traffic + # taking up all CPU - we don't want to _completely_ stop processing blocks + # in this case - doing so also allows us to benefit from more batching / + # larger network reads when under load. 
+ idleTimeout = 10.milliseconds - if not afterGenesis: - error "Processing block before genesis, clock turned back?" - quit 1 + discard await idleAsync().withTimeout(idleTimeout) - let - res = withBlck(entry.blck): let - sidecars = selectSidecars(consensusFork, entry.blobs, entry.columns) - res = await self.storeBlock( - entry.src, wallTime, forkyBlck, sidecars, - entry.maybeFinalized, entry.queueTick, entry.validationDur) + wallTime = self.getBeaconTime() + (afterGenesis, _) = wallTime.toSlot() + + if not afterGenesis: + fatal "Processing block before genesis, clock turned back?" + quit 1 - self[].dumpBlock(forkyBlck, res) + await self.storeBlock( + src, wallTime, blck, sidecarsOpt, maybeFinalized, queueTick, validationDur + ) + finally: + try: + self.storeLock.release() + self.pendingStores -= 1 + except AsyncLockError: + raiseAssert "release matched with acquire, shouldn't happen" - res - root = entry.blck.root + self[].dumpBlock(blck, res) if res.isOk(): # Once a block is successfully stored, enqueue the direct descendants - self[].enqueueQuarantine(root) + self.enqueueQuarantine(blockRoot) else: case res.error() of VerifierError.MissingParent: - if (let r = self.consensusManager.quarantine[].addOrphan( - self.consensusManager.dag.finalizedHead.slot, entry.blck); - r.isErr()): - debug "could not add orphan", - blockRoot = shortLog(root), - blck = shortLog(entry.blck), - signature = shortLog(entry.blck.signature), - err = r.error() + let finalizedSlot = self.consensusManager.dag.finalizedHead.slot + if ( + let r = self.consensusManager.quarantine[].addOrphan( + finalizedSlot, ForkedSignedBeaconBlock.init(blck) + ) + r.isErr() + ): + debug "Could not add orphan", + blck = shortLog(blck), signature = shortLog(blck.signature), err = r.error() else: - if entry.blobs.isSome: - self.blobQuarantine[].put(root, entry.blobs.get) - if entry.columns.isSome: - self.dataColumnQuarantine[].put(root, entry.columns.get) + when sidecarsOpt is Opt[BlobSidecars]: + if sidecarsOpt.isSome: + self.blobQuarantine[].put(blockRoot, sidecarsOpt.get) + elif sidecarsOpt is Opt[DataColumnSidecars]: + if sidecarsOpt.isSome: + self.dataColumnQuarantine[].put(blockRoot, sidecarsOpt.get) + elif sidecarsOpt is NoSidecars: + discard + else: + {.error.} debug "Block quarantined", - blockRoot = shortLog(root), - blck = shortLog(entry.blck), - signature = shortLog(entry.blck.signature) - + blck = shortLog(blck), signature = shortLog(blck.signature) of VerifierError.UnviableFork: # Track unviables so that descendants can be discarded promptly # TODO Invalid and unviable should be treated separately, to correctly # respond when a descendant of an invalid block is validated # TODO re-add VeriferError.Invalid handling - self.consensusManager.quarantine[].addUnviable(root) + self.consensusManager.quarantine[].addUnviable(blockRoot) else: discard - if entry.resfut != nil: - entry.resfut.complete(res.mapConvert(void)) - -proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async: (raises: []).} = - try: - while true: - # Cooperative concurrency: one block per loop iteration - because - # we run both networking and CPU-heavy things like block processing - # on the same thread, we need to make sure that there is steady progress - # on the networking side or we get long lockups that lead to timeouts. 
- const - # We cap waiting for an idle slot in case there's a lot of network traffic - # taking up all CPU - we don't want to _completely_ stop processing blocks - # in this case - doing so also allows us to benefit from more batching / - # larger network reads when under load. - idleTimeout = 10.milliseconds - - discard await idleAsync().withTimeout(idleTimeout) - - await self.processBlock(await self[].blockQueue.popFirst()) - except CancelledError: - debug "Shutting down queue processing loop" + res diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index ba402c442f..67266cc6f3 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -219,6 +219,8 @@ proc processSignedBeaconBlock*( self: var Eth2Processor, src: MsgSource, signedBlock: ForkySignedBeaconBlock, maybeFinalized: bool = false): ValidationRes = + const consensusFork = typeof(signedBlock).kind + let wallTime = self.getCurrentBeaconTime() (afterGenesis, wallSlot) = wallTime.toSlot() @@ -240,66 +242,50 @@ proc processSignedBeaconBlock*( # decoding at this stage, which may be significant debug "Block received", delay - let v = - self.dag.validateBeaconBlock(self.quarantine, signedBlock, wallTime, {}) + self.dag.validateBeaconBlock(self.quarantine, signedBlock, wallTime, {}).isOkOr: + debug "Dropping block", err = error - if v.isOk(): - # Block passed validation - enqueue it for processing. The block processing - # queue is effectively unbounded as we use a freestanding task to enqueue - # the block - this is done so that when blocks arrive concurrently with - # sync, we don't lose the gossip blocks, but also don't block the gossip - # propagation of seemingly good blocks - trace "Block validated" - - if not(isNil(self.dag.onBlockGossipAdded)): - self.dag.onBlockGossipAdded(ForkedSignedBeaconBlock.init(signedBlock)) - - let blobs = - when typeof(signedBlock).kind in - [ConsensusFork.Deneb, ConsensusFork.Electra]: - let bres = - self.blobQuarantine[].popSidecars(signedBlock.root, signedBlock) - if bres.isSome(): - bres - else: - self.quarantine[].addSidecarless(signedBlock) - return v - else: - Opt.none(BlobSidecars) + self.blockProcessor[].dumpInvalidBlock(signedBlock) - let columns = - when typeof(signedBlock).kind >= ConsensusFork.Fulu: - let cres = - self.dataColumnQuarantine[].popSidecars(signedBlock.root, - signedBlock) - if cres.isSome(): - cres - else: - discard self.quarantine[].addSidecarless(self.dag.finalizedHead.slot, - signedBlock) - return v - else: - Opt.none(DataColumnSidecars) - - self.blockProcessor[].enqueueBlock( - src, ForkedSignedBeaconBlock.init(signedBlock), - blobs, - columns, - maybeFinalized = maybeFinalized, - validationDur = nanoseconds( - (self.getCurrentBeaconTime() - wallTime).nanoseconds)) - - # Validator monitor registration for blocks is done by the processor - beacon_blocks_received.inc() - beacon_block_delay.observe(delay.toFloatSeconds()) + beacon_blocks_dropped.inc(1, [$error[0]]) + return err(error) + + # Block passed validation - enqueue it for processing. 
The block processing + # queue is effectively unbounded as we use a freestanding task to enqueue + # the block - this is done so that when blocks arrive concurrently with + # sync, we don't lose the gossip blocks, but also don't block the gossip + # propagation of seemingly good blocks + trace "Block validated" + + if not (isNil(self.dag.onBlockGossipAdded)): + self.dag.onBlockGossipAdded(ForkedSignedBeaconBlock.init(signedBlock)) + + when consensusFork in ConsensusFork.Fulu .. ConsensusFork.Gloas: + let sidecarsOpt = + self.dataColumnQuarantine[].popSidecars(signedBlock.root, signedBlock) + if sidecarsOpt.isNone(): + discard self.quarantine[].addSidecarless(self.dag.finalizedHead.slot, signedBlock) + return ok() + elif consensusFork in ConsensusFork.Deneb .. ConsensusFork.Electra: + let sidecarsOpt = self.blobQuarantine[].popSidecars(signedBlock.root, signedBlock) + if sidecarsOpt.isNone(): + self.quarantine[].addSidecarless(signedBlock) + return ok() + elif consensusFork in ConsensusFork.Phase0 .. ConsensusFork.Capella: + const sidecarsOpt = noSidecars else: - debug "Dropping block", error = v.error() + {.error: "Unknown fork " & $consensusFork.} - self.blockProcessor[].dumpInvalidBlock(signedBlock) + let validationDur = nanoseconds((self.getCurrentBeaconTime() - wallTime).nanoseconds) + self.blockProcessor.enqueueBlock( + src, signedBlock, sidecarsOpt, maybeFinalized, validationDur + ) - beacon_blocks_dropped.inc(1, [$v.error[0]]) + # Validator monitor registration for blocks is done by the processor + beacon_blocks_received.inc() + beacon_block_delay.observe(delay.toFloatSeconds()) - v + ok() proc processBlobSidecar*( self: var Eth2Processor, src: MsgSource, @@ -332,13 +318,11 @@ proc processBlobSidecar*( self.blobQuarantine[].put(block_root, newClone(blobSidecar)) if (let o = self.quarantine[].popSidecarless(block_root); o.isSome): - let blobless = o.unsafeGet() - withBlck(blobless): + withBlck(o[]): when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: let bres = self.blobQuarantine[].popSidecars(block_root, forkyBlck) if bres.isSome(): - self.blockProcessor[].enqueueBlock(MsgSource.gossip, blobless, bres, - Opt.none(DataColumnSidecars)) + self.blockProcessor.enqueueBlock(MsgSource.gossip, forkyBlck, bres) else: self.quarantine[].addSidecarless(forkyBlck) else: @@ -375,17 +359,13 @@ proc processDataColumnSidecar*( debug "Data column validated, putting data column in quarantine" self.dataColumnQuarantine[].put(block_root, newClone(dataColumnSidecar)) if (let o = self.quarantine[].popSidecarless(block_root); o.isSome): - let columnless = o.unsafeGet() - withBlck(columnless): + withBlck(o[]): when consensusFork >= ConsensusFork.Fulu and consensusFork < ConsensusFork.Gloas: let cres = self.dataColumnQuarantine[].popSidecars(block_root, forkyBlck) if cres.isSome(): - self.blockProcessor[].enqueueBlock( - MsgSource.gossip, columnless, - Opt.none(BlobSidecars), - cres) + self.blockProcessor.enqueueBlock(MsgSource.gossip, forkyBlck, cres) else: discard self.quarantine[].addSidecarless( self.dag.finalizedHead.slot, forkyBlck) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 4e3c9d0199..438d41f237 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -448,12 +448,20 @@ proc initFullNode( blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - # The design 
with a callback for block verification is unusual compared - # to the rest of the application, but fits with the general approach - # taken in the sync/request managers - this is an architectural compromise - # that should probably be reimagined more holistically in the future. - blockProcessor[].addBlock( - MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + withBlck(signedBlock): + when consensusFork in ConsensusFork.Fulu .. ConsensusFork.Gloas: + # TODO document why there are no columns here + let sidecarsOpt = Opt.none(DataColumnSidecars) + elif consensusFork in ConsensusFork.Deneb .. ConsensusFork.Electra: + template sidecarsOpt: untyped = blobs + elif consensusFork in ConsensusFork.Phase0 .. ConsensusFork.Capella: + const sidecarsOpt = noSidecars + else: + {.error: "Unkown fork: " & $consensusFork.} + + blockProcessor.addBlock( + MsgSource.gossip, forkyBlck, sidecarsOpt, maybeFinalized) + untrustedBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {. @@ -463,40 +471,38 @@ proc initFullNode( maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when consensusFork >= ConsensusFork.Fulu and - consensusFork < ConsensusFork.Gloas: + when consensusFork == ConsensusFork.Gloas: debugGloasComment "no blob_kzg_commitments field for gloas" - let cres = dataColumnQuarantine[].popSidecars(forkyBlck.root, forkyBlck) - if cres.isSome(): - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - cres, - maybeFinalized = maybeFinalized) - else: + let sidecarsOpt = Opt.none(DataColumnSidecars) + elif consensusFork == ConsensusFork.Fulu: + let sidecarsOpt = + dataColumnQuarantine[].popSidecars(forkyBlck.root, forkyBlck) + if sidecarsOpt.isNone(): # We don't have all the columns for this block, so we have # to put it in columnless quarantine. - if not quarantine[].addSidecarless( - dag.finalizedHead.slot, forkyBlck): - err(VerifierError.UnviableFork) - else: - err(VerifierError.MissingParent) - - elif consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: - let bres = blobQuarantine[].popSidecars(forkyBlck.root, forkyBlck) - if bres.isSome(): - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - bres, - maybeFinalized = maybeFinalized) - else: + return + if not quarantine[].addSidecarless(dag.finalizedHead.slot, forkyBlck): + err(VerifierError.UnviableFork) + else: + err(VerifierError.MissingParent) + elif consensusFork in ConsensusFork.Deneb .. ConsensusFork.Electra: + let sidecarsOpt = blobQuarantine[].popSidecars(forkyBlck.root, forkyBlck) + if sidecarsOpt.isNone(): # We don't have all the sidecars for this block, so we have # to put it to the quarantine. - if not quarantine[].addSidecarless( - dag.finalizedHead.slot, forkyBlck): - err(VerifierError.UnviableFork) - else: - err(VerifierError.MissingParent) + return + if not quarantine[].addSidecarless(dag.finalizedHead.slot, forkyBlck): + err(VerifierError.UnviableFork) + else: + err(VerifierError.MissingParent) + elif consensusFork in ConsensusFork.Phase0 .. 
ConsensusFork.Capella: + const sidecarsOpt = noSidecars else: - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - maybeFinalized = maybeFinalized) + {.error: "Unkown fork: " & $consensusFork.} + + await blockProcessor.addBlock( + MsgSource.gossip, forkyBlck, sidecarsOpt, maybeFinalized + ) rmanBlockLoader = proc( blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = dag.getForkedBlock(blockRoot) @@ -2524,7 +2530,6 @@ proc run*(node: BeaconNode, stopper: StopFuture) {.raises: [CatchableError].} = asyncSpawn runSlotLoop(node, wallTime) asyncSpawn runOnSecondLoop(node) - asyncSpawn runQueueProcessingLoop(node.blockProcessor) asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache) while true: diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 32f23283a6..5d6cc1e224 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -153,7 +153,7 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() when typeof(blck).kind >= ConsensusFork.Fulu: - var dataColumnRefs = Opt.none(fulu.DataColumnSidecars) + var sidecarOpt = Opt.none(fulu.DataColumnSidecars) let dataColumns = dataColumnsOpt.get() if dataColumnsOpt.isSome(): var das_workers = @@ -186,11 +186,9 @@ proc routeSignedBeaconBlock*( for dc in dataColumns: if dc.index in custody_columns: final_columns.add newClone(dc) - dataColumnRefs = Opt.some(final_columns) - let added = await router[].blockProcessor[].addBlock( - MsgSource.api, ForkedSignedBeaconBlock.init(blck), dataColumnRefs) + sidecarOpt = Opt.some(final_columns) elif typeof(blck).kind in [ConsensusFork.Deneb, ConsensusFork.Electra]: - var blobRefs = Opt.none(BlobSidecars) + var sidecarOpt = Opt.none(BlobSidecars) if blobsOpt.isSome(): let blobs = blobsOpt.get() var workers = newSeq[Future[SendResult]](blobs.len) @@ -208,13 +206,13 @@ proc routeSignedBeaconBlock*( blob = shortLog(blobs[i]), error = res.error[] else: notice "Blob sent", blob = shortLog(blobs[i]) - blobRefs = Opt.some(blobs.mapIt(newClone(it))) + sidecarOpt = Opt.some(blobs.mapIt(newClone(it))) - let added = await router[].blockProcessor[].addBlock( - MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs) else: - let added = await router[].blockProcessor[].addBlock( - MsgSource.api, ForkedSignedBeaconBlock.init(blck)) + const sidecarOpt = noSidecars + + let added = await router[].blockProcessor.addBlock( + MsgSource.api, blck, sidecarOpt) # The boolean we return tells the caller whether the block was integrated # into the chain diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 5af7dcc9aa..a859f2bf30 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -70,11 +70,9 @@ suite "Block processor" & preset(): processor = BlockProcessor.new( false, "", "", batchVerifier, consensusManager, validatorMonitor, blobQuarantine, dataColumnQuarantine, getTimeFn) - discard processor.runQueueProcessingLoop() asyncTest "Reverse order block add & get" & preset(): - let missing = await processor[].addBlock( - MsgSource.gossip, ForkedSignedBeaconBlock.init(b2)) + let missing = await processor.addBlock(MsgSource.gossip, b2, noSidecars) check: missing.error == VerifierError.MissingParent @@ -84,8 +82,7 @@ suite "Block processor" & preset(): FetchRecord(root: b1.root) in quarantine[].checkMissing(32) let - status = await processor[].addBlock( - MsgSource.gossip, ForkedSignedBeaconBlock.init(b1)) + status = await 
processor.addBlock(MsgSource.gossip, b1, noSidecars) b1Get = dag.getBlockRef(b1.root) check: @@ -133,20 +130,16 @@ suite "Block processor" & preset(): false, "", "", batchVerifier, consensusManager, validatorMonitor, blobQuarantine, dataColumnQuarantine, getTimeFn, invalidBlockRoots = @[b2.root]) - processorFut = processor.runQueueProcessingLoop() - defer: await processorFut.cancelAndWait() block: - let res = await processor[].addBlock( - MsgSource.gossip, ForkedSignedBeaconBlock.init(b2)) + let res = await processor.addBlock(MsgSource.gossip, b2, noSidecars) check: res.isErr not dag.containsForkBlock(b1.root) not dag.containsForkBlock(b2.root) block: - let res = await processor[].addBlock( - MsgSource.gossip, ForkedSignedBeaconBlock.init(b1)) + let res = await processor.addBlock(MsgSource.gossip, b1, noSidecars) check: res.isOk dag.containsForkBlock(b1.root) @@ -158,8 +151,7 @@ suite "Block processor" & preset(): not dag.containsForkBlock(b2.root) block: - let res = await processor[].addBlock( - MsgSource.gossip, ForkedSignedBeaconBlock.init(b2)) + let res = await processor.addBlock(MsgSource.gossip, b2, noSidecars) check: res == Result[void, VerifierError].err VerifierError.Invalid dag.containsForkBlock(b1.root)
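As a closing illustration of the `SomeOptSidecars` typing used throughout the
patch, here is a self-contained sketch of dispatching on the sidecar type at
compile time. The types are simplified stand-ins (std/options instead of
`Opt`, plain objects instead of the real sidecar types), not the nimbus-eth2
definitions:

import std/options

type
  BlobSidecar = object
    index: int
  DataColumnSidecar = object
    index: int
  NoSidecars = typeof(())
  SomeOptSidecars =
    NoSidecars | Option[seq[BlobSidecar]] | Option[seq[DataColumnSidecar]]

const noSidecars = default(NoSidecars)

proc store(sidecars: SomeOptSidecars) =
  # The `when` tree mirrors the fork-specific branches in the patch: each call
  # site can only ever hand in the sidecar type that matches its fork.
  when sidecars is Option[seq[BlobSidecar]]:
    echo "storing ", sidecars.get().len, " blob sidecar(s)"
  elif sidecars is Option[seq[DataColumnSidecar]]:
    echo "storing ", sidecars.get().len, " column sidecar(s)"
  elif sidecars is NoSidecars:
    echo "pre-Deneb block, no sidecars to store"
  else:
    {.error: "unsupported sidecar type".}

store(noSidecars)
store(some(@[BlobSidecar(index: 0)]))
store(some(@[DataColumnSidecar(index: 0), DataColumnSidecar(index: 1)]))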