diff --git a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs new file mode 100644 index 00000000..c465445f --- /dev/null +++ b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs @@ -0,0 +1,81 @@ +use crate::{ + builders::{BuilderConfig, OpPayloadBuilderCtx, flashblocks::FlashblocksConfig}, + gas_limiter::{AddressGasLimiter, args::GasLimiterArgs}, + metrics::OpRBuilderMetrics, + traits::ClientBounds, +}; +use op_revm::OpSpecId; +use reth_basic_payload_builder::PayloadConfig; +use reth_evm::EvmEnv; +use reth_optimism_chainspec::OpChainSpec; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_payload_builder::{OpPayloadBuilderAttributes, config::OpDAConfig}; +use reth_optimism_primitives::OpTransactionSigned; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; + +#[derive(Debug, Clone)] +pub(super) struct OpPayloadSyncerCtx { + /// The type that knows how to perform system calls and configure the evm. + evm_config: OpEvmConfig, + /// The DA config for the payload builder + da_config: OpDAConfig, + /// The chainspec + chain_spec: Arc, + /// Max gas that can be used by a transaction. + max_gas_per_txn: Option, + /// The metrics for the builder + metrics: Arc, +} + +impl OpPayloadSyncerCtx { + pub(super) fn new( + client: &Client, + builder_config: BuilderConfig, + evm_config: OpEvmConfig, + metrics: Arc, + ) -> eyre::Result + where + Client: ClientBounds, + { + let chain_spec = client.chain_spec(); + Ok(Self { + evm_config, + da_config: builder_config.da_config.clone(), + chain_spec, + max_gas_per_txn: builder_config.max_gas_per_txn, + metrics, + }) + } + + pub(super) fn evm_config(&self) -> &OpEvmConfig { + &self.evm_config + } + + pub(super) fn max_gas_per_txn(&self) -> Option { + self.max_gas_per_txn + } + + pub(super) fn into_op_payload_builder_ctx( + self, + payload_config: PayloadConfig>, + evm_env: EvmEnv, + block_env_attributes: OpNextBlockEnvAttributes, + cancel: CancellationToken, + ) -> OpPayloadBuilderCtx { + OpPayloadBuilderCtx { + evm_config: self.evm_config, + da_config: self.da_config, + chain_spec: self.chain_spec, + config: payload_config, + evm_env, + block_env_attributes, + cancel, + builder_signer: None, + metrics: self.metrics, + extra_ctx: (), + max_gas_per_txn: self.max_gas_per_txn, + address_gas_limiter: AddressGasLimiter::new(GasLimiterArgs::default()), + } + } +} diff --git a/crates/op-rbuilder/src/builders/flashblocks/mod.rs b/crates/op-rbuilder/src/builders/flashblocks/mod.rs index 643bf39a..87d4494c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/mod.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/mod.rs @@ -6,6 +6,7 @@ use service::FlashblocksServiceBuilder; mod best_txs; mod builder_tx; mod config; +mod ctx; mod p2p; mod payload; mod payload_handler; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 9e2237bc..89f19210 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -158,8 +158,8 @@ impl OpPayloadBuilder { config: BuilderConfig, builder_tx: BuilderTx, payload_tx: mpsc::Sender, + metrics: Arc, ) -> eyre::Result { - let metrics = Arc::new(OpRBuilderMetrics::default()); let ws_pub = WebSocketPublisher::new(config.specific.ws_addr, Arc::clone(&metrics))?.into(); let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone()); Ok(Self { @@ -710,7 +710,7 @@ where match build_result { Err(err) => { - ctx.metrics.invalid_blocks_count.increment(1); + ctx.metrics.invalid_built_blocks_count.increment(1); Err(err).wrap_err("failed to build payload") } Ok((new_payload, mut fb_payload)) => { diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index e19bb19c..4927a047 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -1,12 +1,35 @@ -use crate::builders::flashblocks::p2p::Message; +use crate::{ + builders::flashblocks::{ + ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo, + }, + primitives::reth::ExecutionInfo, + traits::ClientBounds, +}; +use alloy_evm::eth::receipt_builder::ReceiptBuilderCtx; +use alloy_primitives::B64; +use eyre::{WrapErr as _, bail}; +use op_alloy_consensus::OpTxEnvelope; +use reth::revm::{State, database::StateProviderDatabase}; +use reth_basic_payload_builder::PayloadConfig; +use reth_evm::FromRecoveredTx; use reth_node_builder::Events; -use reth_optimism_node::OpEngineTypes; +use reth_optimism_chainspec::OpChainSpec; +use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; +use reth_optimism_node::{OpEngineTypes, OpPayloadBuilderAttributes}; use reth_optimism_payload_builder::OpBuiltPayload; +use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; +use reth_payload_builder::EthPayloadBuilderAttributes; +use rollup_boost::FlashblocksPayloadV1; +use std::sync::Arc; use tokio::sync::mpsc; use tracing::warn; -pub(crate) struct PayloadHandler { - // receives new payloads built by us. +/// Handles newly built or received flashblock payloads. +/// +/// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. +/// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. +pub(crate) struct PayloadHandler { + // receives new payloads built by this builder. built_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, @@ -14,20 +37,35 @@ pub(crate) struct PayloadHandler { p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. payload_events_handle: tokio::sync::broadcast::Sender>, + // context required for execution of blocks during syncing + ctx: OpPayloadSyncerCtx, + // chain client + client: Client, + cancel: tokio_util::sync::CancellationToken, } -impl PayloadHandler { +impl PayloadHandler +where + Client: ClientBounds + 'static, +{ + #[allow(clippy::too_many_arguments)] pub(crate) fn new( built_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, + ctx: OpPayloadSyncerCtx, + client: Client, + cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { built_rx, p2p_rx, p2p_tx, payload_events_handle, + ctx, + client, + cancel, } } @@ -37,9 +75,12 @@ impl PayloadHandler { mut p2p_rx, p2p_tx, payload_events_handle, + ctx, + client, + cancel, } = self; - tracing::info!("flashblocks payload handler started"); + tracing::debug!("flashblocks payload handler started"); loop { tokio::select! { @@ -54,11 +95,351 @@ impl PayloadHandler { match message { Message::OpBuiltPayload(payload) => { let payload: OpBuiltPayload = payload.into(); - let _ = payload_events_handle.send(Events::BuiltPayload(payload)); + let ctx = ctx.clone(); + let client = client.clone(); + let payload_events_handle = payload_events_handle.clone(); + let cancel = cancel.clone(); + + // execute the flashblock on a thread where blocking is acceptable, + // as it's potentially a heavy operation + tokio::task::spawn_blocking(move || { + let res = execute_flashblock( + payload, + ctx, + client, + cancel, + ); + match res { + Ok((payload, _)) => { + tracing::info!(hash = payload.block().hash().to_string(), block_number = payload.block().header().number, "successfully executed received flashblock"); + let _ = payload_events_handle.send(Events::BuiltPayload(payload)); + } + Err(e) => { + tracing::error!(error = ?e, "failed to execute received flashblock"); + } + } + }); } } } + else => break, } } } } + +fn execute_flashblock( + payload: OpBuiltPayload, + ctx: OpPayloadSyncerCtx, + client: Client, + cancel: tokio_util::sync::CancellationToken, +) -> eyre::Result<(OpBuiltPayload, FlashblocksPayloadV1)> +where + Client: ClientBounds, +{ + use alloy_consensus::BlockHeader as _; + use reth::primitives::SealedHeader; + use reth_evm::{ConfigureEvm as _, execute::BlockBuilder as _}; + + let start = tokio::time::Instant::now(); + + tracing::info!(header = ?payload.block().header(), "executing flashblock"); + + let mut cached_reads = reth::revm::cached::CachedReads::default(); + let parent_hash = payload.block().sealed_header().parent_hash; + let parent_header = client + .header_by_id(parent_hash.into()) + .wrap_err("failed to get parent header")? + .ok_or_else(|| eyre::eyre!("parent header not found"))?; + + let state_provider = client + .state_by_block_hash(parent_hash) + .wrap_err("failed to get state for parent hash")?; + let db = StateProviderDatabase::new(&state_provider); + let mut state = State::builder() + .with_database(cached_reads.as_db_mut(db)) + .with_bundle_update() + .build(); + + let chain_spec = client.chain_spec(); + let timestamp = payload.block().header().timestamp(); + let block_env_attributes = OpNextBlockEnvAttributes { + timestamp, + suggested_fee_recipient: payload.block().sealed_header().beneficiary, + prev_randao: payload.block().sealed_header().mix_hash, + gas_limit: payload.block().sealed_header().gas_limit, + parent_beacon_block_root: payload.block().sealed_header().parent_beacon_block_root, + extra_data: payload.block().sealed_header().extra_data.clone(), + }; + + let evm_env = ctx + .evm_config() + .next_evm_env(&parent_header, &block_env_attributes) + .wrap_err("failed to create next evm env")?; + + ctx.evm_config() + .builder_for_next_block( + &mut state, + &Arc::new(SealedHeader::new(parent_header.clone(), parent_hash)), + block_env_attributes.clone(), + ) + .wrap_err("failed to create evm builder for next block")? + .apply_pre_execution_changes() + .wrap_err("failed to apply pre execution changes")?; + + let mut info = ExecutionInfo::with_capacity(payload.block().body().transactions.len()); + + let extra_data = payload.block().sealed_header().extra_data.clone(); + if extra_data.len() != 9 { + tracing::error!(len = extra_data.len(), data = ?extra_data, "invalid extra data length in flashblock"); + bail!("extra data length should be 9 bytes"); + } + + // see https://specs.optimism.io/protocol/holocene/exec-engine.html#eip-1559-parameters-in-block-header + let eip_1559_parameters: B64 = extra_data[1..9].try_into().unwrap(); + let payload_config = PayloadConfig::new( + Arc::new(SealedHeader::new(parent_header.clone(), parent_hash)), + OpPayloadBuilderAttributes { + eip_1559_params: Some(eip_1559_parameters), + payload_attributes: EthPayloadBuilderAttributes { + id: payload.id(), // unused + parent: parent_hash, // unused + suggested_fee_recipient: payload.block().sealed_header().beneficiary, + withdrawals: payload + .block() + .body() + .withdrawals + .clone() + .unwrap_or_default(), + parent_beacon_block_root: payload.block().sealed_header().parent_beacon_block_root, + timestamp, + prev_randao: payload.block().sealed_header().mix_hash, + }, + ..Default::default() + }, + ); + + execute_transactions( + &mut info, + &mut state, + payload.block().body().transactions.clone(), + payload.block().header().gas_used, + ctx.evm_config(), + evm_env.clone(), + ctx.max_gas_per_txn(), + is_canyon_active(&chain_spec, timestamp), + is_regolith_active(&chain_spec, timestamp), + ) + .wrap_err("failed to execute best transactions")?; + + let builder_ctx = ctx.into_op_payload_builder_ctx( + payload_config, + evm_env.clone(), + block_env_attributes, + cancel, + ); + + let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + &mut state, + &builder_ctx, + &mut info, + true, + ) + .wrap_err("failed to build flashblock")?; + + builder_ctx + .metrics + .flashblock_sync_duration + .record(start.elapsed()); + + if built_payload.block().hash() != payload.block().hash() { + tracing::error!( + expected = %payload.block().hash(), + got = %built_payload.block().hash(), + "flashblock hash mismatch after execution" + ); + builder_ctx.metrics.invalid_synced_blocks_count.increment(1); + bail!("flashblock hash mismatch after execution"); + } + + builder_ctx.metrics.block_synced_success.increment(1); + + tracing::info!(header = ?built_payload.block().header(), "successfully executed flashblock"); + Ok((built_payload, fb_payload)) +} + +#[allow(clippy::too_many_arguments)] +fn execute_transactions( + info: &mut ExecutionInfo, + state: &mut State, + txs: Vec, + gas_limit: u64, + evm_config: &reth_optimism_evm::OpEvmConfig, + evm_env: alloy_evm::EvmEnv, + max_gas_per_txn: Option, + is_canyon_active: bool, + is_regolith_active: bool, +) -> eyre::Result<()> { + use alloy_evm::{Evm as _, EvmError as _}; + use op_revm::{OpTransaction, transaction::deposit::DepositTransactionParts}; + use reth_evm::ConfigureEvm as _; + use reth_primitives_traits::SignerRecoverable as _; + use revm::{ + DatabaseCommit as _, + context::{TxEnv, result::ResultAndState}, + }; + + let mut evm = evm_config.evm_with_env(&mut *state, evm_env); + + for tx in txs { + let sender = tx + .recover_signer() + .wrap_err("failed to recover tx signer")?; + let tx_env = TxEnv::from_recovered_tx(&tx, sender); + let executable_tx = match tx { + OpTxEnvelope::Deposit(ref tx) => { + let deposit = DepositTransactionParts { + mint: Some(tx.mint), + source_hash: tx.source_hash, + is_system_transaction: tx.is_system_transaction, + }; + OpTransaction { + base: tx_env, + enveloped_tx: None, + deposit, + } + } + OpTxEnvelope::Legacy(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip2930(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip1559(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + OpTxEnvelope::Eip7702(_) => { + let mut tx = OpTransaction::new(tx_env); + tx.enveloped_tx = Some(vec![0x00].into()); + tx + } + }; + + let ResultAndState { result, state } = match evm.transact_raw(executable_tx) { + Ok(res) => res, + Err(err) => { + if let Some(err) = err.as_invalid_tx_err() { + // TODO: what invalid txs are allowed in the block? + // reverting txs should be allowed (?) but not straight up invalid ones + tracing::error!(error = %err, "skipping invalid transaction in flashblock"); + continue; + } + return Err(err).wrap_err("failed to execute flashblock transaction"); + } + }; + + if let Some(max_gas_per_txn) = max_gas_per_txn { + if result.gas_used() > max_gas_per_txn { + return Err(eyre::eyre!( + "transaction exceeded max gas per txn limit in flashblock" + )); + } + } + + let tx_gas_used = result.gas_used(); + info.cumulative_gas_used = info + .cumulative_gas_used + .checked_add(tx_gas_used) + .ok_or_else(|| { + eyre::eyre!("total gas used overflowed when executing flashblock transactions") + })?; + if info.cumulative_gas_used > gas_limit { + bail!("flashblock exceeded gas limit when executing transactions"); + } + + let depositor_nonce = (is_regolith_active && tx.is_deposit()) + .then(|| { + evm.db_mut() + .load_cache_account(sender) + .map(|acc| acc.account_info().unwrap_or_default().nonce) + }) + .transpose() + .wrap_err("failed to get depositor nonce")?; + + let ctx = ReceiptBuilderCtx { + tx: &tx, + evm: &evm, + result, + state: &state, + cumulative_gas_used: info.cumulative_gas_used, + }; + + info.receipts.push(build_receipt( + evm_config, + ctx, + depositor_nonce, + is_canyon_active, + )); + + evm.db_mut().commit(state); + + // append sender and transaction to the respective lists + info.executed_senders.push(sender); + info.executed_transactions.push(tx.clone()); + } + + Ok(()) +} + +fn build_receipt( + evm_config: &OpEvmConfig, + ctx: ReceiptBuilderCtx<'_, OpTransactionSigned, E>, + deposit_nonce: Option, + is_canyon_active: bool, +) -> OpReceipt { + use alloy_consensus::Eip658Value; + use alloy_op_evm::block::receipt_builder::OpReceiptBuilder as _; + use op_alloy_consensus::OpDepositReceipt; + use reth_evm::ConfigureEvm as _; + + let receipt_builder = evm_config.block_executor_factory().receipt_builder(); + match receipt_builder.build_receipt(ctx) { + Ok(receipt) => receipt, + Err(ctx) => { + let receipt = alloy_consensus::Receipt { + // Success flag was added in `EIP-658: Embedding transaction status code + // in receipts`. + status: Eip658Value::Eip658(ctx.result.is_success()), + cumulative_gas_used: ctx.cumulative_gas_used, + logs: ctx.result.into_logs(), + }; + + receipt_builder.build_deposit_receipt(OpDepositReceipt { + inner: receipt, + deposit_nonce, + // The deposit receipt version was introduced in Canyon to indicate an + // update to how receipt hashes should be computed + // when set. The state transition process ensures + // this is only set for post-Canyon deposit + // transactions. + deposit_receipt_version: is_canyon_active.then_some(1), + }) + } + } +} + +fn is_canyon_active(chain_spec: &OpChainSpec, timestamp: u64) -> bool { + use reth_optimism_chainspec::OpHardforks as _; + chain_spec.is_canyon_active_at_timestamp(timestamp) +} + +fn is_regolith_active(chain_spec: &OpChainSpec, timestamp: u64) -> bool { + use reth_optimism_chainspec::OpHardforks as _; + chain_spec.is_regolith_active_at_timestamp(timestamp) +} diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 68d4b036..e11fa2f2 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -12,6 +12,7 @@ use crate::{ generator::BlockPayloadJobGenerator, }, flashtestations::service::bootstrap_flashtestations, + metrics::OpRBuilderMetrics, traits::{NodeBounds, PoolBounds}, }; use eyre::WrapErr as _; @@ -21,6 +22,7 @@ use reth_node_builder::{BuilderContext, components::PayloadServiceBuilder}; use reth_optimism_evm::OpEvmConfig; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_provider::CanonStateSubscriptions; +use std::sync::Arc; pub struct FlashblocksServiceBuilder(pub BuilderConfig); @@ -41,6 +43,10 @@ impl FlashblocksServiceBuilder { + Sync + 'static, { + // TODO: is there a different global token? + // this is effectively unused right now due to the usage of reth's `task_executor`. + let cancel = tokio_util::sync::CancellationToken::new(); + let (incoming_message_rx, outgoing_message_tx) = if self.0.specific.p2p_enabled { let mut builder = p2p::NodeBuilder::new(); @@ -76,6 +82,7 @@ impl FlashblocksServiceBuilder { .with_protocol(FLASHBLOCKS_STREAM_PROTOCOL) .with_known_peers(known_peers) .with_port(self.0.specific.p2p_port) + .with_cancellation_token(cancel.clone()) .with_max_peer_count(self.0.specific.p2p_max_peer_count) .try_build::() .wrap_err("failed to build flashblocks p2p node")?; @@ -97,6 +104,7 @@ impl FlashblocksServiceBuilder { (incoming_message_rx, outgoing_message_tx) }; + let metrics = Arc::new(OpRBuilderMetrics::default()); let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let payload_builder = OpPayloadBuilder::new( OpEvmConfig::optimism(ctx.chain_spec()), @@ -105,9 +113,9 @@ impl FlashblocksServiceBuilder { self.0.clone(), builder_tx, built_payload_tx, + metrics.clone(), ) .wrap_err("failed to create flashblocks payload builder")?; - let payload_job_config = BasicPayloadJobGeneratorConfig::default(); let payload_generator = BlockPayloadJobGenerator::with_builder( @@ -122,11 +130,22 @@ impl FlashblocksServiceBuilder { let (payload_service, payload_builder_handle) = PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream()); + let syncer_ctx = crate::builders::flashblocks::ctx::OpPayloadSyncerCtx::new( + &ctx.provider().clone(), + self.0, + OpEvmConfig::optimism(ctx.chain_spec()), + metrics.clone(), + ) + .wrap_err("failed to create flashblocks payload builder context")?; + let payload_handler = PayloadHandler::new( built_payload_rx, incoming_message_rx, outgoing_message_tx, payload_service.payload_events_handle(), + syncer_ctx, + ctx.provider().clone(), + cancel, ); ctx.task_executor() diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 31fe081c..c268c43c 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -63,6 +63,8 @@ pub const VERSION: VersionInfo = VersionInfo { pub struct OpRBuilderMetrics { /// Block built success pub block_built_success: Counter, + /// Block synced success + pub block_synced_success: Counter, /// Number of flashblocks added to block (Total per block) pub flashblock_count: Histogram, /// Number of messages sent @@ -73,12 +75,16 @@ pub struct OpRBuilderMetrics { pub total_block_built_gauge: Gauge, /// Histogram of the time taken to build a Flashblock pub flashblock_build_duration: Histogram, + /// Histogram of the time taken to sync a Flashblock + pub flashblock_sync_duration: Histogram, /// Flashblock UTF8 payload byte size histogram pub flashblock_byte_size_histogram: Histogram, /// Histogram of transactions in a Flashblock pub flashblock_num_tx_histogram: Histogram, /// Number of invalid blocks - pub invalid_blocks_count: Counter, + pub invalid_built_blocks_count: Counter, + /// Number of invalid synced blocks + pub invalid_synced_blocks_count: Counter, /// Histogram of fetching transactions from the pool duration pub transaction_pool_fetch_duration: Histogram, /// Latest time taken to fetch tx from the pool diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 140d309b..bedfb78e 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -1,7 +1,9 @@ use eyre::WrapErr as _; use libp2p::{ - autonat, connection_limits, connection_limits::ConnectionLimits, identify, identity, mdns, - ping, swarm::NetworkBehaviour, + Swarm, autonat, + connection_limits::{self, ConnectionLimits}, + identify, identity, mdns, ping, + swarm::NetworkBehaviour, }; use std::{convert::Infallible, time::Duration}; @@ -84,14 +86,22 @@ impl Behaviour { } impl BehaviourEvent { - pub(crate) fn handle(self) { + pub(crate) fn handle(self, swarm: &mut Swarm) { match self { BehaviourEvent::Autonat(_event) => {} BehaviourEvent::Identify(_event) => {} BehaviourEvent::Mdns(event) => match event { mdns::Event::Discovered(list) => { for (peer_id, multiaddr) in list { + if swarm.is_connected(&peer_id) { + continue; + } + tracing::debug!("mDNS discovered peer {peer_id} at {multiaddr}"); + swarm.add_peer_address(peer_id, multiaddr); + swarm.dial(peer_id).unwrap_or_else(|e| { + tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}") + }); } } mdns::Event::Expired(list) => { diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index fba12aa1..757be3b2 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -147,6 +147,7 @@ impl Node { } Some(message) = outgoing_message_rx.recv() => { let protocol = message.protocol(); + debug!("received message to broadcast on protocol {protocol}"); if let Err(e) = outgoing_streams_handler.broadcast_message(message).await { warn!("failed to broadcast message on protocol {protocol}: {e:?}"); } @@ -169,28 +170,22 @@ impl Node { } => { // when a new connection is established, open outbound streams for each protocol // and add them to the outgoing streams handler. - // - // If we already have a connection with this peer, close the new connection, - // as we only want one connection per peer. debug!("connection established with peer {peer_id}"); - if outgoing_streams_handler.has_peer(&peer_id) { - swarm.close_connection(connection_id); - debug!("already have connection with peer {peer_id}, closed connection {connection_id}"); - } else { + if !outgoing_streams_handler.has_peer(&peer_id) { for protocol in &protocols { - match swarm - .behaviour_mut() - .new_control() - .open_stream(peer_id, protocol.clone()) - .await - { - Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); - debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + match swarm + .behaviour_mut() + .new_control() + .open_stream(peer_id, protocol.clone()) + .await + { + Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); + debug!("opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + } + Err(e) => { + warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); + } } - Err(e) => { - warn!("failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); - } - } } } } @@ -202,7 +197,7 @@ impl Node { debug!("connection closed with peer {peer_id}: {cause:?}"); outgoing_streams_handler.remove_peer(&peer_id); } - SwarmEvent::Behaviour(event) => event.handle(), + SwarmEvent::Behaviour(event) => event.handle(&mut swarm), _ => continue, } }, @@ -273,6 +268,14 @@ impl NodeBuilder { self } + pub fn with_cancellation_token( + mut self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Self { + self.cancellation_token = Some(cancellation_token); + self + } + pub fn with_max_peer_count(mut self, max_peer_count: u32) -> Self { self.max_peer_count = Some(max_peer_count); self diff --git a/crates/p2p/src/outgoing.rs b/crates/p2p/src/outgoing.rs index c948bde6..2440e0f7 100644 --- a/crates/p2p/src/outgoing.rs +++ b/crates/p2p/src/outgoing.rs @@ -63,7 +63,10 @@ impl StreamsHandler { let payload = payload.clone(); let fut = async move { let mut writer = FramedWrite::new(stream, LinesCodec::new()); - writer.send(payload).await?; + writer + .send(payload) + .await + .wrap_err("failed to send message to peer")?; Ok::<(PeerId, libp2p::swarm::Stream), eyre::ErrReport>(( peer, writer.into_inner().into_inner(), @@ -71,6 +74,7 @@ impl StreamsHandler { }; futures.push(fut); } + while let Some(result) = futures.next().await { match result { Ok((peer, stream)) => { @@ -85,6 +89,7 @@ impl StreamsHandler { } } } + debug!( "broadcasted message to {} peers", self.peers_to_stream.len()