diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 71f10c83..58f09a30 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -74,6 +74,14 @@ pub struct OpPayloadBuilderCtx { } impl OpPayloadBuilderCtx { + pub(super) fn with_cancel(self, cancel: CancellationToken) -> Self { + Self { cancel, ..self } + } + + pub(super) fn with_extra_ctx(self, extra_ctx: ExtraCtx) -> Self { + Self { extra_ctx, ..self } + } + /// Returns the parent block the payload will be build on. pub(super) fn parent(&self) -> &SealedHeader { &self.config.parent_header @@ -330,7 +338,7 @@ impl OpPayloadBuilderCtx { let tx_da_limit = self.da_config.max_da_tx_size(); let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone()); - info!( + debug!( target: "payload_builder", message = "Executing best transactions", block_da_limit = ?block_da_limit, diff --git a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs new file mode 100644 index 00000000..c465445f --- /dev/null +++ b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs @@ -0,0 +1,81 @@ +use crate::{ + builders::{BuilderConfig, OpPayloadBuilderCtx, flashblocks::FlashblocksConfig}, + gas_limiter::{AddressGasLimiter, args::GasLimiterArgs}, + metrics::OpRBuilderMetrics, + traits::ClientBounds, +}; +use op_revm::OpSpecId; +use reth_basic_payload_builder::PayloadConfig; +use reth_evm::EvmEnv; +use reth_optimism_chainspec::OpChainSpec; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_payload_builder::{OpPayloadBuilderAttributes, config::OpDAConfig}; +use reth_optimism_primitives::OpTransactionSigned; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; + +#[derive(Debug, Clone)] +pub(super) struct OpPayloadSyncerCtx { + /// The type that knows how to perform system calls and configure the evm. + evm_config: OpEvmConfig, + /// The DA config for the payload builder + da_config: OpDAConfig, + /// The chainspec + chain_spec: Arc, + /// Max gas that can be used by a transaction. 
+ max_gas_per_txn: Option, + /// The metrics for the builder + metrics: Arc, +} + +impl OpPayloadSyncerCtx { + pub(super) fn new( + client: &Client, + builder_config: BuilderConfig, + evm_config: OpEvmConfig, + metrics: Arc, + ) -> eyre::Result + where + Client: ClientBounds, + { + let chain_spec = client.chain_spec(); + Ok(Self { + evm_config, + da_config: builder_config.da_config.clone(), + chain_spec, + max_gas_per_txn: builder_config.max_gas_per_txn, + metrics, + }) + } + + pub(super) fn evm_config(&self) -> &OpEvmConfig { + &self.evm_config + } + + pub(super) fn max_gas_per_txn(&self) -> Option { + self.max_gas_per_txn + } + + pub(super) fn into_op_payload_builder_ctx( + self, + payload_config: PayloadConfig>, + evm_env: EvmEnv, + block_env_attributes: OpNextBlockEnvAttributes, + cancel: CancellationToken, + ) -> OpPayloadBuilderCtx { + OpPayloadBuilderCtx { + evm_config: self.evm_config, + da_config: self.da_config, + chain_spec: self.chain_spec, + config: payload_config, + evm_env, + block_env_attributes, + cancel, + builder_signer: None, + metrics: self.metrics, + extra_ctx: (), + max_gas_per_txn: self.max_gas_per_txn, + address_gas_limiter: AddressGasLimiter::new(GasLimiterArgs::default()), + } + } +} diff --git a/crates/op-rbuilder/src/builders/flashblocks/mod.rs b/crates/op-rbuilder/src/builders/flashblocks/mod.rs index 643bf39a..87d4494c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/mod.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/mod.rs @@ -6,6 +6,7 @@ use service::FlashblocksServiceBuilder; mod best_txs; mod builder_tx; mod config; +mod ctx; mod p2p; mod payload; mod payload_handler; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 09867843..9623d0f3 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -68,12 +68,12 @@ type NextBestFlashblocksTxs = BestFlashblocksTxs< >; #[derive(Debug, Default)] -struct ExtraExecutionInfo { +pub(super) struct ExtraExecutionInfo { /// Index of the last consumed flashblock - pub last_flashblock_index: usize, + last_flashblock_index: usize, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct FlashblocksExtraCtx { /// Current flashblock index flashblock_index: u64, @@ -102,16 +102,9 @@ impl OpPayloadBuilderCtx { self.extra_ctx.target_flashblock_count } - /// Increments the flashblock index - pub(crate) fn increment_flashblock_index(&mut self) -> u64 { - self.extra_ctx.flashblock_index += 1; - self.extra_ctx.flashblock_index - } - - /// Sets the target flashblock count - pub(crate) fn set_target_flashblock_count(&mut self, target_flashblock_count: u64) -> u64 { - self.extra_ctx.target_flashblock_count = target_flashblock_count; - self.extra_ctx.target_flashblock_count + /// Returns the next flashblock index + pub(crate) fn next_flashblock_index(&self) -> u64 { + self.extra_ctx.flashblock_index + 1 } /// Returns if the flashblock is the first fallback block @@ -159,8 +152,8 @@ impl OpPayloadBuilder { config: BuilderConfig, builder_tx: BuilderTx, payload_tx: mpsc::Sender, + metrics: Arc, ) -> eyre::Result { - let metrics = Arc::new(OpRBuilderMetrics::default()); let ws_pub = WebSocketPublisher::new(config.specific.ws_addr, Arc::clone(&metrics))?.into(); let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone()); Ok(Self { @@ -211,43 +204,16 @@ where Client: ClientBounds, BuilderTx: BuilderTransactions + Send + Sync, { - /// 
Constructs an Optimism payload from the transactions sent via the - /// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in - /// the payload attributes, the transaction pool will be ignored and the only transactions - /// included in the payload will be those sent through the attributes. - /// - /// Given build arguments including an Optimism client, transaction pool, - /// and configuration, this function creates a transaction payload. Returns - /// a result indicating success with the payload or an error in case of failure. - async fn build_payload( + fn get_op_payload_builder_ctx( &self, - args: BuildArguments, OpBuiltPayload>, - best_payload: BlockCell, - ) -> Result<(), PayloadBuilderError> { - let block_build_start_time = Instant::now(); - let BuildArguments { - mut cached_reads, - config, - cancel: block_cancel, - } = args; - - // We log only every 100th block to reduce usage - let span = if cfg!(feature = "telemetry") - && config.parent_header.number % self.config.sampling_ratio == 0 - { - span!(Level::INFO, "build_payload") - } else { - tracing::Span::none() - }; - let _entered = span.enter(); - span.record( - "payload_id", - config.attributes.payload_attributes.id.to_string(), - ); - + config: reth_basic_payload_builder::PayloadConfig< + OpPayloadBuilderAttributes, + >, + cancel: CancellationToken, + extra_ctx: FlashblocksExtraCtx, + ) -> eyre::Result> { let chain_spec = self.client.chain_spec(); let timestamp = config.attributes.timestamp(); - let calculate_state_root = self.config.specific.calculate_state_root; let block_env_attributes = OpNextBlockEnvAttributes { timestamp, suggested_fee_recipient: config.attributes.suggested_fee_recipient(), @@ -264,7 +230,7 @@ where config .attributes .get_holocene_extra_data(chain_spec.base_fee_params_at_timestamp(timestamp)) - .map_err(PayloadBuilderError::other)? + .wrap_err("failed to get holocene extra data for flashblocks payload builder")? } else { Default::default() }, @@ -273,35 +239,78 @@ where let evm_env = self .evm_config .next_evm_env(&config.parent_header, &block_env_attributes) - .map_err(PayloadBuilderError::other)?; + .wrap_err("failed to create next evm env")?; - let mut ctx = OpPayloadBuilderCtx:: { + Ok(OpPayloadBuilderCtx:: { evm_config: self.evm_config.clone(), - chain_spec: self.client.chain_spec(), + chain_spec, config, evm_env, block_env_attributes, - // Here we use parent token because child token handing is only for proper flashblocks - cancel: block_cancel.clone(), + cancel, da_config: self.config.da_config.clone(), builder_signer: self.config.builder_signer, metrics: Default::default(), - extra_ctx: FlashblocksExtraCtx { - flashblock_index: 0, - target_flashblock_count: self.config.flashblocks_per_block(), - target_gas_for_batch: 0, - target_da_for_batch: None, - gas_per_batch: 0, - da_per_batch: None, - calculate_state_root, - }, + extra_ctx, max_gas_per_txn: self.config.max_gas_per_txn, address_gas_limiter: self.address_gas_limiter.clone(), + }) + } + + /// Constructs an Optimism payload from the transactions sent via the + /// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in + /// the payload attributes, the transaction pool will be ignored and the only transactions + /// included in the payload will be those sent through the attributes. + /// + /// Given build arguments including an Optimism client, transaction pool, + /// and configuration, this function creates a transaction payload. 
Returns + /// a result indicating success with the payload or an error in case of failure. + async fn build_payload( + &self, + args: BuildArguments, OpBuiltPayload>, + best_payload: BlockCell, + ) -> Result<(), PayloadBuilderError> { + let block_build_start_time = Instant::now(); + let BuildArguments { + mut cached_reads, + config, + cancel: block_cancel, + } = args; + + // We log only every 100th block to reduce usage + let span = if cfg!(feature = "telemetry") + && config.parent_header.number % self.config.sampling_ratio == 0 + { + span!(Level::INFO, "build_payload") + } else { + tracing::Span::none() }; + let _entered = span.enter(); + span.record( + "payload_id", + config.attributes.payload_attributes.id.to_string(), + ); + + let timestamp = config.attributes.timestamp(); + let calculate_state_root = self.config.specific.calculate_state_root; + let ctx = self + .get_op_payload_builder_ctx( + config.clone(), + block_cancel.clone(), + FlashblocksExtraCtx { + flashblock_index: 0, + target_flashblock_count: self.config.flashblocks_per_block(), + target_gas_for_batch: 0, + target_da_for_batch: None, + gas_per_batch: 0, + da_per_batch: None, + calculate_state_root, + }, + ) + .map_err(|e| PayloadBuilderError::Other(e.into()))?; let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?; let db = StateProviderDatabase::new(&state_provider); - self.address_gas_limiter.refresh(ctx.block_number()); // 1. execute the pre steps and seal an early block with that @@ -395,11 +404,10 @@ where // We adjust our flashblocks timings based on time_drift if dynamic adjustment enable let (flashblocks_per_block, first_flashblock_offset) = self.calculate_flashblocks(timestamp); - ctx.set_target_flashblock_count(flashblocks_per_block); info!( target: "payload_builder", message = "Performed flashblocks timing derivation", - flashblocks_per_block = ctx.target_flashblock_count(), + flashblocks_per_block, first_flashblock_offset = first_flashblock_offset.as_millis(), flashblocks_interval = self.config.specific.interval.as_millis(), ); @@ -411,12 +419,12 @@ where ctx.metrics .first_flashblock_time_offset .record(first_flashblock_offset.as_millis() as f64); - let gas_per_batch = ctx.block_gas_limit() / ctx.target_flashblock_count(); + let gas_per_batch = ctx.block_gas_limit() / flashblocks_per_block; let target_gas_for_batch = gas_per_batch; let da_per_batch = ctx .da_config .max_da_block_size() - .map(|da_limit| da_limit / ctx.target_flashblock_count()); + .map(|da_limit| da_limit / flashblocks_per_block); // Check that builder tx won't affect fb limit too much if let Some(da_limit) = da_per_batch { // We error if we can't insert any tx aside from builder tx in flashblock @@ -426,16 +434,26 @@ where ); } } - let mut total_da_per_batch = da_per_batch; + let mut target_da_for_batch = da_per_batch; // Account for already included builder tx - ctx.extra_ctx.target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); - if let Some(da_limit) = total_da_per_batch.as_mut() { + if let Some(da_limit) = target_da_for_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); } - ctx.extra_ctx.target_da_for_batch = total_da_per_batch; - ctx.extra_ctx.gas_per_batch = gas_per_batch; - ctx.extra_ctx.da_per_batch = da_per_batch; + let extra_ctx = FlashblocksExtraCtx { + flashblock_index: 1, + target_flashblock_count: flashblocks_per_block, + target_gas_for_batch: target_gas_for_batch.saturating_sub(builder_tx_gas), + target_da_for_batch, + gas_per_batch, + da_per_batch, + 
calculate_state_root, + }; + + let mut fb_cancel = block_cancel.child_token(); + let mut ctx = self + .get_op_payload_builder_ctx(config, fb_cancel.clone(), extra_ctx) + .map_err(|e| PayloadBuilderError::Other(e.into()))?; // Create best_transaction iterator let mut best_txs = BestFlashblocksTxs::new(BestPayloadTransactions::new( @@ -444,8 +462,6 @@ where )); let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); - let mut fb_cancel = block_cancel.child_token(); - ctx.cancel = fb_cancel.clone(); tokio::spawn({ let block_cancel = block_cancel.clone(); @@ -471,7 +487,6 @@ where // this will only happen if the `build_payload` function returns, // due to payload building error or the main cancellation token being // cancelled. - return; } } _ = block_cancel.cancelled() => { @@ -496,9 +511,9 @@ where let _entered = fb_span.enter(); // build first flashblock immediately - match self + let next_flashblocks_ctx = match self .build_next_flashblock( - &mut ctx, + &ctx, &mut info, &mut state, &state_provider, @@ -509,7 +524,17 @@ where ) .await { - Ok(()) => {} + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } Err(err) => { error!( target: "payload_builder", @@ -520,11 +545,11 @@ where ); return Err(PayloadBuilderError::Other(err.into())); } - } + }; tokio::select! { Some(fb_cancel) = rx.recv() => { - ctx.cancel = fb_cancel; + ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); }, _ = block_cancel.cancelled() => { self.record_flashblocks_metrics( @@ -546,7 +571,7 @@ where P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, >( &self, - ctx: &mut OpPayloadBuilderCtx, + ctx: &OpPayloadBuilderCtx, info: &mut ExecutionInfo, state: &mut State, state_provider: impl reth::providers::StateProvider + Clone, @@ -554,10 +579,7 @@ where block_cancel: &CancellationToken, best_payload: &BlockCell, span: &tracing::Span, - ) -> eyre::Result<()> { - // fallback block is index 0, so we need to increment here - ctx.increment_flashblock_index(); - + ) -> eyre::Result> { // TODO: remove this if ctx.flashblock_index() > ctx.target_flashblock_count() { info!( @@ -567,12 +589,12 @@ where block_number = ctx.block_number(), "Skipping flashblock reached target", ); - return Ok(()); + return Ok(None); }; // Continue with flashblock building let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; - let mut target_da_per_batch = ctx.extra_ctx.target_da_for_batch; + let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; info!( target: "payload_builder", @@ -580,7 +602,7 @@ where flashblock_index = ctx.flashblock_index(), target_gas = target_gas_for_batch, gas_used = info.cumulative_gas_used, - target_da = target_da_per_batch, + target_da = target_da_for_batch, da_used = info.cumulative_da_bytes_used, block_gas_used = ctx.block_gas_limit(), "Building flashblock", @@ -604,7 +626,7 @@ where target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size - if let Some(da_limit) = target_da_per_batch.as_mut() { + if let Some(da_limit) = target_da_for_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); } @@ -630,7 +652,7 @@ where state, best_txs, 
target_gas_for_batch.min(ctx.block_gas_limit()), - target_da_per_batch, + target_da_for_batch, ) .wrap_err("failed to execute best transactions")?; // Extract last transactions @@ -651,7 +673,7 @@ where span, "Payload building complete, channel closed or job cancelled", ); - return Ok(()); + return Ok(None); } let payload_tx_simulation_time = tx_execution_start_time.elapsed(); @@ -662,15 +684,11 @@ where .payload_tx_simulation_gauge .set(payload_tx_simulation_time); - match self + if let Err(e) = self .builder_tx .add_builder_txs(&state_provider, info, ctx, state, false) { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error simulating builder txs: {}", e); - vec![] - } + error!(target: "payload_builder", "Error simulating builder txs: {}", e); }; let total_block_built_duration = Instant::now(); @@ -690,8 +708,8 @@ where match build_result { Err(err) => { - ctx.metrics.invalid_blocks_count.increment(1); - return Err(err).wrap_err("failed to build payload"); + ctx.metrics.invalid_built_blocks_count.increment(1); + Err(err).wrap_err("failed to build payload") } Ok((new_payload, mut fb_payload)) => { fb_payload.index = ctx.flashblock_index(); @@ -707,7 +725,7 @@ where span, "Payload building complete, channel closed or job cancelled", ); - return Ok(()); + return Ok(None); } let flashblock_byte_size = self .ws_pub @@ -732,7 +750,7 @@ where // Update bundle_state for next iteration if let Some(da_limit) = ctx.extra_ctx.da_per_batch { - if let Some(da) = target_da_per_batch.as_mut() { + if let Some(da) = target_da_for_batch.as_mut() { *da += da_limit; } else { error!( @@ -741,8 +759,17 @@ where } } - ctx.extra_ctx.target_gas_for_batch += ctx.extra_ctx.gas_per_batch; - ctx.extra_ctx.target_da_for_batch = target_da_per_batch; + let target_gas_for_batch = + ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; + let next_extra_ctx = FlashblocksExtraCtx { + flashblock_index: ctx.next_flashblock_index(), + target_flashblock_count: ctx.target_flashblock_count(), + target_gas_for_batch, + target_da_for_batch, + gas_per_batch: ctx.extra_ctx.gas_per_batch, + da_per_batch: ctx.extra_ctx.da_per_batch, + calculate_state_root: ctx.extra_ctx.calculate_state_root, + }; info!( target: "payload_builder", @@ -752,9 +779,10 @@ where current_da = info.cumulative_da_bytes_used, target_flashblocks = ctx.target_flashblock_count(), ); + + Ok(Some(next_extra_ctx)) } } - Ok(()) } /// Do some logging and metric recording when we stop build flashblocks @@ -785,7 +813,6 @@ where message = message, flashblocks_per_block = flashblocks_per_block, flashblock_index = ctx.flashblock_index(), - config_flashblocks_per_block = self.config.flashblocks_per_block(), ); span.record("flashblock_count", ctx.flashblock_index()); @@ -801,6 +828,7 @@ where self.config.specific.interval - self.config.specific.leeway_time, ); } + // We use this system time to determine remining time to build a block // Things to consider: // FCU(a) - FCU with attributes @@ -893,13 +921,13 @@ where .map_err(PayloadBuilderError::other)? .apply_pre_execution_changes()?; - // 3. execute sequencer transactions + // 2. 
execute sequencer transactions let info = ctx.execute_sequencer_transactions(state)?; Ok(info) } -fn build_block( +pub(super) fn build_block( state: &mut State, ctx: &OpPayloadBuilderCtx, info: &mut ExecutionInfo, @@ -946,7 +974,7 @@ where .expect("Number is in range"); // TODO: maybe recreate state with bundle in here - // // calculate the state root + // calculate the state root let state_root_start_time = Instant::now(); let mut state_root = B256::ZERO; let mut trie_output = TrieUpdates::default(); @@ -1054,7 +1082,7 @@ where }, trie: ExecutedTrieUpdates::Present(Arc::new(trie_output)), }; - info!(target: "payload_builder", message = "Executed block created"); + debug!(target: "payload_builder", message = "Executed block created"); let sealed_block = Arc::new(block.seal_slow()); debug!(target: "payload_builder", ?sealed_block, "sealed built block"); diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index d053866d..4c1108d0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -1,11 +1,34 @@ -use crate::builders::flashblocks::p2p::Message; +use crate::{ + builders::flashblocks::{ctx::OpPayloadSyncerCtx, p2p::Message, payload::ExtraExecutionInfo}, + primitives::reth::ExecutionInfo, + traits::ClientBounds, +}; +use alloy_evm::eth::receipt_builder::ReceiptBuilderCtx; +use alloy_primitives::B64; +use eyre::{WrapErr as _, bail}; +use futures::stream::FuturesUnordered; +use futures_util::StreamExt as _; +use op_alloy_consensus::OpTxEnvelope; +use reth::revm::{State, database::StateProviderDatabase}; +use reth_basic_payload_builder::PayloadConfig; +use reth_evm::FromRecoveredTx; use reth_node_builder::Events; -use reth_optimism_node::OpEngineTypes; +use reth_optimism_chainspec::OpChainSpec; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_node::{OpEngineTypes, OpPayloadBuilderAttributes}; use reth_optimism_payload_builder::OpBuiltPayload; +use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; +use reth_payload_builder::EthPayloadBuilderAttributes; +use rollup_boost::FlashblocksPayloadV1; +use std::sync::Arc; use tokio::sync::mpsc; -pub(crate) struct PayloadHandler { - // receives new payloads built by us. +/// Handles newly built or received flashblock payloads. +/// +/// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. +/// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. +pub(crate) struct PayloadHandler { + // receives new payloads built by this builder. built_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, @@ -13,20 +36,35 @@ pub(crate) struct PayloadHandler { p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. 
payload_events_handle: tokio::sync::broadcast::Sender>, + // context required for execution of blocks during syncing + ctx: OpPayloadSyncerCtx, + // chain client + client: Client, + cancel: tokio_util::sync::CancellationToken, } -impl PayloadHandler { +impl PayloadHandler +where + Client: ClientBounds + 'static, +{ + #[allow(clippy::too_many_arguments)] pub(crate) fn new( built_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, + ctx: OpPayloadSyncerCtx, + client: Client, + cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { built_rx, p2p_rx, p2p_tx, payload_events_handle, + ctx, + client, + cancel, } } @@ -36,27 +74,370 @@ impl PayloadHandler { mut p2p_rx, p2p_tx, payload_events_handle, + ctx, + client, + cancel, } = self; - tracing::info!("flashblocks payload handler started"); + tracing::debug!("flashblocks payload handler started"); + + let mut execute_flashblock_futures = FuturesUnordered::new(); loop { tokio::select! { Some(payload) = built_rx.recv() => { let _ = payload_events_handle.send(Events::BuiltPayload(payload.clone())); - // TODO: only broadcast if `!no_tx_pool`? - // ignore error here; if p2p was disabled, the channel will be closed. + // ignore error here; if p2p is disabled, the channel will be closed. let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { Message::OpBuiltPayload(payload) => { let payload: OpBuiltPayload = payload.into(); - let _ = payload_events_handle.send(Events::BuiltPayload(payload)); + let handle = tokio::spawn( + execute_flashblock( + payload, + ctx.clone(), + client.clone(), + cancel.clone(), + ) + ); + execute_flashblock_futures.push(handle); + } + } + } + Some(res) = execute_flashblock_futures.next() => { + match res { + Ok(Ok((payload, _))) => { + tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed flashblock"); + let _ = payload_events_handle.send(Events::BuiltPayload(payload)); + } + Ok(Err(e)) => { + tracing::error!(error = ?e, "failed to execute flashblock"); + } + Err(e) => { + tracing::error!(error = ?e, "task panicked while executing flashblock"); } } } + else => break, + } + } + } +} + +async fn execute_flashblock( + payload: OpBuiltPayload, + ctx: OpPayloadSyncerCtx, + client: Client, + cancel: tokio_util::sync::CancellationToken, +) -> eyre::Result<(OpBuiltPayload, FlashblocksPayloadV1)> +where + Client: ClientBounds, +{ + use alloy_consensus::BlockHeader as _; + use reth::primitives::SealedHeader; + use reth_evm::{ConfigureEvm as _, execute::BlockBuilder as _}; + + let start = tokio::time::Instant::now(); + + tracing::info!(header = ?payload.block().header(), "executing flashblock"); + + let mut cached_reads = reth::revm::cached::CachedReads::default(); + let parent_hash = payload.block().sealed_header().parent_hash; + let parent_header = client + .header_by_id(parent_hash.into()) + .wrap_err("failed to get parent header")? 
+ .ok_or_else(|| eyre::eyre!("parent header not found"))?; + + let state_provider = client + .state_by_block_hash(parent_hash) + .wrap_err("failed to get state for parent hash")?; + let db = StateProviderDatabase::new(&state_provider); + let mut state = State::builder() + .with_database(cached_reads.as_db_mut(db)) + .with_bundle_update() + .build(); + + let chain_spec = client.chain_spec(); + let timestamp = payload.block().header().timestamp(); + let block_env_attributes = OpNextBlockEnvAttributes { + timestamp, + suggested_fee_recipient: payload.block().sealed_header().beneficiary, + prev_randao: payload.block().sealed_header().mix_hash, + gas_limit: payload.block().sealed_header().gas_limit, + parent_beacon_block_root: payload.block().sealed_header().parent_beacon_block_root, + extra_data: payload.block().sealed_header().extra_data.clone(), + }; + + let evm_env = ctx + .evm_config() + .next_evm_env(&parent_header, &block_env_attributes) + .wrap_err("failed to create next evm env")?; + + ctx.evm_config() + .builder_for_next_block( + &mut state, + &Arc::new(SealedHeader::new(parent_header.clone(), parent_hash)), + block_env_attributes.clone(), + ) + .wrap_err("failed to create evm builder for next block")? + .apply_pre_execution_changes() + .wrap_err("failed to apply pre execution changes")?; + + let mut info = ExecutionInfo::with_capacity(payload.block().body().transactions.len()); + + let extra_data = payload.block().sealed_header().extra_data.clone(); + if extra_data.len() != 9 { + tracing::error!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock"); + bail!("extra data length should be 9 bytes"); + } + + // see https://specs.optimism.io/protocol/holocene/exec-engine.html#eip-1559-parameters-in-block-header + let eip_1559_parameters: B64 = extra_data[1..9].try_into().unwrap(); + let payload_config = PayloadConfig::new( + Arc::new(SealedHeader::new(parent_header.clone(), parent_hash)), + OpPayloadBuilderAttributes { + eip_1559_params: Some(eip_1559_parameters), + payload_attributes: EthPayloadBuilderAttributes { + id: payload.id(), // unused + parent: parent_hash, // unused + suggested_fee_recipient: payload.block().sealed_header().beneficiary, + withdrawals: payload + .block() + .body() + .withdrawals + .clone() + .unwrap_or_default(), + parent_beacon_block_root: payload.block().sealed_header().parent_beacon_block_root, + timestamp, + prev_randao: payload.block().sealed_header().mix_hash, + }, + ..Default::default() + }, + ); + + execute_transactions( + &mut info, + &mut state, + payload.block().body().transactions.clone(), + payload.block().header().gas_used, + ctx.evm_config(), + evm_env.clone(), + ctx.max_gas_per_txn(), + is_canyon_active(&chain_spec, timestamp), + is_regolith_active(&chain_spec, timestamp), + ) + .wrap_err("failed to execute best transactions")?; + + let builder_ctx = ctx.into_op_payload_builder_ctx( + payload_config, + evm_env.clone(), + block_env_attributes, + cancel, + ); + + let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + &mut state, + &builder_ctx, + &mut info, + true, + ) + .wrap_err("failed to build flashblock")?; + + builder_ctx + .metrics + .flashblock_sync_duration + .record(start.elapsed()); + + if built_payload.block().hash() != payload.block().hash() { + tracing::error!( + expected = %payload.block().hash(), + got = %built_payload.block().hash(), + "flashblock hash mismatch after execution" + ); + builder_ctx.metrics.invalid_synced_blocks_count.increment(1); + bail!("flashblock 
hash mismatch after execution"); + } + + builder_ctx.metrics.block_synced_success.increment(1); + + tracing::info!(header = ?built_payload.block().header(), "successfully executed flashblock"); + Ok((built_payload, fb_payload)) +} + +#[allow(clippy::too_many_arguments)] +fn execute_transactions( + info: &mut ExecutionInfo, + state: &mut State, + txs: Vec, + gas_limit: u64, + evm_config: &reth_optimism_evm::OpEvmConfig, + evm_env: alloy_evm::EvmEnv, + max_gas_per_txn: Option, + is_canyon_active: bool, + is_regolith_active: bool, +) -> eyre::Result<()> { + use alloy_evm::{Evm as _, EvmError as _}; + use op_revm::{OpTransaction, transaction::deposit::DepositTransactionParts}; + use reth_evm::ConfigureEvm as _; + use reth_primitives_traits::SignerRecoverable as _; + use revm::{ + DatabaseCommit as _, + context::{TxEnv, result::ResultAndState}, + }; + + let mut evm = evm_config.evm_with_env(&mut *state, evm_env); + + for tx in txs { + let sender = tx + .recover_signer() + .wrap_err("failed to recover tx signer")?; + let tx_env = TxEnv::from_recovered_tx(&tx, sender); + let executable_tx = match tx { + OpTxEnvelope::Deposit(ref tx) => { + let deposit = DepositTransactionParts { + mint: Some(tx.mint), + source_hash: tx.source_hash, + is_system_transaction: tx.is_system_transaction, + }; + OpTransaction { + base: tx_env, + enveloped_tx: None, + deposit, + } + } + OpTxEnvelope::Legacy(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip2930(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip1559(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip7702(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + }; + + let ResultAndState { result, state } = match evm.transact_raw(executable_tx) { + Ok(res) => res, + Err(err) => { + if let Some(err) = err.as_invalid_tx_err() { + // TODO: what invalid txs are allowed in the block? + // reverting txs should be allowed (?) 
but not straight up invalid ones + tracing::error!(error = %err, "skipping invalid transaction in flashblock"); + continue; + } + return Err(err).wrap_err("failed to execute flashblock transaction"); + } + }; + + if let Some(max_gas_per_txn) = max_gas_per_txn { + if result.gas_used() > max_gas_per_txn { + return Err(eyre::eyre!( + "transaction exceeded max gas per txn limit in flashblock" + )); } } + + let tx_gas_used = result.gas_used(); + info.cumulative_gas_used = info + .cumulative_gas_used + .checked_add(tx_gas_used) + .ok_or_else(|| { + eyre::eyre!("total gas used overflowed when executing flashblock transactions") + })?; + if info.cumulative_gas_used > gas_limit { + bail!("flashblock exceeded gas limit when executing transactions"); + } + + let depositor_nonce = (is_regolith_active && tx.is_deposit()) + .then(|| { + evm.db_mut() + .load_cache_account(sender) + .map(|acc| acc.account_info().unwrap_or_default().nonce) + }) + .transpose() + .wrap_err("failed to get depositor nonce")?; + + let ctx = ReceiptBuilderCtx { + tx: &tx, + evm: &evm, + result, + state: &state, + cumulative_gas_used: info.cumulative_gas_used, + }; + + info.receipts.push(build_receipt( + evm_config, + ctx, + depositor_nonce, + is_canyon_active, + )); + + evm.db_mut().commit(state); + + // append sender and transaction to the respective lists + info.executed_senders.push(sender); + info.executed_transactions.push(tx.clone()); } + + Ok(()) +} + +fn build_receipt( + evm_config: &OpEvmConfig, + ctx: ReceiptBuilderCtx<'_, OpTransactionSigned, E>, + deposit_nonce: Option, + is_canyon_active: bool, +) -> OpReceipt { + use alloy_consensus::Eip658Value; + use alloy_op_evm::block::receipt_builder::OpReceiptBuilder as _; + use op_alloy_consensus::OpDepositReceipt; + use reth_evm::ConfigureEvm as _; + + let receipt_builder = evm_config.block_executor_factory().receipt_builder(); + match receipt_builder.build_receipt(ctx) { + Ok(receipt) => receipt, + Err(ctx) => { + let receipt = alloy_consensus::Receipt { + // Success flag was added in `EIP-658: Embedding transaction status code + // in receipts`. + status: Eip658Value::Eip658(ctx.result.is_success()), + cumulative_gas_used: ctx.cumulative_gas_used, + logs: ctx.result.into_logs(), + }; + + receipt_builder.build_deposit_receipt(OpDepositReceipt { + inner: receipt, + deposit_nonce, + // The deposit receipt version was introduced in Canyon to indicate an + // update to how receipt hashes should be computed + // when set. The state transition process ensures + // this is only set for post-Canyon deposit + // transactions. 
+ deposit_receipt_version: is_canyon_active.then_some(1), + }) + } + } +} + +fn is_canyon_active(chain_spec: &OpChainSpec, timestamp: u64) -> bool { + use reth_optimism_chainspec::OpHardforks as _; + chain_spec.is_canyon_active_at_timestamp(timestamp) +} + +fn is_regolith_active(chain_spec: &OpChainSpec, timestamp: u64) -> bool { + use reth_optimism_chainspec::OpHardforks as _; + chain_spec.is_regolith_active_at_timestamp(timestamp) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 4c7f5e98..25d928c4 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -12,6 +12,7 @@ use crate::{ generator::BlockPayloadJobGenerator, }, flashtestations::service::bootstrap_flashtestations, + metrics::OpRBuilderMetrics, traits::{NodeBounds, PoolBounds}, }; use eyre::WrapErr as _; @@ -21,6 +22,7 @@ use reth_node_builder::{BuilderContext, components::PayloadServiceBuilder}; use reth_optimism_evm::OpEvmConfig; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_provider::CanonStateSubscriptions; +use std::sync::Arc; pub struct FlashblocksServiceBuilder(pub BuilderConfig); @@ -36,6 +38,10 @@ impl FlashblocksServiceBuilder { Pool: PoolBounds, BuilderTx: BuilderTransactions + Unpin + Clone + Send + Sync + 'static, { + // TODO: is there a different global token? + // this is effectively unused right now due to the usage of reth's `task_executor`. + let cancel = tokio_util::sync::CancellationToken::new(); + let (incoming_message_rx, outgoing_message_tx) = if self.0.p2p_enabled { let mut builder = p2p::NodeBuilder::new(); @@ -71,6 +77,7 @@ impl FlashblocksServiceBuilder { .with_protocol(FLASHBLOCKS_STREAM_PROTOCOL) .with_known_peers(known_peers) .with_port(self.0.p2p_port) + .with_cancellation_token(cancel.clone()) .try_build::() .wrap_err("failed to build flashblocks p2p node")?; let multiaddrs = node.multiaddrs(); @@ -91,6 +98,7 @@ impl FlashblocksServiceBuilder { (incoming_message_rx, outgoing_message_tx) }; + let metrics = Arc::new(OpRBuilderMetrics::default()); let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let payload_builder = OpPayloadBuilder::new( OpEvmConfig::optimism(ctx.chain_spec()), @@ -99,6 +107,7 @@ impl FlashblocksServiceBuilder { self.0.clone(), builder_tx, built_payload_tx, + metrics.clone(), ) .wrap_err("failed to create flashblocks payload builder")?; @@ -116,11 +125,22 @@ impl FlashblocksServiceBuilder { let (payload_service, payload_builder_handle) = PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream()); + let syncer_ctx = crate::builders::flashblocks::ctx::OpPayloadSyncerCtx::new( + &ctx.provider().clone(), + self.0, + OpEvmConfig::optimism(ctx.chain_spec()), + metrics.clone(), + ) + .wrap_err("failed to create flashblocks payload builder context")?; + let payload_handler = PayloadHandler::new( built_payload_rx, incoming_message_rx, outgoing_message_tx, payload_service.payload_events_handle(), + syncer_ctx, + ctx.provider().clone(), + cancel, ); ctx.task_executor() diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index dd43d958..d5b54069 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -63,6 +63,8 @@ pub const VERSION: VersionInfo = VersionInfo { pub struct OpRBuilderMetrics { /// Block built success pub block_built_success: Counter, + /// Block synced success + pub 
block_synced_success: Counter, /// Number of flashblocks added to block (Total per block) pub flashblock_count: Histogram, /// Number of messages sent @@ -73,12 +75,16 @@ pub struct OpRBuilderMetrics { pub total_block_built_gauge: Gauge, /// Histogram of the time taken to build a Flashblock pub flashblock_build_duration: Histogram, + /// Histogram of the time taken to sync a Flashblock + pub flashblock_sync_duration: Histogram, /// Flashblock UTF8 payload byte size histogram pub flashblock_byte_size_histogram: Histogram, /// Histogram of transactions in a Flashblock pub flashblock_num_tx_histogram: Histogram, /// Number of invalid blocks - pub invalid_blocks_count: Counter, + pub invalid_built_blocks_count: Counter, + /// Number of invalid synced blocks + pub invalid_synced_blocks_count: Counter, /// Histogram of fetching transactions from the pool duration pub transaction_pool_fetch_duration: Histogram, /// Latest time taken to fetch tx from the pool diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index a61ea735..651123e9 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -1,7 +1,9 @@ use eyre::WrapErr as _; use libp2p::{ - autonat, connection_limits, connection_limits::ConnectionLimits, identify, identity, mdns, - ping, swarm::NetworkBehaviour, + Swarm, autonat, + connection_limits::{self, ConnectionLimits}, + identify, identity, mdns, ping, + swarm::NetworkBehaviour, }; use std::{convert::Infallible, time::Duration}; @@ -81,14 +83,22 @@ impl Behaviour { } impl BehaviourEvent { - pub(crate) async fn handle(self) { + pub(crate) fn handle(self, swarm: &mut Swarm) { match self { BehaviourEvent::Autonat(_event) => {} BehaviourEvent::Identify(_event) => {} BehaviourEvent::Mdns(event) => match event { mdns::Event::Discovered(list) => { for (peer_id, multiaddr) in list { - tracing::debug!("mDNS discovered peer {peer_id} at {multiaddr}"); + if swarm.is_connected(&peer_id) { + continue; + } + + tracing::info!("mDNS discovered peer {peer_id} at {multiaddr}"); + swarm.add_peer_address(peer_id, multiaddr); + swarm.dial(peer_id).unwrap_or_else(|e| { + tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}") + }); } } mdns::Event::Expired(list) => { diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 47f43232..f15f12b5 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -15,7 +15,7 @@ use libp2p::{ use std::{collections::HashMap, time::Duration}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; pub use libp2p::{Multiaddr, StreamProtocol}; @@ -144,6 +144,7 @@ impl Node { } Some(message) = outgoing_message_rx.recv() => { let protocol = message.protocol(); + debug!("received message to broadcast on protocol {protocol}"); if let Err(e) = outgoing_streams_handler.broadcast_message(message).await { warn!("failed to broadcast message on protocol {protocol}: {e:?}"); } @@ -166,28 +167,22 @@ impl Node { } => { // when a new connection is established, open outbound streams for each protocol // and add them to the outgoing streams handler. - // - // If we already have a connection with this peer, close the new connection, - // as we only want one connection per peer. 
- debug!("connection established with peer {peer_id}"); - if outgoing_streams_handler.has_peer(&peer_id) { - swarm.close_connection(connection_id); - debug!("already have connection with peer {peer_id}, closed connection {connection_id}"); - } else { + info!("connection established with peer {peer_id}"); + if !outgoing_streams_handler.has_peer(&peer_id) { for protocol in &protocols { - match swarm - .behaviour_mut() - .new_control() - .open_stream(peer_id, protocol.clone()) - .await - { - Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); - debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + match swarm + .behaviour_mut() + .new_control() + .open_stream(peer_id, protocol.clone()) + .await + { + Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); + debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + } + Err(e) => { + warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); + } } - Err(e) => { - warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); - } - } } } } @@ -196,10 +191,10 @@ impl Node { cause, .. } => { - debug!("connection closed with peer {peer_id}: {cause:?}"); + info!("connection closed with peer {peer_id}: {cause:?}"); outgoing_streams_handler.remove_peer(&peer_id); } - SwarmEvent::Behaviour(event) => event.handle().await, + SwarmEvent::Behaviour(event) => event.handle(&mut swarm), _ => continue, } }, @@ -268,6 +263,14 @@ impl NodeBuilder { self } + pub fn with_cancellation_token( + mut self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Self { + self.cancellation_token = Some(cancellation_token); + self + } + pub fn with_known_peers(mut self, addresses: I) -> Self where I: IntoIterator, diff --git a/crates/p2p/src/outgoing.rs b/crates/p2p/src/outgoing.rs index c948bde6..1f7cd31d 100644 --- a/crates/p2p/src/outgoing.rs +++ b/crates/p2p/src/outgoing.rs @@ -3,7 +3,7 @@ use eyre::Context; use futures::stream::FuturesUnordered; use libp2p::{PeerId, StreamProtocol, swarm::Stream}; use std::collections::HashMap; -use tracing::{debug, warn}; +use tracing::{info, warn}; pub(crate) struct StreamsHandler { peers_to_stream: HashMap>, @@ -63,7 +63,10 @@ impl StreamsHandler { let payload = payload.clone(); let fut = async move { let mut writer = FramedWrite::new(stream, LinesCodec::new()); - writer.send(payload).await?; + writer + .send(payload) + .await + .wrap_err("failed to send message to peer")?; Ok::<(PeerId, libp2p::swarm::Stream), eyre::ErrReport>(( peer, writer.into_inner().into_inner(), @@ -71,6 +74,7 @@ impl StreamsHandler { }; futures.push(fut); } + while let Some(result) = futures.next().await { match result { Ok((peer, stream)) => { @@ -85,7 +89,7 @@ impl StreamsHandler { } } } - debug!( + info!( "broadcasted message to {} peers", self.peers_to_stream.len() );