From e25e2af3d33aa8efb3db54cbd70df521fa2fc658 Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Thu, 25 Sep 2025 10:41:12 -0700 Subject: [PATCH 1/2] [Cherrypick] Commit Handler rewrite (#23497) - I highly recommend checking out the branch, opening your IDE, and starting at `handle_consensus_commit_v2` and stepping through the whole path line by line. - When you see a comment that looks like `// DONE(commit-handler-rewrite): <note>`, you can search for that same `<note>` text and find the corresponding `TODO` line in the original handler. You can then check that the new code is doing the same thing as the old code. - Anything ending in `_v2` is usually just the original version, changed to handle `VerifiedExecutableTransaction` instead of `VerifiedSequencedConsensusTransaction`. - We currently only run the new handler on the first 2 validators in the committee. This makes it easy to test for forks. Before merging, I will switch to using a protocol flag so that all validators switch atomically. (I claim that the new code is compatible, but there's no harm in being safe.) - In order to have both code paths present at the same time, there is a fair amount of duplicated code in `_v2` methods. The original versions of this code will be removed after this has been rolled out. - [x] simtest, with only some validators running the new handler. This found a lot of bugs. - [ ] antithesis, with and without split-version / upgrade tests - [ ] I will attempt to replay mainnet/testnet commits to further verify compatibility --- crates/sui-benchmark/tests/simtest.rs | 7 + crates/sui-bridge/src/e2e_tests/basic.rs | 21 - crates/sui-bridge/src/e2e_tests/test_utils.rs | 5 +- .../authority/authority_per_epoch_store.rs | 301 ++- .../src/authority/consensus_quarantine.rs | 125 +- .../authority/epoch_start_configuration.rs | 29 +- .../shared_object_congestion_tracker.rs | 7 +- crates/sui-core/src/consensus_adapter.rs | 6 +- crates/sui-core/src/consensus_handler.rs | 2088 ++++++++++++++++- crates/sui-core/src/epoch/reconfiguration.rs | 10 +- .../src/execution_cache/cache_types.rs | 15 +- .../sui-core/src/post_consensus_tx_reorder.rs | 20 + .../tests/coin_registry_tests.rs | 13 + .../sui-e2e-tests/tests/randomness_tests.rs | 54 - .../tests/reconfiguration_tests.rs | 13 + crates/sui-e2e-tests/tests/zklogin_tests.rs | 71 - crates/sui-open-rpc/spec/openrpc.json | 1 + crates/sui-protocol-config/src/lib.rs | 44 + crates/sui-types/src/messages_consensus.rs | 33 +- crates/sui-types/src/transaction.rs | 4 +- 20 files changed, 2493 insertions(+), 374 deletions(-) delete mode 100644 crates/sui-e2e-tests/tests/randomness_tests.rs diff --git a/crates/sui-benchmark/tests/simtest.rs b/crates/sui-benchmark/tests/simtest.rs index 5c9fed418c922..1ec359f88c5bc 100644 --- a/crates/sui-benchmark/tests/simtest.rs +++ b/crates/sui-benchmark/tests/simtest.rs @@ -850,6 +850,13 @@ mod test { if version.as_u64() <= 87 { config.set_record_time_estimate_processed_for_testing(true); } + config.set_ignore_execution_time_observations_after_certs_closed_for_testing(true); + config.set_record_time_estimate_processed_for_testing(true); + config.set_prepend_prologue_tx_in_consensus_commit_in_checkpoints_for_testing(true); + config.set_consensus_checkpoint_signature_key_includes_digest_for_testing(true); + config.set_cancel_for_failed_dkg_early_for_testing(true); + config.set_use_mfp_txns_in_load_initial_object_debts_for_testing(true); + config.set_authority_capabilities_v2_for_testing(true); config }); diff --git 
a/crates/sui-bridge/src/e2e_tests/basic.rs b/crates/sui-bridge/src/e2e_tests/basic.rs index 6c08aaa00375f..7dc594bf511d2 100644 --- a/crates/sui-bridge/src/e2e_tests/basic.rs +++ b/crates/sui-bridge/src/e2e_tests/basic.rs @@ -16,13 +16,10 @@ use crate::events::{ use crate::sui_transaction_builder::build_add_tokens_on_sui_transaction; use crate::types::{AddTokensOnEvmAction, BridgeAction}; use crate::utils::publish_and_register_coins_return_add_coins_on_sui_action; -use crate::BRIDGE_ENABLE_PROTOCOL_VERSION; use ethers::prelude::*; use ethers::types::Address as EthAddress; use std::collections::HashSet; -use sui_json_rpc_api::BridgeReadApiClient; use sui_types::crypto::get_key_pair; -use test_cluster::TestClusterBuilder; use std::path::Path; @@ -354,21 +351,3 @@ async fn test_committee_registration() { .trigger_reconfiguration_if_not_yet_and_assert_bridge_committee_initialized() .await; } - -#[tokio::test] -async fn test_bridge_api_compatibility() { - let test_cluster: test_cluster::TestCluster = TestClusterBuilder::new() - .with_protocol_version(BRIDGE_ENABLE_PROTOCOL_VERSION.into()) - .build() - .await; - - test_cluster.trigger_reconfiguration().await; - let client = test_cluster.rpc_client(); - client.get_latest_bridge().await.unwrap(); - // TODO: assert fields in summary - - client - .get_bridge_object_initial_shared_version() - .await - .unwrap(); -} diff --git a/crates/sui-bridge/src/e2e_tests/test_utils.rs b/crates/sui-bridge/src/e2e_tests/test_utils.rs index 115ba91b6d1a3..c133819dc2ac4 100644 --- a/crates/sui-bridge/src/e2e_tests/test_utils.rs +++ b/crates/sui-bridge/src/e2e_tests/test_utils.rs @@ -55,7 +55,7 @@ use sui_types::bridge::BridgeSummary; use sui_types::bridge::BridgeTrait; use sui_types::bridge::{get_bridge, BRIDGE_MODULE_NAME}; use sui_types::bridge::{TOKEN_ID_BTC, TOKEN_ID_ETH, TOKEN_ID_USDC, TOKEN_ID_USDT}; -use sui_types::committee::TOTAL_VOTING_POWER; +use sui_types::committee::{ProtocolVersion, TOTAL_VOTING_POWER}; use sui_types::crypto::get_key_pair; use sui_types::crypto::ToFromBytes; use sui_types::digests::TransactionDigest; @@ -72,7 +72,6 @@ use tracing::info; use crate::config::{BridgeNodeConfig, EthConfig, SuiConfig}; use crate::node::run_bridge_node; use crate::sui_client::SuiBridgeClient; -use crate::BRIDGE_ENABLE_PROTOCOL_VERSION; use anyhow::anyhow; use ethers::prelude::*; use move_core_types::ident_str; @@ -883,7 +882,7 @@ pub struct TestClusterWrapperBuilder { impl TestClusterWrapperBuilder { pub fn new() -> Self { Self { - protocol_version: BRIDGE_ENABLE_PROTOCOL_VERSION, + protocol_version: ProtocolVersion::MAX.as_u64(), bridge_authority_keys: vec![], deploy_tokens: false, } diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index c493c7a331b9c..8165f868ca358 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -27,7 +27,6 @@ use mysten_metrics::monitored_scope; use nonempty::NonEmpty; use parking_lot::RwLock; use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard}; -use prometheus::IntCounter; use serde::{Deserialize, Serialize}; use sui_config::node::ExpensiveSafetyCheckConfig; use sui_execution::{self, Executor}; @@ -50,7 +49,9 @@ use sui_types::digests::{ChainIdentifier, TransactionEffectsDigest}; use sui_types::dynamic_field::get_dynamic_field_from_store; use sui_types::effects::{TransactionEffects, TransactionEffectsAPI}; use sui_types::error::{SuiError, SuiResult}; -use 
sui_types::executable_transaction::VerifiedExecutableTransaction; +use sui_types::executable_transaction::{ + TrustedExecutableTransaction, VerifiedExecutableTransaction, +}; use sui_types::execution::{ExecutionTimeObservationKey, ExecutionTiming}; use sui_types::global_state_hash::GlobalStateHash; use sui_types::message_envelope::TrustedEnvelope; @@ -335,9 +336,9 @@ pub struct AuthorityPerEpochStore { /// Holds the outputs of both consensus handler and checkpoint builder in memory /// until they are proven not to have forked by a certified checkpoint. - consensus_quarantine: RwLock, + pub(crate) consensus_quarantine: RwLock, /// Holds various data from consensus_quarantine in a more easily accessible form. - consensus_output_cache: ConsensusOutputCache, + pub(crate) consensus_output_cache: ConsensusOutputCache, protocol_config: ProtocolConfig, @@ -376,7 +377,7 @@ pub struct AuthorityPerEpochStore { /// wait for in-memory tasks for the epoch to finish. If node crashes at this stage validator /// will start with the new epoch(and will open instance of per-epoch store for a new epoch). epoch_alive: tokio::sync::RwLock, - end_of_publish: Mutex>, + pub(crate) end_of_publish: Mutex>, /// Pending certificates that are waiting to be sequenced by the consensus. /// This is an in-memory 'index' of a AuthorityPerEpochTables::pending_consensus_transactions. /// We need to keep track of those in order to know when to send EndOfPublish message. @@ -412,15 +413,15 @@ pub struct AuthorityPerEpochStore { jwk_aggregator: Mutex, /// State machine managing randomness DKG and generation. - randomness_manager: OnceCell>, + pub(crate) randomness_manager: OnceCell>, randomness_reporter: OnceCell, /// Manages recording execution time observations and generating estimates. - execution_time_estimator: tokio::sync::Mutex>, + pub(crate) execution_time_estimator: tokio::sync::Mutex>, tx_local_execution_time: OnceCell>, - tx_object_debts: OnceCell>>, + pub(crate) tx_object_debts: OnceCell>>, // Saved at end of epoch for propagating observations to the next. - end_of_epoch_execution_time_observations: OnceCell, + pub(crate) end_of_epoch_execution_time_observations: OnceCell, pub(crate) consensus_tx_status_cache: Option, @@ -548,8 +549,12 @@ pub struct AuthorityEpochTables { active_jwks: DBMap<(u64, (JwkId, JWK)), ()>, /// Transactions that are being deferred until some future time + /// TODO(consensus-handler-rewrite): remove this table once we no longer need to support the old consensus handler deferred_transactions: DBMap>, + /// Transactions that are being deferred until some future time + deferred_transactions_v2: DBMap>, + // Tables for recording state for RandomnessManager. /// Records messages processed from other nodes. Updated when receiving a new dkg::Message /// via consensus. @@ -954,6 +959,16 @@ impl AuthorityEpochTables { .safe_iter() .collect::>()?) } + + fn get_all_deferred_transactions_v2( + &self, + ) -> SuiResult>> { + Ok(self + .deferred_transactions_v2 + .safe_iter() + .map(|item| item.map(|(key, txs)| (key, txs.into_iter().map(Into::into).collect()))) + .collect::>()?) 
+ } } pub(crate) const MUTEX_TABLE_SIZE: usize = 1024; @@ -2246,14 +2261,6 @@ impl AuthorityPerEpochStore { .assigned_versions) } - fn load_deferred_transactions_for_randomness( - &self, - output: &mut ConsensusCommitOutput, - ) -> SuiResult)>> { - let (min, max) = DeferralKey::full_range_for_randomness(); - self.load_deferred_transactions(output, min, max) - } - fn load_and_process_deferred_transactions_for_randomness( &self, output: &mut ConsensusCommitOutput, @@ -2282,7 +2289,15 @@ impl AuthorityPerEpochStore { Ok(()) } - fn load_deferred_transactions_for_up_to_consensus_round( + pub(crate) fn load_deferred_transactions_for_randomness( + &self, + output: &mut ConsensusCommitOutput, + ) -> SuiResult)>> { + let (min, max) = DeferralKey::full_range_for_randomness(); + self.load_deferred_transactions(output, min, max) + } + + pub(crate) fn load_deferred_transactions_for_up_to_consensus_round( + &self, + output: &mut ConsensusCommitOutput, + consensus_round: u64, @@ -2336,6 +2351,68 @@ impl AuthorityPerEpochStore { Ok(txns) } + pub(crate) fn load_deferred_transactions_for_randomness_v2( + &self, + output: &mut ConsensusCommitOutput, + ) -> SuiResult)>> { + let (min, max) = DeferralKey::full_range_for_randomness(); + self.load_deferred_transactions_v2(output, min, max) + } + + pub(crate) fn load_deferred_transactions_for_up_to_consensus_round_v2( + &self, + output: &mut ConsensusCommitOutput, + consensus_round: u64, + ) -> SuiResult)>> { + let (min, max) = DeferralKey::range_for_up_to_consensus_round(consensus_round); + self.load_deferred_transactions_v2(output, min, max) + } + + // factoring of the above + fn load_deferred_transactions_v2( + &self, + output: &mut ConsensusCommitOutput, + min: DeferralKey, + max: DeferralKey, + ) -> SuiResult)>> { + debug!("Query epoch store to load deferred txn {:?} {:?}", min, max); + + let (keys, txns) = { + let mut keys = Vec::new(); + let mut txns = Vec::new(); + + let deferred_transactions = self.consensus_output_cache.deferred_transactions_v2.lock(); + + for (key, transactions) in deferred_transactions.range(min..max) { + debug!( + "Loaded {:?} deferred txn with deferral key {:?}", + transactions.len(), + key + ); + keys.push(*key); + txns.push((*key, transactions.clone())); + } + + (keys, txns) + }; + + // verify that there are no duplicates - should be impossible due to + // is_consensus_message_processed + #[cfg(debug_assertions)] + { + let mut seen = HashSet::new(); + for deferred_txn_batch in &txns { + for txn in &deferred_txn_batch.1 { + assert!(seen.insert(txn.digest())); + } + } + } + + output.delete_loaded_deferred_transactions(&keys); + + Ok(txns) + } + pub fn get_all_deferred_transactions_for_test( &self, ) -> Vec<(DeferralKey, Vec)> { @@ -2347,7 +2424,7 @@ impl AuthorityPerEpochStore { .collect() } - fn should_defer( + pub(crate) fn should_defer( &self, tx_cost: Option, cert: &VerifiedExecutableTransaction, @@ -2364,6 +2441,9 @@ impl AuthorityPerEpochStore { && self.randomness_state_enabled() && cert.transaction_data().uses_randomness() { + // TODO(commit-handler-rewrite): propagate original deferred_from_round when re-deferring + // DONE(commit-handler-rewrite): propagate original deferred_from_round when re-deferring + // (TODO and DONE are in same place because this code is shared between old and new commit handler) let deferred_from_round = previously_deferred_tx_digests .get(cert.digest()) .map(|previous_key| previous_key.deferred_from_round()) @@ -2420,7 +2500,7 @@ impl AuthorityPerEpochStore { lock: Option<&RwLockReadGuard>, ) -> SuiResult 
{ let key_value_pairs = transactions.iter().filter_map(|tx| { - if tx.is_user_transaction() { + if tx.is_mfp_transaction() { // UserTransaction does not need to be resubmitted on recovery. None } else { @@ -2495,6 +2575,13 @@ impl AuthorityPerEpochStore { .is_empty() } + pub fn deferred_transactions_empty_v2(&self) -> bool { + self.consensus_output_cache + .deferred_transactions_v2 + .lock() + .is_empty() + } + /// Check whether any certificates were processed by consensus. /// This handles multiple certificates at once. pub fn is_any_tx_certs_consensus_message_processed<'a>( @@ -2600,12 +2687,11 @@ impl AuthorityPerEpochStore { Ok(()) } - pub fn has_received_end_of_publish_from(&self, authority: &AuthorityName) -> SuiResult { - Ok(self - .end_of_publish + pub fn has_received_end_of_publish_from(&self, authority: &AuthorityName) -> bool { + self.end_of_publish .try_lock() .expect("No contention on end_of_publish lock") - .contains_key(authority)) + .contains_key(authority) } // Converts transaction keys to digests, waiting for digests to become available for any @@ -2811,15 +2897,16 @@ impl AuthorityPerEpochStore { .collect::, _>>()?) } - fn record_jwk_vote( + pub(crate) fn record_jwk_vote( + &self, + output: &mut ConsensusCommitOutput, + round: u64, + authority: AuthorityName, + id: &JwkId, + jwk: &JWK, - ) -> SuiResult { + ) { info!( + ?round, "received jwk vote from {:?} for jwk ({:?}, {:?})", authority.concise(), id, @@ -2831,7 +2918,7 @@ impl AuthorityPerEpochStore { "ignoring vote because authenticator state object does not exist yet (it will be created at the end of this epoch)" ); - return Ok(()); + return; } let mut jwk_aggregator = self.jwk_aggregator.lock(); @@ -2846,7 +2933,7 @@ impl AuthorityPerEpochStore { "validator {:?} has already voted {} times this epoch, ignoring vote", authority, votes, ); - return Ok(()); + return; } output.insert_pending_jwk(authority, id.clone(), jwk.clone()); @@ -2859,11 +2946,10 @@ impl AuthorityPerEpochStore { info!(epoch = ?self.epoch(), ?round, jwk = ?key, "jwk became active"); output.insert_active_jwk(round, key); } - - Ok(()) } pub(crate) fn get_new_jwks(&self, round: u64) -> SuiResult> { + info!("Getting new jwks for round {:?}", round); self.consensus_quarantine.read().get_new_jwks(self, round) } @@ -2918,7 +3004,10 @@ impl AuthorityPerEpochStore { .expect("push_consensus_output should not fail"); } - fn process_user_signatures<'a>(&self, certificates: impl Iterator) { + pub(crate) fn process_user_signatures<'a>( + &self, + certificates: impl Iterator, + ) { let sigs: Vec<_> = certificates .filter_map(|s| match s { Schedulable::Transaction(certificate) => { @@ -2950,7 +3039,7 @@ impl AuthorityPerEpochStore { self.reconfig_state_mem.read() } - pub fn get_reconfig_state_write_lock_guard(&self) -> RwLockWriteGuard { + pub(crate) fn get_reconfig_state_write_lock_guard(&self) -> RwLockWriteGuard { self.reconfig_state_mem.write() } @@ -3026,24 +3115,12 @@ impl AuthorityPerEpochStore { /// Important: This function can potentially be called in parallel and you can not rely on order of transactions to perform verification /// If this function returns an error, the transaction is skipped and is not passed to handle_consensus_transaction /// This function returns unit error and is responsible for emitting log messages for internal errors - fn verify_consensus_transaction( + pub(crate) fn verify_consensus_transaction( &self, transaction: SequencedConsensusTransaction, - skipped_consensus_txns: &IntCounter, ) -> Option { let _scope = 
monitored_scope("VerifyConsensusTransaction"); - if self - .is_consensus_message_processed(&transaction.transaction.key()) - .expect("Storage error") - { - trace!( - consensus_index=?transaction.consensus_index.transaction_index, - tracking_id=?transaction.transaction.get_tracking_id(), - "handle_consensus_transaction UserTransaction [skip]", - ); - skipped_consensus_txns.inc(); - return None; - } + // Signatures are verified as part of the consensus payload verification in SuiTxValidator match &transaction.transaction { SequencedConsensusTransactionKind::External(ConsensusTransaction { @@ -3192,14 +3269,24 @@ impl AuthorityPerEpochStore { mut indirect_state_observer: IndirectStateObserver, authority_metrics: &Arc, ) -> SuiResult<(Vec, AssignedTxAndVersions)> { - // Split transactions into different types for processing. + // TODO(commit-handler-rewrite): Split transactions into different types for processing. let verified_transactions: Vec<_> = transactions .into_iter() .filter_map(|transaction| { - self.verify_consensus_transaction( - transaction, - &authority_metrics.skipped_consensus_txns, - ) + if self + .is_consensus_message_processed(&transaction.transaction.key()) + .expect("Storage error") + { + trace!( + consensus_index=?transaction.consensus_index.transaction_index, + tracking_id=?transaction.transaction.get_tracking_id(), + "handle_consensus_transaction UserTransaction [skip]", + ); + authority_metrics.skipped_consensus_txns.inc(); + None + } else { + self.verify_consensus_transaction(transaction) + } }) .collect(); let mut system_transactions = Vec::with_capacity(verified_transactions.len()); @@ -3229,7 +3316,7 @@ impl AuthorityPerEpochStore { let mut output = ConsensusCommitOutput::new(consensus_commit_info.round); - // Load transactions deferred from previous commits. + // TODO(commit-handler-rewrite): Load transactions deferred from previous commits, compute the digest set of all such transactions. let deferred_txs: Vec<(DeferralKey, Vec)> = self .load_deferred_transactions_for_up_to_consensus_round( &mut output, @@ -3265,6 +3352,7 @@ impl AuthorityPerEpochStore { + previously_deferred_tx_digests.len(), ); + // TODO(commit-handler-rewrite): load randomness manager, get random round let mut randomness_manager = self.randomness_manager.get().map(|rm| { rm.try_lock() .expect("should only ever be called from the commit handler thread") @@ -3281,6 +3369,7 @@ impl AuthorityPerEpochStore { None } DkgStatus::Successful => { + // TODO(commit-handler-rewrite): do not reserve randomness if !should_accept_tx() // Generate randomness for this commit if DKG is successful and we are still // accepting certs. if self @@ -3301,6 +3390,7 @@ impl AuthorityPerEpochStore { None }; + // TODO(commit-handler-rewrite): load all deferred randomness-using txns // We should load any previously-deferred randomness-using tx: // - if DKG is failed, so we can ignore them // - if randomness is being generated, so we can process them @@ -3312,6 +3402,7 @@ impl AuthorityPerEpochStore { )?; } + // TODO(commit-handler-rewrite): deferred transactions have higher priority than new txns (if gas price is equal) // Add ConsensusRound deferred tx back into the sequence. 
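+        // Illustration of the priority guarantee (helper names assumed, not from this
+        // patch): because the gas-price reorder below uses a stable sort, prepending
+        // the deferred txns is enough to keep them ahead of equally-priced new txns:
+        //
+        //     let mut queue: Vec<_> = deferred.into_iter().chain(new_txns).collect();
+        //     queue.sort_by_key(|tx| std::cmp::Reverse(tx.gas_price())); // stable sort
+        //
+        // so a re-deferred transaction cannot starve behind a same-priced newcomer.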
for tx in deferred_txs .into_iter() @@ -3329,6 +3420,8 @@ impl AuthorityPerEpochStore { sequenced_transactions.extend(current_commit_sequenced_consensus_transactions); sequenced_randomness_transactions.extend(current_commit_sequenced_randomness_transactions); + // TODO(commit-handler-rewrite): compute checkpoint roots (this should be done at the end) + // Save roots for checkpoint generation. One set for most tx, one for randomness tx. let mut roots: BTreeSet<_> = system_transactions .iter() @@ -3354,6 +3447,7 @@ impl AuthorityPerEpochStore { }) .collect(); + // TODO(commit-handler-rewrite): reorder transactions by gas price PostConsensusTxReorder::reorder( &mut sequenced_transactions, self.protocol_config.consensus_transaction_ordering(), @@ -3363,7 +3457,7 @@ impl AuthorityPerEpochStore { self.protocol_config.consensus_transaction_ordering(), ); - // Process new execution time observations for use by congestion control. + // TODO(commit-handler-rewrite): [ssm] Process new execution time observations for use by congestion control. let mut execution_time_estimator = self .execution_time_estimator .try_lock() @@ -3371,6 +3465,7 @@ impl AuthorityPerEpochStore { // It is ok to just release lock here as functions called by this one are the // only place that transition reconfig state, and this function itself is always // executed from consensus task. + // TODO(commit-handler-rewrite): [ssm] ignore execution time observations if !should_accept_consensus_certs() if !self .protocol_config() .ignore_execution_time_observations_after_certs_closed() @@ -3404,6 +3499,7 @@ impl AuthorityPerEpochStore { } } + // TODO(commit-handler-rewrite): initialize congestion trackers // We track transaction execution cost separately for regular transactions and transactions using randomness, since // they will be in different PendingCheckpoints. let shared_object_congestion_tracker = SharedObjectCongestionTracker::from_protocol_config( @@ -3415,7 +3511,7 @@ impl AuthorityPerEpochStore { )?, self.protocol_config(), false, - )?; + ); let shared_object_using_randomness_congestion_tracker = SharedObjectCongestionTracker::from_protocol_config( self.consensus_quarantine.read().load_initial_object_debts( @@ -3426,7 +3522,7 @@ impl AuthorityPerEpochStore { )?, self.protocol_config(), true, - )?; + ); let sequenced_non_randomness_transactions: Vec<_> = system_transactions .into_iter() @@ -3459,6 +3555,7 @@ impl AuthorityPerEpochStore { ) .await?; + // TODO(commit-handler-rewrite): create accumulator settlement transactions if self.accumulators_enabled() { // We insert settlement transactions to the end of the certificate queues. // This is important for shared object version assignment to work correctly. @@ -3481,6 +3578,7 @@ impl AuthorityPerEpochStore { } } + // TODO(commit-handler-rewrite): add the consensus commit prologue transaction and root // Add the consensus commit prologue transaction to the beginning of `verified_non_randomness_certificates`. 
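+        // Sketch of the resulting schedule (shape assumed for exposition): the
+        // prologue must be the first element so that every transaction in this
+        // commit reads the already-updated Clock, i.e. roughly
+        //
+        //     [ConsensusCommitPrologue { round, timestamp, .. }, tx_1, .., tx_n]
+        //
+        // add_consensus_commit_prologue_transaction() below performs the push_front
+        // and returns the prologue's key so it can be recorded as a checkpoint root.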
let consensus_commit_prologue_root = self.add_consensus_commit_prologue_transaction( &mut output, &mut verified_non_randomness_transactions, consensus_commit_info, &mut roots, &cancelled_txns, )?; + // TODO(commit-handler-rewrite): assign shared object versions let assigned_versions = self.process_consensus_transaction_shared_object_versions( cache_reader, verified_non_randomness_transactions.iter(), verified_randomness_transactions.iter(), &cancelled_txns, &mut output, )?; + // TODO(commit-handler-rewrite): [ssm] check if epoch is over let (lock, final_round) = self.process_end_of_publish_transactions_and_reconfig( &mut output, &end_of_publish_transactions, )?; + // TODO(commit-handler-rewrite): store all user signatures for use by checkpoint builder self.process_user_signatures( verified_non_randomness_transactions .iter() .chain(verified_randomness_transactions.iter()), ); + + // TODO(commit-handler-rewrite): record consensus_stats output.record_consensus_commit_stats(consensus_stats.clone()); + // TODO(commit-handler-rewrite): record execution time observations for next epoch // If this is the final round, record execution time observations for storage in the // end-of-epoch tx. if final_round { @@ -3523,7 +3627,7 @@ impl AuthorityPerEpochStore { drop(execution_time_estimator); // make sure this is not used after `take_observations` } - // Create pending checkpoints if we are still accepting tx. + // TODO(commit-handler-rewrite): check tx acceptance state let should_accept_tx = if let Some(lock) = &lock { lock.should_accept_tx() } else { @@ -3534,6 +3638,7 @@ impl AuthorityPerEpochStore { self.get_reconfig_state_read_lock_guard().should_accept_tx() }; let make_checkpoint = should_accept_tx || final_round; + // TODO(commit-handler-rewrite): Create pending checkpoints if we are still accepting tx. if make_checkpoint { let checkpoint_height = self.calculate_pending_checkpoint_height(consensus_commit_info.round); @@ -3560,6 +3665,8 @@ impl AuthorityPerEpochStore { )); } + // TODO(commit-handler-rewrite): [ssm] write pending randomness checkpoint if we have a new randomness round, OR dkg failed while there are pending randomness txns + // Determine whether to write pending checkpoint for user tx with randomness. // - If randomness is not generated for this commit, we will skip the // checkpoint with the associated height. Therefore checkpoint heights may @@ -3593,6 +3700,7 @@ impl AuthorityPerEpochStore { } { + // TODO(commit-handler-rewrite): propagate deferral deletion to consensus output cache let mut deferred_transactions = self.consensus_output_cache.deferred_transactions.lock(); for deleted_deferred_key in output.get_deleted_deferred_txn_keys() { @@ -3600,10 +3708,12 @@ impl AuthorityPerEpochStore { } } + // TODO(commit-handler-rewrite): send consensus output to quarantine self.consensus_quarantine .write() .push_consensus_output(output, self)?; + // TODO(commit-handler-rewrite): notify checkpoint service // Only after batch is written, notify checkpoint service to start building any new // pending checkpoints. if make_checkpoint { @@ -3614,7 +3724,7 @@ impl AuthorityPerEpochStore { checkpoint_service.notify_checkpoint()?; } - // Once commit processing is recorded, kick off randomness generation. + // TODO(commit-handler-rewrite): Once commit processing is recorded, kick off randomness generation. 
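+        // Why the ordering matters (rationale inferred from the comment above):
+        // randomness generation is kicked off only after push_consensus_output() has
+        // recorded this commit, so a crash between the two steps replays the commit
+        // before any randomness partial signatures for `randomness_round` are sent.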
if let Some(randomness_round) = randomness_round { let epoch = self.epoch(); randomness_manager @@ -3623,8 +3733,14 @@ impl AuthorityPerEpochStore { .generate_randomness(epoch, randomness_round); } - self.process_notifications(¬ifications, &end_of_publish_transactions); + // TODO(commit-handler-rewrite): notify waiters that consensus transactions have been processed + let eop_keys: Vec<_> = end_of_publish_transactions + .into_iter() + .map(|tx| tx.0.transaction.key()) + .collect(); + self.process_notifications(notifications.iter().chain(eop_keys.iter())); + // TODO(commit-handler-rewrite): log end of epoch if final_round { info!( epoch=?self.epoch(), @@ -3645,7 +3761,7 @@ impl AuthorityPerEpochStore { Ok((all_txns, assigned_versions)) } - fn calculate_pending_checkpoint_height(&self, consensus_round: u64) -> u64 { + pub(crate) fn calculate_pending_checkpoint_height(&self, consensus_round: u64) -> u64 { if self.randomness_state_enabled() { consensus_round * 2 } else { @@ -3656,7 +3772,7 @@ impl AuthorityPerEpochStore { // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update // the system clock used in all transactions in the current consensus commit. // Returns the root of the consensus commit prologue transaction if it was added to the input. - fn add_consensus_commit_prologue_transaction( + pub(crate) fn add_consensus_commit_prologue_transaction( &self, output: &mut ConsensusCommitOutput, transactions: &mut VecDeque>, @@ -3715,6 +3831,7 @@ impl AuthorityPerEpochStore { transactions.push_front(Schedulable::Transaction(processed_tx.clone())); Some(processed_tx.key()) } + // TODO(commit-handler-rewrite): do not insert commit prologue if !should_accept_tx() ConsensusCertificateResult::IgnoredSystem => None, _ => unreachable!("process_consensus_system_transaction returned unexpected ConsensusCertificateResult."), }; @@ -3728,7 +3845,7 @@ impl AuthorityPerEpochStore { // Assigns shared object versions to transactions and updates the next shared object version state. // Shared object versions in cancelled transactions are assigned to special versions that will // cause the transactions to be cancelled in execution engine. 
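+    // Illustration (sentinel names assumed, not taken from this patch): cancelled
+    // transactions still receive version assignments, but with special marker
+    // versions rather than live object versions, e.g. something like
+    //
+    //     assigned_versions.push((shared_obj_key, SequenceNumber::CONGESTED));
+    //
+    // which the execution engine recognizes and converts into a cancellation
+    // effect instead of a normal execution.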
- fn process_consensus_transaction_shared_object_versions<'a>( + pub(crate) fn process_consensus_transaction_shared_object_versions<'a>( &'a self, cache_reader: &dyn ObjectCacheRead, non_randomness_transactions: impl Iterator + Clone, @@ -3821,17 +3938,12 @@ impl AuthorityPerEpochStore { Ok(assigned_versions) } - fn process_notifications( - &self, - notifications: &[SequencedConsensusTransactionKey], - end_of_publish: &[VerifiedSequencedConsensusTransaction], + pub(crate) fn process_notifications<'a>( + &'a self, + notifications: impl Iterator, ) { - for key in notifications - .iter() - .cloned() - .chain(end_of_publish.iter().map(|tx| tx.0.transaction.key())) - { - self.consensus_notify_read.notify(&key, &()); + for key in notifications { + self.consensus_notify_read.notify(key, &()); } } @@ -3896,6 +4008,7 @@ impl AuthorityPerEpochStore { let mut verified_randomness_certificates = VecDeque::with_capacity(randomness_transactions.len() + 1); + // TODO(commit-handler-rewrite): add randomness state update transaction if let Some(round) = randomness_round { verified_randomness_certificates .push_back(Schedulable::RandomnessStateUpdate(self.epoch(), round)); @@ -3906,6 +4019,7 @@ impl AuthorityPerEpochStore { .map(Either::Left) .chain(randomness_transactions.iter().map(Either::Right)) { + // TODO(commit-handler-rewrite): use correct congestion tracker for randomness vs non-randomness let (tx, execution_cost, verified_certificates) = match entry { Either::Left(tx) => ( tx, @@ -3943,6 +4057,7 @@ impl AuthorityPerEpochStore { verified_certificates.push_back(Schedulable::Transaction(cert)); } ConsensusCertificateResult::Deferred(deferral_key) => { + // TODO(commit-handler-rewrite): record_consensus_message_processed() must be called for deferred txns // Note: record_consensus_message_processed() must be called for this // cert even though we are not processing it now! deferred_txns @@ -3956,18 +4071,26 @@ impl AuthorityPerEpochStore { } } ConsensusCertificateResult::Cancelled((cert, reason)) => { + // TODO(commit-handler-rewrite): cancelled txns must be recorded as processed + // TODO(commit-handler-rewrite): cancelled txns must be scheduled for execution notifications.push(key.clone()); assert!(cancelled_txns.insert(*cert.digest(), reason).is_none()); verified_certificates.push_back(Schedulable::Transaction(cert)); } ConsensusCertificateResult::RandomnessConsensusMessage => { + // TODO(commit-handler-rewrite): randomness messages must be recorded as processed randomness_state_updated = true; notifications.push(key.clone()); } - ConsensusCertificateResult::ConsensusMessage => notifications.push(key.clone()), + ConsensusCertificateResult::ConsensusMessage => { + // TODO(commit-handler-rewrite): consensus messages must be recorded as processed + notifications.push(key.clone()); + } ConsensusCertificateResult::IgnoredSystem => { + // TODO(commit-handler-rewrite): commit prologue should not be added to roots after tx processing is closed filter_roots = true; } + // TODO(commit-handler-rewrite): ignored external transactions must not be recorded as processed. // Note: ignored external transactions must not be recorded as processed. Otherwise // they may not get reverted after restart during epoch change. 
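+                    // Spelled out, the bug this avoids: if an Ignored tx were marked
+                    // processed, a restarted node would see is_consensus_message_processed()
+                    // return true, skip the tx during commit replay, and the tx would
+                    // neither execute nor get reverted at epoch change.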
ConsensusCertificateResult::Ignored => { @@ -3990,6 +4113,7 @@ impl AuthorityPerEpochStore { } } + // TODO(commit-handler-rewrite): add deferred transactions to consensus output let mut total_deferred_txns = 0; { let mut deferred_transactions = @@ -4001,6 +4125,7 @@ impl AuthorityPerEpochStore { } } + // TODO(commit-handler-rewrite): update metrics authority_metrics .consensus_handler_deferred_transactions .inc_by(total_deferred_txns as u64); @@ -4016,6 +4141,7 @@ impl AuthorityPerEpochStore { .with_label_values(&["randomness_commit"]) .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64); + // TODO(commit-handler-rewrite): gather object debts, send them to ExecutionTimeObserver let object_debts = shared_object_congestion_tracker.accumulated_debts(consensus_commit_info); let randomness_object_debts = shared_object_using_randomness_congestion_tracker @@ -4031,9 +4157,11 @@ impl AuthorityPerEpochStore { info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}"); } } + // TODO(commit-handler-rewrite): commit object debts to output output.set_congestion_control_object_debts(object_debts); output.set_congestion_control_randomness_object_debts(randomness_object_debts); + // TODO(commit-handler-rewrite): [ssm] advance randomness state if needed if randomness_state_updated { if let Some(randomness_manager) = randomness_manager.as_mut() { randomness_manager @@ -4080,6 +4208,7 @@ impl AuthorityPerEpochStore { // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state // And this function itself is always executed from consensus task + // TODO(commit-handler-rewrite): [ssm] ignore end of publish messages if !should_accept_consensus_certs() let collected_end_of_publish = if lock.is_none() && self .get_reconfig_state_read_lock_guard() @@ -4104,6 +4233,7 @@ impl AuthorityPerEpochStore { authority.concise(), ); let mut l = self.get_reconfig_state_write_lock_guard(); + // TODO(commit-handler-rewrite): [ssm] after 2f+1 EOPs, transition to RejectAllCerts l.close_all_certs(); output.store_reconfig_state(l.clone()); // Holding this lock until end of process_consensus_transactions_and_commit_boundary() where we write batch to DB @@ -4132,6 +4262,7 @@ impl AuthorityPerEpochStore { .is_reject_all_certs() }; + // TODO(commit-handler-rewrite): [ssm] if we are rejecting all certs, AND there are no deferred transactions to process, transition to RejectAllTx if !is_reject_all_certs || !self.deferred_transactions_empty() || commit_has_deferred_txns { // Don't end epoch until all deferred transactions are processed. if is_reject_all_certs { @@ -4147,6 +4278,7 @@ impl AuthorityPerEpochStore { let mut lock = lock.unwrap_or_else(|| self.get_reconfig_state_write_lock_guard()); lock.close_all_tx(); output.store_reconfig_state(lock.clone()); + // TODO(commit-handler-rewrite): [ssm] only return final_round=true on the first round where we transition to RejectAllTx Ok((Some(lock), true)) } @@ -4181,6 +4313,7 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::CertifiedTransaction(certificate), .. }) => { + // TODO(commit-handler-rewrite): ignore certs from wrong epoch if certificate.epoch() != self.epoch() { // Epoch has changed after this certificate was sequenced, ignore it. debug!( @@ -4214,6 +4347,7 @@ impl AuthorityPerEpochStore { | ConsensusTransactionKind::CheckpointSignatureV2(info), .. 
}) => { + // TODO(commit-handler-rewrite): [ssm] notify checkpoint signatures // We usually call notify_checkpoint_signature in SuiTxValidator, but that step can // be skipped when a batch is already part of a certificate, so we must also // notify here. @@ -4231,6 +4365,7 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::CapabilityNotification(capabilities), .. }) => { + // TODO(commit-handler-rewrite): [ssm] can ignore capabilities v1 in rewrite let authority = capabilities.authority; if self .get_reconfig_state_read_lock_guard() @@ -4253,6 +4388,8 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::CapabilityNotificationV2(capabilities), .. }) => { + // TODO(commit-handler-rewrite): [ssm] ignore capability notifications if !should_accept_consensus_certs() + // TODO(commit-handler-rewrite): [ssm] record authority capabilities let authority = capabilities.authority; if self .get_reconfig_state_read_lock_guard() @@ -4275,6 +4412,8 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::NewJWKFetched(authority, jwk_id, jwk), .. }) => { + // TODO(commit-handler-rewrite): [ssm] record jwk votes + // TODO(commit-handler-rewrite): [ssm] ignore jwk votes if !should_accept_consensus_certs() if self .get_reconfig_state_read_lock_guard() .should_accept_consensus_certs() @@ -4285,7 +4424,7 @@ impl AuthorityPerEpochStore { *authority, jwk_id, jwk, - )?; + ); } else { debug!( "Ignoring NewJWKFetched from {:?} because of end of epoch", @@ -4305,6 +4444,8 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::RandomnessDkgMessage(authority, bytes), .. }) => { + // TODO(commit-handler-rewrite): [ssm] process dkg message + // TODO(commit-handler-rewrite): [ssm] ignore dkg message if !should_accept_tx() if self.get_reconfig_state_read_lock_guard().should_accept_tx() { if let Some(randomness_manager) = randomness_manager.as_mut() { debug!( @@ -4338,6 +4479,8 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::RandomnessDkgConfirmation(authority, bytes), .. }) => { + // TODO(commit-handler-rewrite): [ssm] process dkg confirmation + // TODO(commit-handler-rewrite): [ssm] ignore dkg confirmation if !should_accept_tx() if self.get_reconfig_state_read_lock_guard().should_accept_tx() { if let Some(randomness_manager) = randomness_manager.as_mut() { debug!( @@ -4382,7 +4525,7 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::UserTransaction(tx), .. }) => { - // Ignore consensus certified user transaction if Mysticeti fastpath is not enabled. + // TODO(commit-handler-rewrite): ignore mfp user transaction if mfp is disabled if !self.protocol_config().mysticeti_fastpath() { return Ok(ConsensusCertificateResult::Ignored); } @@ -4445,7 +4588,8 @@ impl AuthorityPerEpochStore { ) -> SuiResult { let _scope = monitored_scope("ConsensusCommitHandler::process_consensus_user_transaction"); - if self.has_received_end_of_publish_from(block_author)? 
+ // TODO(commit-handler-rewrite): ignore transactions sent by validators that have already sent EOP + if self.has_received_end_of_publish_from(block_author) && !previously_deferred_tx_digests.contains_key(transaction.digest()) { // This can not happen with valid authority @@ -4462,6 +4606,7 @@ impl AuthorityPerEpochStore { "Processing consensus transactions from user (CertifiedTransaction and UserTransaction)", ); + // TODO(commit-handler-rewrite): ignore txns due to !should_accept_consensus_certs(), unless they were previously deferred if !self .get_reconfig_state_read_lock_guard() .should_accept_consensus_certs() @@ -4491,6 +4636,7 @@ impl AuthorityPerEpochStore { ))); } + // TODO(commit-handler-rewrite): check if transaction should be deferred let tx_cost = shared_object_congestion_tracker.get_tx_cost( execution_time_estimator, &transaction, @@ -4516,13 +4662,14 @@ impl AuthorityPerEpochStore { let deferral_result = match deferral_reason { DeferralReason::RandomnessNotReady => { - // Always defer transaction due to randomness not ready. + // TODO(commit-handler-rewrite): Always defer transaction due to randomness not ready. ConsensusCertificateResult::Deferred(deferral_key) } DeferralReason::SharedObjectCongestion(congested_objects) => { authority_metrics .consensus_handler_congested_transactions .inc(); + // TODO(commit-handler-rewrite): when deferral limit is exceeded, cancel the transaction if transaction_deferral_within_limit( &deferral_key, self.protocol_config() @@ -4549,6 +4696,7 @@ impl AuthorityPerEpochStore { return Ok(deferral_result); } + // TODO(commit-handler-rewrite): cancel randomness-using txns if DKG failed if !fail_dkg_early && dkg_failed && self.randomness_state_enabled() @@ -4564,6 +4712,7 @@ impl AuthorityPerEpochStore { ))); } + // TODO(commit-handler-rewrite): update object execution cost for all scheduled transactions // This certificate will be scheduled. Update object execution cost. shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, &transaction); diff --git a/crates/sui-core/src/authority/consensus_quarantine.rs b/crates/sui-core/src/authority/consensus_quarantine.rs index 8c6b0fa6e5182..d6624b40a7c74 100644 --- a/crates/sui-core/src/authority/consensus_quarantine.rs +++ b/crates/sui-core/src/authority/consensus_quarantine.rs @@ -68,6 +68,8 @@ pub(crate) struct ConsensusCommitOutput { // TODO: If we delay committing consensus output until after all deferrals have been loaded, // we can move deferred_txns to the ConsensusOutputCache and save disk bandwidth. 
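+    // Relationship between the two fields (inferred from the batch-write code
+    // below): the v2 vector stores transactions in executable form, e.g.
+    //
+    //     deferred_txns_v2.push((key, vec![verified_exec_tx.serializable()]));
+    //
+    // while the original field keeps the old sequenced-consensus encoding until
+    // the legacy handler is removed.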
deferred_txns: Vec<(DeferralKey, Vec)>, + // TODO(commit-handler-rewrite): remove the original once we no longer need to support the old consensus handler + deferred_txns_v2: Vec<(DeferralKey, Vec)>, // deferred txns that have been loaded and can be removed deleted_deferred_txns: BTreeSet, @@ -109,7 +111,7 @@ impl ConsensusCommitOutput { } pub fn has_deferred_transactions(&self) -> bool { - !self.deferred_txns.is_empty() + !self.deferred_txns.is_empty() || !self.deferred_txns_v2.is_empty() } fn get_randomness_last_round_timestamp(&self) -> Option { @@ -176,6 +178,12 @@ impl ConsensusCommitOutput { self.consensus_messages_processed.insert(key); } + pub fn get_consensus_messages_processed( + &self, + ) -> impl Iterator { + self.consensus_messages_processed.iter() + } + pub fn set_next_shared_object_versions( &mut self, next_versions: HashMap, @@ -192,6 +200,14 @@ impl ConsensusCommitOutput { self.deferred_txns.push((key, transactions)); } + pub fn defer_transactions_v2( + &mut self, + key: DeferralKey, + transactions: Vec, + ) { + self.deferred_txns_v2.push((key, transactions)); + } + pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) { self.deleted_deferred_txns .extend(deferral_keys.iter().cloned()); @@ -285,8 +301,28 @@ impl ConsensusCommitOutput { batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?; } - batch.delete_batch(&tables.deferred_transactions, self.deleted_deferred_txns)?; + // TODO(consensus-handler-rewrite): delete the old structures once commit handler rewrite is complete + batch.delete_batch(&tables.deferred_transactions, &self.deleted_deferred_txns)?; + batch.delete_batch( + &tables.deferred_transactions_v2, + &self.deleted_deferred_txns, + )?; + batch.insert_batch(&tables.deferred_transactions, self.deferred_txns)?; + batch.insert_batch( + &tables.deferred_transactions_v2, + self.deferred_txns_v2.into_iter().map(|(key, txs)| { + ( + key, + txs.into_iter() + .map(|tx| { + let tx: TrustedExecutableTransaction = tx.serializable(); + tx + }) + .collect::>(), + ) + }), + )?; if let Some((round, commit_timestamp)) = self.next_randomness_round { batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?; @@ -366,11 +402,16 @@ impl ConsensusCommitOutput { pub(crate) struct ConsensusOutputCache { // deferred transactions is only used by consensus handler so there should never be lock contention // - hence no need for a DashMap. - pub(super) deferred_transactions: + // TODO(consensus-handler-rewrite): remove this once we no longer need to support the old consensus handler + pub(crate) deferred_transactions: Mutex>>, + + pub(crate) deferred_transactions_v2: + Mutex>>, + // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder // The critical sections are small in both cases so a DashMap is probably not helpful. - pub(super) user_signatures_for_checkpoints: + pub(crate) user_signatures_for_checkpoints: Mutex>>, executed_in_epoch: RwLock>, @@ -386,6 +427,10 @@ impl ConsensusOutputCache { .get_all_deferred_transactions() .expect("load deferred transactions cannot fail"); + let deferred_transactions_v2 = tables + .get_all_deferred_transactions_v2() + .expect("load deferred transactions cannot fail"); + assert!( epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(), "This version of sui-node can only run after data quarantining has been enabled. 
Please run version 1.45.0 or later to the end of the current epoch and retry" @@ -395,6 +440,7 @@ impl ConsensusOutputCache { Self { deferred_transactions: Mutex::new(deferred_transactions), + deferred_transactions_v2: Mutex::new(deferred_transactions_v2), user_signatures_for_checkpoints: Default::default(), executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)), executed_in_epoch_cache: MokaCache::builder(8) @@ -491,7 +537,7 @@ impl ConsensusOutputQuarantine { // There are only two sources! ConsensusHandler and CheckpointBuilder. impl ConsensusOutputQuarantine { // Push all data gathered from a consensus commit into the quarantine. - pub(super) fn push_consensus_output( + pub(crate) fn push_consensus_output( &mut self, output: ConsensusCommitOutput, epoch_store: &AuthorityPerEpochStore, @@ -954,6 +1000,75 @@ impl ConsensusOutputQuarantine { (object_id, debt) })) } + + // TODO: Remove the above version and rename this without _v2 + pub(crate) fn load_initial_object_debts_v2( + &self, + epoch_store: &AuthorityPerEpochStore, + current_round: Round, + for_randomness: bool, + transactions: &[VerifiedExecutableTransaction], + ) -> SuiResult> { + let protocol_config = epoch_store.protocol_config(); + let tables = epoch_store.tables()?; + let default_per_commit_budget = protocol_config + .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option() + .unwrap_or(0); + let (hash_table, db_table, per_commit_budget) = if for_randomness { + ( + &self.congestion_control_randomness_object_debts, + &tables.congestion_control_randomness_object_debts, + protocol_config + .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option() + .unwrap_or(default_per_commit_budget), + ) + } else { + ( + &self.congestion_control_object_debts, + &tables.congestion_control_object_debts, + default_per_commit_budget, + ) + }; + let mut shared_input_object_ids: Vec<_> = transactions + .iter() + .flat_map(|tx| tx.shared_input_objects().map(|obj| obj.id)) + .collect(); + shared_input_object_ids.sort(); + shared_input_object_ids.dedup(); + + let results = do_fallback_lookup( + &shared_input_object_ids, + |object_id| { + if let Some(debt) = hash_table.get(object_id) { + CacheResult::Hit(Some(debt.into_v1())) + } else { + CacheResult::Miss + } + }, + |object_ids| { + db_table + .multi_get(object_ids) + .expect("db error") + .into_iter() + .map(|debt| debt.map(|debt| debt.into_v1())) + .collect() + }, + ); + + Ok(results + .into_iter() + .zip(shared_input_object_ids) + .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id))) + .map(move |((round, debt), object_id)| { + // Stored debts already account for the budget of the round in which + // they were accumulated. Application of budget from future rounds to + // the debt is handled here. 
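+            // Worked example (numbers illustrative): per_commit_budget = 10, a debt
+            // of 35 recorded at round 3, current_round = 7:
+            //     num_rounds = 7 - 3 - 1 = 3        // full budgets elapsed since recording
+            //     debt = 35.saturating_sub(10 * 3) = 5
+            // so the object enters this commit owing 5 cost units.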
+ assert!(current_round > round); + let num_rounds = current_round - round - 1; + let debt = debt.saturating_sub(per_commit_budget * num_rounds); + (object_id, debt) + })) + } } // A wrapper around HashMap that uses refcounts to keep entries alive until diff --git a/crates/sui-core/src/authority/epoch_start_configuration.rs b/crates/sui-core/src/authority/epoch_start_configuration.rs index b3a1764a67de2..8d70bf3cdf569 100644 --- a/crates/sui-core/src/authority/epoch_start_configuration.rs +++ b/crates/sui-core/src/authority/epoch_start_configuration.rs @@ -38,6 +38,10 @@ pub trait EpochStartConfigTrait { self.flags() .contains(&EpochFlag::DataQuarantineFromBeginningOfEpoch) } + + fn use_commit_handler_v2(&self) -> bool { + self.flags().contains(&EpochFlag::UseCommitHandlerV2) + } } // IMPORTANT: Assign explicit values to each variant to ensure that the values are stable. @@ -67,9 +71,12 @@ pub enum EpochFlag { // beginning of the epoch. DataQuarantineFromBeginningOfEpoch = 9, + // This flag indicates whether the new commit handler is enabled. + UseCommitHandlerV2 = 10, + // Used for `test_epoch_flag_upgrade`. #[cfg(msim)] - DummyFlag = 10, + DummyFlag = 11, } impl EpochFlag { @@ -92,11 +99,18 @@ impl EpochFlag { } fn default_flags_impl() -> Vec { - vec![ - EpochFlag::DataQuarantineFromBeginningOfEpoch, - #[cfg(msim)] - EpochFlag::DummyFlag, - ] + let mut flags = vec![EpochFlag::DataQuarantineFromBeginningOfEpoch]; + + if std::env::var("SUI_USE_NEW_COMMIT_HANDLER").is_ok() { + flags.push(EpochFlag::UseCommitHandlerV2); + } + + #[cfg(msim)] + { + flags.push(EpochFlag::DummyFlag); + } + + flags } } @@ -134,6 +148,9 @@ impl fmt::Display for EpochFlag { EpochFlag::DataQuarantineFromBeginningOfEpoch => { write!(f, "DataQuarantineFromBeginningOfEpoch") } + EpochFlag::UseCommitHandlerV2 => { + write!(f, "UseCommitHandlerV2") + } #[cfg(msim)] EpochFlag::DummyFlag => { write!(f, "DummyFlag") diff --git a/crates/sui-core/src/authority/shared_object_congestion_tracker.rs b/crates/sui-core/src/authority/shared_object_congestion_tracker.rs index d6eb43388de9a..f1c4e73cd5acb 100644 --- a/crates/sui-core/src/authority/shared_object_congestion_tracker.rs +++ b/crates/sui-core/src/authority/shared_object_congestion_tracker.rs @@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig}; use sui_types::base_types::{ObjectID, TransactionDigest}; -use sui_types::error::SuiResult; use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::messages_consensus::Round; use sui_types::transaction::{Argument, SharedInputObject, TransactionDataAPI}; @@ -176,10 +175,10 @@ impl SharedObjectCongestionTracker { initial_object_debts: impl IntoIterator, protocol_config: &ProtocolConfig, for_randomness: bool, - ) -> SuiResult { + ) -> Self { let max_accumulated_txn_cost_per_object_in_commit = protocol_config.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option(); - Ok(Self::new( + Self::new( initial_object_debts, protocol_config.per_object_congestion_control_mode(), for_randomness, @@ -198,7 +197,7 @@ impl SharedObjectCongestionTracker { protocol_config .allowed_txn_cost_overage_burst_per_object_in_commit_as_option() .unwrap_or(0), - )) + ) } // Given a list of shared input objects, returns the starting cost of a transaction that operates on diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index fe2f132ba65cd..912c4956c7258 100644 --- 
a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -383,7 +383,7 @@ impl ConsensusAdapter { epoch_store: &Arc, transactions: &[ConsensusTransaction], ) -> (impl Future, usize, usize, usize, usize) { - if transactions.iter().any(|tx| tx.is_user_transaction()) { + if transactions.iter().any(|tx| tx.is_mfp_transaction()) { // UserTransactions are generally sent to just one validator and should // be submitted to consensus without delay. return (tokio::time::sleep(Duration::ZERO), 0, 0, 0, 0); @@ -1000,7 +1000,7 @@ impl ConsensusAdapter { let consensus_keys: Vec<_> = transactions .iter() .filter_map(|t| { - if t.is_user_transaction() { + if t.is_mfp_transaction() { // UserTransaction is not inserted into the pending consensus transactions table. // Also UserTransaction shares the same key as CertifiedTransaction, so removing // the key here can have unexpected effects. @@ -1054,7 +1054,7 @@ impl ConsensusAdapter { ) -> (Vec, BlockStatusReceiver) { let ack_start = Instant::now(); let mut retries: u32 = 0; - let is_dkg = !transactions.is_empty() && transactions[0].kind.is_dkg(); + let is_dkg = !transactions.is_empty() && transactions[0].is_dkg(); let (consensus_positions, status_waiter) = loop { let span = debug_span!("client_submit"); diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 6b03cb8c62597..067c10f2a7952 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, hash::Hash, num::NonZeroUsize, sync::Arc, @@ -13,53 +13,72 @@ use arc_swap::ArcSwap; use consensus_config::Committee as ConsensusCommittee; use consensus_core::{CertifiedBlocksOutput, CommitConsumerMonitor, CommitIndex}; use consensus_types::block::TransactionIndex; +use fastcrypto_zkp::bn254::zk_login::{JwkId, JWK}; use itertools::Itertools as _; use lru::LruCache; -use mysten_common::{debug_fatal, random_util::randomize_cache_capacity_in_tests}; +use mysten_common::{ + assert_reachable, debug_fatal, in_test_configuration, + random_util::randomize_cache_capacity_in_tests, +}; use mysten_metrics::{ monitored_future, monitored_mpsc::{self, UnboundedReceiver}, monitored_scope, spawn_monitored_task, }; +use parking_lot::RwLockWriteGuard; use serde::{Deserialize, Serialize}; -use sui_macros::{fail_point, fail_point_if}; +use sui_macros::{fail_point, fail_point_arg, fail_point_if}; use sui_protocol_config::ProtocolConfig; use sui_types::{ authenticator_state::ActiveJwk, base_types::{ - AuthorityName, ConsensusObjectSequenceKey, EpochId, SequenceNumber, TransactionDigest, + AuthorityName, ConciseableName, ConsensusObjectSequenceKey, SequenceNumber, + TransactionDigest, }, + crypto::RandomnessRound, digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest}, executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction}, + messages_checkpoint::CheckpointSignatureMessage, messages_consensus::{ - AuthorityIndex, ConsensusDeterminedVersionAssignments, ConsensusPosition, - ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind, + AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments, + ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind, ExecutionTimeObservation, }, 
sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait, - transaction::{SenderSignedData, VerifiedTransaction}, + transaction::{SenderSignedData, VerifiedCertificate, VerifiedTransaction}, }; -use tokio::task::JoinSet; -use tracing::{debug, error, info, instrument, trace_span, warn}; +use tokio::{sync::MutexGuard, task::JoinSet}; +use tracing::{debug, error, info, instrument, trace, warn}; use crate::{ authority::{ authority_per_epoch_store::{ - AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices, + consensus_quarantine::ConsensusCommitOutput, AuthorityPerEpochStore, + CancelConsensusCertificateReason, ConsensusStats, ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStats, }, backpressure::{BackpressureManager, BackpressureSubscriber}, consensus_tx_status_cache::ConsensusTxStatus, epoch_start_configuration::EpochStartConfigTrait, - shared_object_version_manager::{AssignedTxAndVersions, Schedulable}, + execution_time_estimator::ExecutionTimeEstimator, + shared_object_congestion_tracker::SharedObjectCongestionTracker, + shared_object_version_manager::{AssignedTxAndVersions, Schedulable, SharedObjVerManager}, + transaction_deferral::{transaction_deferral_within_limit, DeferralKey, DeferralReason}, AuthorityMetrics, AuthorityState, ExecutionEnv, }, - checkpoints::{CheckpointService, CheckpointServiceNotify}, + checkpoints::{ + CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo, + }, consensus_adapter::ConsensusAdapter, consensus_throughput_calculator::ConsensusThroughputCalculator, consensus_types::consensus_output_api::{parse_block_transactions, ConsensusCommitAPI}, + epoch::{ + randomness::{DkgStatus, RandomnessManager}, + reconfiguration::ReconfigState, + }, execution_cache::ObjectCacheRead, execution_scheduler::{ExecutionScheduler, SchedulingSource}, + post_consensus_tx_reorder::PostConsensusTxReorder, scoring_decision::update_low_scoring_authorities, traffic_controller::{policies::TrafficTally, TrafficController}, }; @@ -181,6 +200,7 @@ mod additional_consensus_state { pub(crate) fn observe_commit( &mut self, protocol_config: &ProtocolConfig, + epoch_start_time: u64, consensus_commit: &impl ConsensusCommitAPI, ) -> ConsensusCommitInfo { self.commit_interval_observer @@ -193,29 +213,54 @@ mod additional_consensus_state { protocol_config.min_checkpoint_interval_ms(), )); - ConsensusCommitInfo { - _phantom: PhantomData, - round: consensus_commit.leader_round(), - timestamp: consensus_commit.commit_timestamp_ms(), - consensus_commit_digest: consensus_commit.consensus_digest(protocol_config), - additional_state_digest: Some(self.digest()), - estimated_commit_period: Some(estimated_commit_period), - skip_consensus_commit_prologue_in_test: false, - } + info!("estimated commit rate: {:?}", estimated_commit_period); + + self.commit_info_impl( + epoch_start_time, + protocol_config, + consensus_commit, + Some(estimated_commit_period), + ) } pub(crate) fn stateless_commit_info( &self, + epoch_store: &AuthorityPerEpochStore, + consensus_commit: &impl ConsensusCommitAPI, + ) -> ConsensusCommitInfo { + let protocol_config = epoch_store.protocol_config(); + let epoch_start_time = epoch_store.epoch_start_config().epoch_start_timestamp_ms(); + self.commit_info_impl(epoch_start_time, protocol_config, consensus_commit, None) + } + + fn commit_info_impl( + &self, + epoch_start_time: u64, protocol_config: &ProtocolConfig, consensus_commit: &impl ConsensusCommitAPI, + estimated_commit_period: Option, ) -> ConsensusCommitInfo { 
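+        // Rationale for the clamp below (inferred): commit timestamps come from the
+        // leader's clock, so a skewed leader could stamp a commit earlier than
+        // epoch_start_time; pinning such timestamps to the epoch start keeps the
+        // commit prologue clock monotonic across reconfiguration.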
+            let leader_author = consensus_commit.leader_author_index();
+            let timestamp = consensus_commit.commit_timestamp_ms();
+
+            let timestamp = if timestamp < epoch_start_time {
+                error!(
+                    "Unexpected commit timestamp {timestamp} less than epoch start time {epoch_start_time}, author {leader_author:?}"
+                );
+                epoch_start_time
+            } else {
+                timestamp
+            };
+
             ConsensusCommitInfo {
                 _phantom: PhantomData,
                 round: consensus_commit.leader_round(),
-                timestamp: consensus_commit.commit_timestamp_ms(),
+                timestamp,
+                leader_author,
+                sub_dag_index: consensus_commit.commit_sub_dag_index(),
                 consensus_commit_digest: consensus_commit.consensus_digest(protocol_config),
-                additional_state_digest: None,
-                estimated_commit_period: None,
+                additional_state_digest: Some(self.digest()),
+                estimated_commit_period,
                 skip_consensus_commit_prologue_in_test: false,
             }
         }
@@ -234,6 +279,8 @@ mod additional_consensus_state {
         pub round: u64,
         pub timestamp: u64,
+        pub leader_author: AuthorityIndex,
+        pub sub_dag_index: u64,
         pub consensus_commit_digest: ConsensusCommitDigest,
         additional_state_digest: Option<AdditionalConsensusStateDigest>,
@@ -253,6 +300,8 @@ mod additional_consensus_state {
                 _phantom: PhantomData,
                 round: commit_round,
                 timestamp: commit_timestamp,
+                leader_author: 0,
+                sub_dag_index: 0,
                 consensus_commit_digest: ConsensusCommitDigest::default(),
                 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
                 estimated_commit_period,
@@ -488,7 +537,11 @@ mod additional_consensus_state {
         fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
             let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
-            state.observe_commit(&protocol_config, &TestConsensusCommit { round, timestamp });
+            state.observe_commit(
+                &protocol_config,
+                100,
+                &TestConsensusCommit { round, timestamp },
+            );
         }
 
         let mut s1 = AdditionalConsensusState::new(3);
@@ -611,6 +664,85 @@ impl ConsensusHandler {
     }
 }
 
+#[derive(Default)]
+struct CommitHandlerInput {
+    user_transactions: Vec<VerifiedExecutableTransaction>,
+    capability_notifications: Vec<AuthorityCapabilitiesV2>,
+    execution_time_observations: Vec<ExecutionTimeObservation>,
+    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
+    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
+    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
+    end_of_publish_transactions: Vec<AuthorityName>,
+    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
+}
+
+struct CommitHandlerState {
+    dkg_failed: bool,
+    randomness_round: Option<RandomnessRound>,
+    output: ConsensusCommitOutput,
+    indirect_state_observer: Option<IndirectStateObserver>,
+    initial_reconfig_state: ReconfigState,
+}
+
+impl CommitHandlerState {
+    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
+        self.output
+            .get_consensus_messages_processed()
+            .cloned()
+            .collect()
+    }
+
+    fn init_randomness<'a, 'epoch>(
+        &'a mut self,
+        epoch_store: &'epoch AuthorityPerEpochStore,
+        commit_info: &'a ConsensusCommitInfo,
+    ) -> Option<MutexGuard<'epoch, RandomnessManager>> {
+        // DONE(commit-handler-rewrite): load randomness manager, get random round
+        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
+            rm.try_lock()
+                .expect("should only ever be called from the commit handler thread")
+        });
+
+        let mut dkg_failed = false;
+        let randomness_round = if epoch_store.randomness_state_enabled() {
+            let randomness_manager = randomness_manager
+                .as_mut()
+                .expect("randomness manager should exist if randomness is enabled");
+            match randomness_manager.dkg_status() {
+                DkgStatus::Pending => None,
+                DkgStatus::Failed => {
+                    dkg_failed = true;
+                    None
+                }
+                DkgStatus::Successful => {
+                    // DONE(commit-handler-rewrite): do not reserve randomness if !should_accept_tx()
                    // Generate randomness for this commit if DKG is successful and
we are still + // accepting certs. + if self.initial_reconfig_state.should_accept_tx() { + randomness_manager + // TODO: make infallible + .reserve_next_randomness(commit_info.timestamp, &mut self.output) + .expect("epoch ended") + } else { + None + } + } + } + } else { + None + }; + + if randomness_round.is_some() { + assert!(!dkg_failed); // invariant check + } + + self.randomness_round = randomness_round; + self.dkg_failed = dkg_failed; + + randomness_manager + } +} + impl ConsensusHandler { /// Called during startup to allow us to observe commits we previously processed, for crash recovery. /// Any state computed here must be a pure function of the commits observed, it cannot depend on any @@ -620,12 +752,1166 @@ impl ConsensusHandler { .epoch_store .protocol_config() .record_additional_state_digest_in_prologue()); - self.additional_consensus_state - .observe_commit(self.epoch_store.protocol_config(), &consensus_commit); + let protocol_config = self.epoch_store.protocol_config(); + let epoch_start_time = self + .epoch_store + .epoch_start_config() + .epoch_start_timestamp_ms(); + + self.additional_consensus_state.observe_commit( + protocol_config, + epoch_start_time, + &consensus_commit, + ); } - #[instrument(level = "debug", skip_all)] + #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))] + async fn handle_consensus_commit_v2(&mut self, consensus_commit: impl ConsensusCommitAPI) { + let protocol_config = self.epoch_store.protocol_config(); + + // Assert all protocol config settings for which we don't support old behavior. + assert!(protocol_config.ignore_execution_time_observations_after_certs_closed()); + assert!(protocol_config.record_time_estimate_processed()); + assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints()); + assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest()); + assert!(protocol_config.authority_capabilities_v2()); + assert!(protocol_config.cancel_for_failed_dkg_early()); + + // This may block until one of two conditions happens: + // - Number of uncommitted transactions in the writeback cache goes below the + // backpressure threshold. + // - The highest executed checkpoint catches up to the highest certified checkpoint. 
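+        // Backpressure is checked once, up front, so a slow executor or checkpointer throttles
+        // how quickly new commits enter the handler; once a commit is admitted it is processed
+        // to completion without further waits on execution progress.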
+ self.backpressure_subscriber.await_no_backpressure().await; + + let epoch = self.epoch_store.epoch(); + + let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit"); + + let last_committed_round = self.last_consensus_stats.index.last_committed_round; + + // DONE(commit-handler-rewrite): Update last_committed_round stats + if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref() + { + consensus_tx_status_cache + .update_last_committed_leader_round(last_committed_round as u32) + .await; + } + if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() { + tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32); + } + + // DONE(commit-handler-rewrite): this will be unconditionally enabled in the rewrite + let commit_info = self.additional_consensus_state.observe_commit( + protocol_config, + self.epoch_store + .epoch_start_config() + .epoch_start_timestamp_ms(), + &consensus_commit, + ); + assert!(commit_info.round > last_committed_round); + + // DONE(commit-handler-rewrite): already not needed + + // DONE(commit-handler-rewrite): gather commit metadata + let (timestamp, leader_author, commit_sub_dag_index) = + self.gather_commit_metadata(&consensus_commit); + + info!( + %consensus_commit, + "Received consensus output" + ); + + // DONE(commit-handler-rewrite): update stats & ExecutionIndices + self.last_consensus_stats.index = ExecutionIndices { + last_committed_round: commit_info.round, + sub_dag_index: commit_sub_dag_index, + transaction_index: 0_u64, + }; + + // DONE(commit-handler-rewrite): update low scoring authorities + update_low_scoring_authorities( + self.low_scoring_authorities.clone(), + self.epoch_store.committee(), + &self.committee, + consensus_commit.reputation_score_sorted_desc(), + &self.metrics, + protocol_config.consensus_bad_nodes_stake_threshold(), + ); + + // DONE(commit-handler-rewrite): update metrics + self.metrics + .consensus_committed_subdags + .with_label_values(&[&leader_author.to_string()]) + .inc(); + + let mut state = CommitHandlerState { + output: ConsensusCommitOutput::new(commit_info.round), + dkg_failed: false, + randomness_round: None, + indirect_state_observer: Some(IndirectStateObserver::new()), + initial_reconfig_state: self + .epoch_store + .get_reconfig_state_read_lock_guard() + .clone(), + }; + + // DONE(commit-handler-rewrite): update transaction status (rejected/finalized) and update metrics + let transactions = self.filter_consensus_txns( + state.initial_reconfig_state.clone(), + &commit_info, + &consensus_commit, + ); + // DONE(commit-handler-rewrite): de-duplicate transactions + let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions); + + let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info); + + // DONE(commit-handler-rewrite): Split transactions into different types for processing. 
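+        // The verified stream is partitioned once, by message kind, into typed buckets (see
+        // build_commit_handler_input below); each bucket is then handled by a dedicated
+        // `process_*` method, instead of dispatching per message as the old handler did.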
+        let CommitHandlerInput {
+            user_transactions,
+            capability_notifications,
+            execution_time_observations,
+            checkpoint_signature_messages,
+            randomness_dkg_messages,
+            randomness_dkg_confirmations,
+            end_of_publish_transactions,
+            new_jwks,
+        } = self.build_commit_handler_input(transactions);
+
+        self.process_jwks(&mut state, &commit_info, new_jwks);
+        self.process_capability_notifications(capability_notifications);
+        self.process_execution_time_observations(&mut state, execution_time_observations);
+        self.process_checkpoint_signature_messages(checkpoint_signature_messages);
+
+        self.process_dkg_updates(
+            &mut state,
+            &commit_info,
+            randomness_manager.as_deref_mut(),
+            randomness_dkg_messages,
+            randomness_dkg_confirmations,
+        )
+        .await;
+
+        let mut execution_time_estimator = self
+            .epoch_store
+            .execution_time_estimator
+            .try_lock()
+            .expect("should only ever be called from the commit handler thread");
+
+        // DONE(commit-handler-rewrite): load and activate previous round's jwks
+        let authenticator_state_update_transaction =
+            self.create_authenticator_state_update(last_committed_round, &commit_info);
+
+        let (schedulables, randomness_schedulables, assigned_versions) = self.process_transactions(
+            &mut state,
+            execution_time_estimator.as_mut(),
+            &commit_info,
+            authenticator_state_update_transaction,
+            user_transactions,
+        );
+
+        let (should_accept_tx, lock, final_round) =
+            self.handle_eop(&mut state, end_of_publish_transactions);
+
+        let make_checkpoint = should_accept_tx || final_round;
+        if !make_checkpoint {
+            // No need for any further processing
+            // DONE(commit-handler-rewrite): do not insert commit prologue if !should_accept_tx()
+            // DONE(commit-handler-rewrite): commit prologue should not be added to roots after tx processing is closed
+            return;
+        }
+
+        // DONE(commit-handler-rewrite): record execution time observations for next epoch
+        // If this is the final round, record execution time observations for storage in the
+        // end-of-epoch tx.
+        if final_round {
+            self.record_end_of_epoch_execution_time_observations(execution_time_estimator);
+        }
+
+        self.create_pending_checkpoints(
+            &mut state,
+            &commit_info,
+            &schedulables,
+            &randomness_schedulables,
+            final_round,
+        );
+
+        let notifications = state.get_notifications();
+
+        // DONE(commit-handler-rewrite): record consensus_stats
+        state
+            .output
+            .record_consensus_commit_stats(self.last_consensus_stats.clone());
+
+        // DONE(commit-handler-rewrite): propagate deferral deletion to consensus output cache
+        self.record_deferral_deletion(&mut state);
+
+        // DONE(commit-handler-rewrite): send consensus output to quarantine
+        self.epoch_store
+            .consensus_quarantine
+            .write()
+            .push_consensus_output(state.output, &self.epoch_store)
+            .expect("push_consensus_output should not fail");
+
+        // DONE(commit-handler-rewrite): notify checkpoint service
+        // Only after batch is written, notify checkpoint service to start building any new
+        // pending checkpoints.
+        debug!(
+            ?commit_info.round,
+            "Notifying checkpoint service about new pending checkpoint(s)",
+        );
+        self.checkpoint_service
+            .notify_checkpoint()
+            .expect("failed to notify checkpoint service");
+
+        // DONE(commit-handler-rewrite): Once commit processing is recorded, kick off randomness generation.
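+        // Ordering note: generation is deliberately deferred until after push_consensus_output
+        // above, which should mean a crash between the two is recovered by replaying the recorded
+        // commit rather than by producing randomness for a commit that was never recorded.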
+        if let Some(randomness_round) = state.randomness_round {
+            randomness_manager
+                .as_ref()
+                .expect("randomness manager should exist if randomness round is provided")
+                .generate_randomness(epoch, randomness_round);
+        }
+
+        // DONE(commit-handler-rewrite): notify waiters that consensus transactions have been processed
+        self.epoch_store.process_notifications(notifications.iter());
+
+        // DONE(commit-handler-rewrite): log end of epoch
+        // pass lock by value to ensure that it is held until this point
+        self.log_final_round(lock, final_round);
+
+        // DONE(commit-handler-rewrite): update throughput calculator
+        // update the calculated throughput
+        self.throughput_calculator
+            .add_transactions(timestamp, schedulables.len() as u64);
+
+        // DONE(commit-handler-rewrite): fail points
+        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
+            let key = [commit_sub_dag_index, epoch];
+            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
+                sui_simulator::task::kill_current_node(None);
+            }
+        });
+
+        fail_point!("crash"); // for tests that produce random crashes
+
+        // DONE(commit-handler-rewrite): enqueue transactions
+        let mut schedulables = schedulables;
+        schedulables.extend(randomness_schedulables);
+        self.execution_scheduler_sender.send(
+            schedulables,
+            assigned_versions,
+            SchedulingSource::NonFastPath,
+        );
+
+        // DONE(commit-handler-rewrite): Check if we should send EndOfPublish after processing consensus commit
+        self.send_end_of_publish_if_needed().await;
+    }
+
+    fn handle_eop(
+        &self,
+        state: &mut CommitHandlerState,
+        end_of_publish_transactions: Vec<AuthorityName>,
+    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
+        let collected_eop =
+            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
+        if collected_eop {
+            // DONE(commit-handler-rewrite): [ssm] after 2f+1 EOPs, transition to RejectAllCerts
+            // DONE(commit-handler-rewrite): [ssm] check if epoch is over
+            let (lock, final_round) = self.advance_eop_state_machine(state);
+            // DONE(commit-handler-rewrite): check tx acceptance state
+            (lock.should_accept_tx(), Some(lock), final_round)
+        } else {
+            (true, None, false)
+        }
+    }
+
+    fn record_end_of_epoch_execution_time_observations(
+        &self,
+        mut execution_time_estimator: MutexGuard<'_, Option<ExecutionTimeEstimator>>,
+    ) {
+        if let Some(estimator) = execution_time_estimator.as_mut() {
+            self.epoch_store
+                .end_of_epoch_execution_time_observations
+                .set(estimator.take_observations())
+                .expect(
+                    "`stored_execution_time_observations` should only be set once at end of epoch",
+                );
+        }
+    }
+
+    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
+        let mut deferred_transactions = self
+            .epoch_store
+            .consensus_output_cache
+            .deferred_transactions_v2
+            .lock();
+        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
+            deferred_transactions.remove(&deleted_deferred_key);
+        }
+    }
+
+    fn log_final_round(
+        &self,
+        lock: Option<RwLockWriteGuard<'_, ReconfigState>>,
+        final_round: bool,
+    ) {
+        if final_round {
+            let epoch = self.epoch_store.epoch();
+            info!(
+                ?epoch,
+                lock=?lock.as_ref(),
+                final_round=?final_round,
+                "Notified last checkpoint"
+            );
+            self.epoch_store.record_end_of_message_quorum_time_metric();
+        }
+    }
+
+    fn create_pending_checkpoints(
+        &self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        schedulables: &[Schedulable],
+        randomness_schedulables: &[Schedulable],
+        final_round: bool,
+    ) {
+        // DONE(commit-handler-rewrite): Create pending checkpoints if we are still accepting tx.
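+        // Each commit reserves two consecutive heights: `checkpoint_height` for the regular
+        // checkpoint and `checkpoint_height + 1` for the randomness checkpoint, which is only
+        // written when needed (see should_write_random_checkpoint below), so heights may have gaps.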
+        let checkpoint_height = self
+            .epoch_store
+            .calculate_pending_checkpoint_height(commit_info.round);
+
+        // DONE(commit-handler-rewrite): [ssm] write pending randomness checkpoint if we have a new randomness round, OR dkg failed while there are pending randomness txns
+
+        // Determine whether to write pending checkpoint for user tx with randomness.
+        // - If randomness is not generated for this commit, we will skip the
+        //   checkpoint with the associated height. Therefore checkpoint heights may
+        //   not be contiguous.
+        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
+        //   for randomness tx that are canceled.
+        let should_write_random_checkpoint = state.randomness_round.is_some()
+            || (state.dkg_failed && !randomness_schedulables.is_empty());
+
+        let pending_checkpoint = PendingCheckpoint {
+            // DONE(commit-handler-rewrite): compute checkpoint roots (this should be done at the end)
+            roots: schedulables.iter().map(|s| s.key()).collect(),
+            details: PendingCheckpointInfo {
+                timestamp_ms: commit_info.timestamp,
+                last_of_epoch: final_round && !should_write_random_checkpoint,
+                checkpoint_height,
+            },
+        };
+        self.epoch_store
+            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
+            .expect("failed to write pending checkpoint");
+
+        if should_write_random_checkpoint {
+            let pending_checkpoint = PendingCheckpoint {
+                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
+                details: PendingCheckpointInfo {
+                    timestamp_ms: commit_info.timestamp,
+                    last_of_epoch: final_round,
+                    checkpoint_height: checkpoint_height + 1,
+                },
+            };
+            self.epoch_store
+                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
+                .expect("failed to write pending checkpoint");
+        }
+    }
+
+    fn process_transactions(
+        &self,
+        state: &mut CommitHandlerState,
+        execution_time_estimator: Option<&mut ExecutionTimeEstimator>,
+        commit_info: &ConsensusCommitInfo,
+        authenticator_state_update_transaction: Option<VerifiedExecutableTransaction>,
+        user_transactions: Vec<VerifiedExecutableTransaction>,
+    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
+        let protocol_config = self.epoch_store.protocol_config();
+        let epoch = self.epoch_store.epoch();
+
+        // Get the ordered set of all transactions to process, which includes deferred and
+        // newly arrived transactions.
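+        // Deferred transactions are loaded first and new user transactions are appended after
+        // them, so at equal gas price a previously deferred transaction keeps its priority when
+        // both lists are reordered below.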
+ let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) = + self.merge_and_reorder_transactions(state, commit_info, user_transactions); + + // DONE(commit-handler-rewrite): initialize congestion trackers + let mut shared_object_congestion_tracker = + self.init_congestion_tracker(commit_info, false, &ordered_txns); + let mut shared_object_using_randomness_congestion_tracker = + self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns); + + // DONE(commit-handler-rewrite): add randomness state update transaction + let randomness_state_update_transaction = state + .randomness_round + .map(|round| Schedulable::RandomnessStateUpdate(epoch, round)); + debug!( + "Randomness state update transaction: {:?}", + randomness_state_update_transaction + .as_ref() + .map(|t| t.key()) + ); + + let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len()); + let mut randomness_transactions_to_schedule = + Vec::with_capacity(ordered_randomness_txns.len()); + let mut deferred_txns = BTreeMap::new(); + let mut cancelled_txns = BTreeMap::new(); + + for transaction in ordered_txns { + self.handle_deferral_and_cancellation( + state, + &mut cancelled_txns, + &mut deferred_txns, + &mut transactions_to_schedule, + protocol_config, + commit_info, + transaction, + &mut shared_object_congestion_tracker, + &previously_deferred_tx_digests, + execution_time_estimator.as_deref(), + ); + } + + for transaction in ordered_randomness_txns { + // DONE(commit-handler-rewrite): cancel randomness-using txns if DKG failed + if state.dkg_failed { + debug!( + "Canceling randomness-using transaction {:?} because DKG failed", + transaction.digest(), + ); + cancelled_txns.insert( + *transaction.digest(), + CancelConsensusCertificateReason::DkgFailed, + ); + // DONE(commit-handler-rewrite): cancelled txns must be scheduled for execution + randomness_transactions_to_schedule.push(transaction); + continue; + } + self.handle_deferral_and_cancellation( + state, + &mut cancelled_txns, + &mut deferred_txns, + &mut randomness_transactions_to_schedule, + protocol_config, + commit_info, + transaction, + // DONE(commit-handler-rewrite): use correct congestion tracker for randomness vs non-randomness + &mut shared_object_using_randomness_congestion_tracker, + &previously_deferred_tx_digests, + execution_time_estimator.as_deref(), + ); + } + + // DONE(commit-handler-rewrite): add deferred transactions to consensus output + let mut total_deferred_txns = 0; + { + let mut deferred_transactions = self + .epoch_store + .consensus_output_cache + .deferred_transactions_v2 + .lock(); + for (key, txns) in deferred_txns.into_iter() { + total_deferred_txns += txns.len(); + deferred_transactions.insert(key, txns.clone()); + state.output.defer_transactions_v2(key, txns); + } + } + + // DONE(commit-handler-rewrite): update metrics + self.metrics + .consensus_handler_deferred_transactions + .inc_by(total_deferred_txns as u64); + self.metrics + .consensus_handler_cancelled_transactions + .inc_by(cancelled_txns.len() as u64); + self.metrics + .consensus_handler_max_object_costs + .with_label_values(&["regular_commit"]) + .set(shared_object_congestion_tracker.max_cost() as i64); + self.metrics + .consensus_handler_max_object_costs + .with_label_values(&["randomness_commit"]) + .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64); + + // DONE(commit-handler-rewrite): gather object debts, send them to ExecutionTimeObserver + let object_debts = 
shared_object_congestion_tracker.accumulated_debts(commit_info); + let randomness_object_debts = + shared_object_using_randomness_congestion_tracker.accumulated_debts(commit_info); + if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get() { + if let Err(e) = tx_object_debts.try_send( + object_debts + .iter() + .chain(randomness_object_debts.iter()) + .map(|(id, _)| *id) + .collect(), + ) { + info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}"); + } + } + + // DONE(commit-handler-rewrite): commit object debts to output + state + .output + .set_congestion_control_object_debts(object_debts); + state + .output + .set_congestion_control_randomness_object_debts(randomness_object_debts); + + // DONE(commit-handler-rewrite): create accumulator settlement transactions + let mut settlement = None; + let mut randomness_settlement = None; + if self.epoch_store.accumulators_enabled() { + let checkpoint_height = self + .epoch_store + .calculate_pending_checkpoint_height(commit_info.round); + + settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height)); + + if state.randomness_round.is_some() { + randomness_settlement = Some(Schedulable::AccumulatorSettlement( + epoch, + checkpoint_height + 1, + )); + } + } + + // DONE(commit-handler-rewrite): add the consensus commit prologue transaction and root + let consensus_commit_prologue = self.add_consensus_commit_prologue_transaction( + state, + commit_info, + transactions_to_schedule + .iter() + .map(Schedulable::Transaction), + &cancelled_txns, + ); + + let schedulables: Vec<_> = itertools::chain!( + consensus_commit_prologue.into_iter(), + authenticator_state_update_transaction.into_iter(), + transactions_to_schedule.into_iter(), + ) + .map(Schedulable::Transaction) + .chain(settlement) + .collect(); + + let randomness_schedulables: Vec<_> = randomness_state_update_transaction + .into_iter() + .chain( + randomness_transactions_to_schedule + .into_iter() + .map(Schedulable::Transaction), + ) + .chain(randomness_settlement) + .collect(); + + // DONE(commit-handler-rewrite): assign shared object versions + let assigned_versions = self + .epoch_store + .process_consensus_transaction_shared_object_versions( + self.cache_reader.as_ref(), + schedulables.iter(), + randomness_schedulables.iter(), + &cancelled_txns, + &mut state.output, + ) + .expect("failed to assign shared object versions"); + + // DONE(commit-handler-rewrite): store all user signatures for use by checkpoint builder + self.epoch_store + .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter())); + + (schedulables, randomness_schedulables, assigned_versions) + } + + // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update + // the system clock used in all transactions in the current consensus commit. + // Returns the root of the consensus commit prologue transaction if it was added to the input. 
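+    // For every cancelled transaction, deterministic shared-object versions are pre-assigned here
+    // and embedded in the prologue as (digest, assigned versions) pairs, so that every validator
+    // records the same version assignment for cancelled transactions.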
+    fn add_consensus_commit_prologue_transaction<'a>(
+        &'a self,
+        state: &'a mut CommitHandlerState,
+        commit_info: &'a ConsensusCommitInfo,
+        schedulables: impl Iterator<Item = Schedulable<&'a VerifiedExecutableTransaction>>,
+        cancelled_txns: &'a BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
+    ) -> Option<VerifiedExecutableTransaction> {
+        {
+            if commit_info.skip_consensus_commit_prologue_in_test {
+                return None;
+            }
+        }
+
+        let mut version_assignment = Vec::new();
+        let mut shared_input_next_version = HashMap::new();
+        for txn in schedulables {
+            let key = txn.key();
+            match key.as_digest().and_then(|d| cancelled_txns.get(d)) {
+                Some(CancelConsensusCertificateReason::CongestionOnObjects(_))
+                | Some(CancelConsensusCertificateReason::DkgFailed) => {
+                    assert_reachable!("cancelled transactions");
+                    let assigned_versions = SharedObjVerManager::assign_versions_for_certificate(
+                        &self.epoch_store,
+                        &txn,
+                        &mut shared_input_next_version,
+                        cancelled_txns,
+                    );
+                    version_assignment.push((
+                        *key.unwrap_digest(),
+                        assigned_versions.shared_object_versions,
+                    ));
+                }
+                None => {}
+            }
+        }
+
+        fail_point_arg!(
+            "additional_cancelled_txns_for_tests",
+            |additional_cancelled_txns: Vec<(
+                TransactionDigest,
+                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
+            )>| {
+                version_assignment.extend(additional_cancelled_txns);
+            }
+        );
+
+        let transaction = commit_info.create_consensus_commit_prologue_transaction(
+            self.epoch_store.epoch(),
+            self.epoch_store.protocol_config(),
+            version_assignment,
+            commit_info,
+            state.indirect_state_observer.take().unwrap(),
+        );
+        Some(transaction)
+    }
+
+    fn handle_deferral_and_cancellation(
+        &self,
+        state: &mut CommitHandlerState,
+        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
+        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransaction>>,
+        scheduled_txns: &mut Vec<VerifiedExecutableTransaction>,
+        protocol_config: &ProtocolConfig,
+        commit_info: &ConsensusCommitInfo,
+        transaction: VerifiedExecutableTransaction,
+        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
+        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
+        execution_time_estimator: Option<&ExecutionTimeEstimator>,
+    ) {
+        // DONE(commit-handler-rewrite): check if transaction should be deferred
+        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
+            execution_time_estimator,
+            &transaction,
+            state.indirect_state_observer.as_mut().unwrap(),
+        );
+
+        let deferral_info = self.epoch_store.should_defer(
+            tx_cost,
+            &transaction,
+            commit_info,
+            state.dkg_failed,
+            state.randomness_round.is_some(),
+            previously_deferred_tx_digests,
+            shared_object_congestion_tracker,
+        );
+
+        if let Some((deferral_key, deferral_reason)) = deferral_info {
+            debug!(
+                "Deferring consensus certificate for transaction {:?} until {:?}",
+                transaction.digest(),
+                deferral_key
+            );
+
+            match deferral_reason {
+                // DONE(commit-handler-rewrite): Always defer transaction due to randomness not ready.
+                DeferralReason::RandomnessNotReady => {
+                    deferred_txns
+                        .entry(deferral_key)
+                        .or_default()
+                        .push(transaction);
+                }
+                DeferralReason::SharedObjectCongestion(congested_objects) => {
+                    self.metrics.consensus_handler_congested_transactions.inc();
+                    // DONE(commit-handler-rewrite): when deferral limit is exceeded, cancel the transaction
+                    if transaction_deferral_within_limit(
+                        &deferral_key,
+                        protocol_config.max_deferral_rounds_for_congestion_control(),
+                    ) {
+                        deferred_txns
+                            .entry(deferral_key)
+                            .or_default()
+                            .push(transaction);
+                    } else {
+                        // Cancel the transaction that has been deferred for too long.
+                        debug!(
+                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
+                            transaction.digest(),
+                            deferral_key,
+                            congested_objects
+                        );
+                        cancelled_txns.insert(
+                            *transaction.digest(),
+                            CancelConsensusCertificateReason::CongestionOnObjects(
+                                congested_objects,
+                            ),
+                        );
+                        // DONE(commit-handler-rewrite): cancelled txns must be scheduled for execution
+                        scheduled_txns.push(transaction);
+                    }
+                }
+            }
+        } else {
+            // DONE(commit-handler-rewrite): update object execution cost for all scheduled transactions
+            // This certificate will be scheduled. Update object execution cost.
+            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, &transaction);
+            scheduled_txns.push(transaction);
+        }
+    }
+
+    fn merge_and_reorder_transactions(
+        &self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        user_transactions: Vec<VerifiedExecutableTransaction>,
+    ) -> (
+        Vec<VerifiedExecutableTransaction>,
+        Vec<VerifiedExecutableTransaction>,
+        HashMap<TransactionDigest, DeferralKey>,
+    ) {
+        let protocol_config = self.epoch_store.protocol_config();
+
+        // DONE(commit-handler-rewrite): deferred transactions have higher priority than new txns (if gas price is equal)
+        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
+            self.load_deferred_transactions(state, commit_info);
+
+        txns.reserve(user_transactions.len());
+        randomness_txns.reserve(user_transactions.len());
+
+        for txn in user_transactions {
+            if txn.transaction_data().uses_randomness() {
+                randomness_txns.push(txn);
+            } else {
+                txns.push(txn);
+            }
+        }
+
+        // DONE(commit-handler-rewrite): reorder transactions by gas price
+        PostConsensusTxReorder::reorder_v2(
+            &mut txns,
+            protocol_config.consensus_transaction_ordering(),
+        );
+        PostConsensusTxReorder::reorder_v2(
+            &mut randomness_txns,
+            protocol_config.consensus_transaction_ordering(),
+        );
+
+        (txns, randomness_txns, previously_deferred_tx_digests)
+    }
+
+    fn load_deferred_transactions(
+        &self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+    ) -> (
+        Vec<VerifiedExecutableTransaction>,
+        Vec<VerifiedExecutableTransaction>,
+        HashMap<TransactionDigest, DeferralKey>,
+    ) {
+        let mut previously_deferred_tx_digests = HashMap::new();
+
+        // DONE(commit-handler-rewrite): Load transactions deferred from previous commits, compute the digest set of all such transactions.
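+        // The digest set built here (`previously_deferred_tx_digests`) feeds should_defer(),
+        // which uses it to tell re-processed deferrals apart from newly arrived transactions.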
+        let deferred_txs: Vec<_> = self
+            .epoch_store
+            .load_deferred_transactions_for_up_to_consensus_round_v2(
+                &mut state.output,
+                commit_info.round,
+            )
+            .expect("db error")
+            .into_iter()
+            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
+            .map(|(key, tx)| {
+                previously_deferred_tx_digests.insert(*tx.digest(), key);
+                tx
+            })
+            .collect();
+        trace!(
+            "loading deferred transactions: {:?}",
+            deferred_txs.iter().map(|tx| tx.digest())
+        );
+
+        // DONE(commit-handler-rewrite): load all deferred randomness-using txns
+        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
+            let txns: Vec<_> = self
+                .epoch_store
+                .load_deferred_transactions_for_randomness_v2(&mut state.output)
+                .expect("db error")
+                .into_iter()
+                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
+                .map(|(key, tx)| {
+                    previously_deferred_tx_digests.insert(*tx.digest(), key);
+                    tx
+                })
+                .collect();
+            trace!(
+                "loading deferred randomness transactions: {:?}",
+                txns.iter().map(|tx| tx.digest())
+            );
+            txns
+        } else {
+            vec![]
+        };
+
+        (
+            deferred_txs,
+            deferred_randomness_txs,
+            previously_deferred_tx_digests,
+        )
+    }
+
+    fn init_congestion_tracker(
+        &self,
+        commit_info: &ConsensusCommitInfo,
+        for_randomness: bool,
+        txns: &[VerifiedExecutableTransaction],
+    ) -> SharedObjectCongestionTracker {
+        SharedObjectCongestionTracker::from_protocol_config(
+            self.epoch_store
+                .consensus_quarantine
+                .read()
+                .load_initial_object_debts_v2(
+                    &self.epoch_store,
+                    commit_info.round,
+                    for_randomness,
+                    txns,
+                )
+                .expect("db error"),
+            self.epoch_store.protocol_config(),
+            for_randomness,
+        )
+    }
+
+    fn process_jwks(
+        &self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
+    ) {
+        // DONE(commit-handler-rewrite): [ssm] record jwk votes
+        for (authority_name, jwk_id, jwk) in new_jwks {
+            self.epoch_store.record_jwk_vote(
+                &mut state.output,
+                commit_info.round,
+                authority_name,
+                &jwk_id,
+                &jwk,
+            );
+        }
+    }
+
+    fn process_capability_notifications(
+        &self,
+        capability_notifications: Vec<AuthorityCapabilitiesV2>,
+    ) {
+        // DONE(commit-handler-rewrite): [ssm] record authority capabilities
+        for capabilities in capability_notifications {
+            self.epoch_store
+                .record_capabilities_v2(&capabilities)
+                .expect("db error");
+        }
+    }
+
+    fn process_execution_time_observations(
+        &self,
+        state: &mut CommitHandlerState,
+        execution_time_observations: Vec<ExecutionTimeObservation>,
+    ) {
+        // DONE(commit-handler-rewrite): [ssm] Process new execution time observations for use by congestion control.
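+        // Observations carry a per-authority generation; both the in-memory estimator and the
+        // commit output record (authority, generation, estimates), which should keep replay of
+        // the same commit idempotent.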
+        let mut execution_time_estimator = self
+            .epoch_store
+            .execution_time_estimator
+            .try_lock()
+            .expect("should only ever be called from the commit handler thread");
+
+        for ExecutionTimeObservation {
+            authority,
+            generation,
+            estimates,
+        } in execution_time_observations
+        {
+            let Some(estimator) = execution_time_estimator.as_mut() else {
+                error!("dropping ExecutionTimeObservation from possibly-Byzantine authority {authority:?} sent when ExecutionTimeEstimate mode is not enabled");
+                continue;
+            };
+            let authority_index = self
+                .epoch_store
+                .committee()
+                .authority_index(&authority)
+                .unwrap();
+            estimator.process_observations_from_consensus(
+                authority_index,
+                Some(generation),
+                &estimates,
+            );
+            state
+                .output
+                .insert_execution_time_observation(authority_index, generation, estimates);
+        }
+    }
+
+    fn process_checkpoint_signature_messages(
+        &self,
+        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
+    ) {
+        // DONE(commit-handler-rewrite): [ssm] notify checkpoint signatures
+        for checkpoint_signature_message in checkpoint_signature_messages {
+            self.checkpoint_service
+                .notify_checkpoint_signature(&self.epoch_store, &checkpoint_signature_message)
+                .expect("db error");
+        }
+    }
+
+    async fn process_dkg_updates(
+        &self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        randomness_manager: Option<&mut RandomnessManager>,
+        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
+        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
+    ) {
+        if !self.epoch_store.randomness_state_enabled() {
+            debug_fatal!(
+                "received {} RandomnessDkgConfirmation messages when randomness is not enabled",
+                randomness_dkg_confirmations.len()
+            );
+            return;
+        }
+
+        let randomness_manager =
+            randomness_manager.expect("randomness manager should exist if randomness is enabled");
+
+        let randomness_dkg_updates =
+            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
+
+        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
+            state,
+            randomness_manager,
+            randomness_dkg_confirmations,
+        );
+
+        // DONE(commit-handler-rewrite): [ssm] advance randomness state if needed
+        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
+            randomness_manager
+                .advance_dkg(&mut state.output, commit_info.round)
+                .await
+                .expect("epoch ended");
+        }
+    }
+
+    fn process_randomness_dkg_messages(
+        &self,
+        randomness_manager: &mut RandomnessManager,
+        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
+    ) -> bool /* randomness state updated */ {
+        if randomness_dkg_messages.is_empty() {
+            return false;
+        }
+
+        // DONE(commit-handler-rewrite): [ssm] process dkg message
+        let mut randomness_state_updated = false;
+        for (authority, bytes) in randomness_dkg_messages {
+            match bcs::from_bytes(&bytes) {
+                Ok(message) => {
+                    randomness_manager
+                        .add_message(&authority, message)
+                        // TODO: make infallible
+                        .expect("epoch ended");
+                    randomness_state_updated = true;
+                }
+
+                Err(e) => {
+                    warn!(
+                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
+                        authority.concise(),
+                    );
+                }
+            }
+        }
+
+        randomness_state_updated
+    }
+
+    fn process_randomness_dkg_confirmations(
+        &self,
+        state: &mut CommitHandlerState,
+        randomness_manager: &mut RandomnessManager,
+        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
+    ) -> bool /* randomness state updated */ {
+        if randomness_dkg_confirmations.is_empty() {
+            return false;
+        }
+
+        // DONE(commit-handler-rewrite): [ssm] process dkg confirmation
+        let mut randomness_state_updated = false;
+        for (authority, bytes) in randomness_dkg_confirmations {
+            match bcs::from_bytes(&bytes) {
+                Ok(message) => {
+                    randomness_manager
+                        .add_confirmation(&mut state.output, &authority, message)
+                        // TODO: make infallible
+                        .expect("epoch ended");
+                    randomness_state_updated = true;
+                }
+                Err(e) => {
+                    warn!(
+                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
+                        authority.concise(),
+                    );
+                }
+            }
+        }
+
+        randomness_state_updated
+    }
+
+    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
+    fn process_end_of_publish_transactions(
+        &self,
+        state: &mut CommitHandlerState,
+        end_of_publish_transactions: Vec<AuthorityName>,
+    ) -> bool {
+        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
+            "No contention on end_of_publish as it is only accessed from consensus handler",
+        );
+
+        if eop_aggregator.has_quorum() {
+            return true;
+        }
+
+        if end_of_publish_transactions.is_empty() {
+            return false;
+        }
+
+        for authority in end_of_publish_transactions {
+            info!("Received EndOfPublish from {:?}", authority.concise());
+
+            // It is ok to just release the lock here, as this function is the only place that
+            // transitions into the RejectAllCerts state, and it is always executed from the
+            // consensus task.
+            state.output.insert_end_of_publish(authority);
+            if eop_aggregator
+                .insert_generic(authority, ())
+                .is_quorum_reached()
+            {
+                debug!(
+                    "Collected enough end_of_publish messages with last message from validator {:?}",
+                    authority.concise(),
+                );
+                return true;
+            }
+        }
+
+        false
+    }
+
+    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
+    /// ends.
+    fn advance_eop_state_machine(
+        &self,
+        state: &mut CommitHandlerState,
+    ) -> (
+        RwLockWriteGuard<'_, ReconfigState>,
+        bool, // true if final round
+    ) {
+        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
+        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
+
+        reconfig_state.close_all_certs();
+
+        let commit_has_deferred_txns = state.output.has_deferred_transactions();
+        let previous_commits_have_deferred_txns =
+            !self.epoch_store.deferred_transactions_empty_v2();
+
+        // DONE(commit-handler-rewrite): [ssm] if we are rejecting all certs, AND there are no deferred transactions to process, transition to RejectAllTx
+        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
+            if !start_state_is_reject_all_tx {
+                info!("Transitioning to RejectAllTx");
+            }
+            reconfig_state.close_all_tx();
+        } else {
+            debug!(
+                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
+                previous_commits_have_deferred_txns,
+                commit_has_deferred_txns,
+            );
+        }
+
+        state.output.store_reconfig_state(reconfig_state.clone());
+
+        // DONE(commit-handler-rewrite): [ssm] only return final_round=true on the first round where we transition to RejectAllTx
+        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
+            (reconfig_state, true)
+        } else {
+            (reconfig_state, false)
+        }
+    }
+
+    fn gather_commit_metadata(
+        &self,
+        consensus_commit: &impl ConsensusCommitAPI,
+    ) -> (u64, AuthorityIndex, u64) {
+        let timestamp = consensus_commit.commit_timestamp_ms();
+        let leader_author = consensus_commit.leader_author_index();
+        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
+
+        let system_time_ms = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as i64;
+
+        let consensus_timestamp_bias_ms =
+            system_time_ms - (timestamp as i64);
+        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
+        self.metrics
+            .consensus_timestamp_bias
+            .observe(consensus_timestamp_bias_seconds);
+
+        let epoch_start = self
+            .epoch_store
+            .epoch_start_config()
+            .epoch_start_timestamp_ms();
+        let timestamp = if timestamp < epoch_start {
+            error!(
+                "Unexpected commit timestamp {timestamp} less than epoch start time {epoch_start}, author {leader_author}"
+            );
+            epoch_start
+        } else {
+            timestamp
+        };
+
+        (timestamp, leader_author, commit_sub_dag_index)
+    }
+
+    #[instrument(level = "debug", skip_all, fields(round = consensus_commit.leader_round()))]
     async fn handle_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
+        // Migration for commit handler v2:
+        // - In test configurations, some validators use v1, some use v2. This exposes fork
+        //   bugs in tests.
+        // - In production, if the protocol flag is set, all validators must switch to v2 atomically.
+        //   This is likely not necessary, since v2 has been extensively tested for compatibility
+        //   with v1. But we do it anyway as a safety measure, in case there are undiscovered forking
+        //   bugs in v2.
+        // - If the protocol flag is not set, a validator can still use the new commit handler by
+        //   setting `SUI_USE_NEW_COMMIT_HANDLER` in the environment. This is so that we can test the
+        //   new commit handler in prod before it is fully deployed.
+        let use_new_commit_handler = if in_test_configuration() {
+            let name = self.epoch_store.name;
+            let authority_index = self.epoch_store.committee().authority_index(&name).unwrap();
+            authority_index < 2
+        } else if self.epoch_store.protocol_config().use_new_commit_handler() {
+            true
+        } else {
+            self.epoch_store
+                .epoch_start_config()
+                .use_commit_handler_v2()
+        };
+
+        if use_new_commit_handler {
+            self.handle_consensus_commit_v2(consensus_commit).await;
+            return;
+        }
+
         // This may block until one of two conditions happens:
         // - Number of uncommitted transactions in the writeback cache goes below the
         //   backpressure threshold.
@@ -636,6 +1922,7 @@ impl ConsensusHandler {
 
         let last_committed_round = self.last_consensus_stats.index.last_committed_round;
 
+        // TODO(commit-handler-rewrite): Update last_committed_round stats
         if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
         {
             consensus_tx_status_cache
@@ -646,24 +1933,26 @@ impl ConsensusHandler {
             tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
         }
 
+        // TODO(commit-handler-rewrite): this will be unconditionally enabled in the rewrite
         let commit_info = if self
             .epoch_store
             .protocol_config()
            .record_additional_state_digest_in_prologue()
         {
-            let commit_info = self
-                .additional_consensus_state
-                .observe_commit(self.epoch_store.protocol_config(), &consensus_commit);
-            info!(
-                "estimated commit rate: {:?}",
-                commit_info.estimated_commit_period()
-            );
+            let commit_info = self.additional_consensus_state.observe_commit(
+                self.epoch_store.protocol_config(),
+                self.epoch_store
+                    .epoch_start_config()
+                    .epoch_start_timestamp_ms(),
+                &consensus_commit,
+            );
             commit_info
         } else {
             self.additional_consensus_state
-                .stateless_commit_info(self.epoch_store.protocol_config(), &consensus_commit)
+                .stateless_commit_info(&self.epoch_store, &consensus_commit)
         };
 
+        // TODO(commit-handler-rewrite): already not needed
        // TODO: Remove this once narwhal is deprecated.
For now mysticeti will not return // more than one leader per round so we are not in danger of ignoring any commits. assert!(commit_info.round >= last_committed_round); @@ -678,8 +1967,9 @@ impl ConsensusHandler { return; } + // TODO(commit-handler-rewrite): gather commit metadata + /* (transaction, serialized length) */ - let epoch = self.epoch_store.epoch(); let mut transactions = vec![]; let timestamp = consensus_commit.commit_timestamp_ms(); let leader_author = consensus_commit.leader_author_index(); @@ -716,6 +2006,7 @@ impl ConsensusHandler { "Received consensus output" ); + // TODO(commit-handler-rewrite): update stats & ExecutionIndices let execution_index = ExecutionIndices { last_committed_round: commit_info.round, sub_dag_index: commit_sub_dag_index, @@ -730,6 +2021,8 @@ impl ConsensusHandler { // round and subdag index in the last_consensus_stats, so that it won't be re-executed in the future. self.last_consensus_stats.index = execution_index; + // TODO(commit-handler-rewrite): load and activate previous round's jwks + // Load all jwks that became active in the previous round, and commit them in this round. // We want to delay one round because none of the transactions in the previous round could // have been authenticated with the jwks that became active in that round. @@ -743,8 +2036,11 @@ impl ConsensusHandler { .expect("Unrecoverable error in consensus handler"); if !new_jwks.is_empty() { - let authenticator_state_update_transaction = - self.authenticator_state_update_transaction(commit_info.round, new_jwks); + let authenticator_state_update_transaction = authenticator_state_update_transaction( + &self.epoch_store, + commit_info.round, + new_jwks, + ); debug!( "adding AuthenticatorStateUpdate({:?}) tx: {:?}", authenticator_state_update_transaction.digest(), @@ -757,6 +2053,7 @@ impl ConsensusHandler { )); } + // TODO(commit-handler-rewrite): update low scoring authorities update_low_scoring_authorities( self.low_scoring_authorities.clone(), self.epoch_store.committee(), @@ -768,15 +2065,20 @@ impl ConsensusHandler { .consensus_bad_nodes_stake_threshold(), ); + // TODO(commit-handler-rewrite): update metrics self.metrics .consensus_committed_subdags .with_label_values(&[&leader_author.to_string()]) .inc(); + let epoch = self.epoch_store.epoch(); + + // TODO(commit-handler-rewrite): update transaction status (rejected/finalized) and update metrics let mut num_finalized_user_transactions = vec![0; self.committee.size()]; let mut num_rejected_user_transactions = vec![0; self.committee.size()]; { - let span = trace_span!("ConsensusHandler::HandleCommit::process_consensus_txns"); + let span = + tracing::trace_span!("ConsensusHandler::HandleCommit::process_consensus_txns"); let _guard = span.enter(); for (block, parsed_transactions) in consensus_commit.transactions() { let author = block.author.value(); @@ -834,7 +2136,7 @@ impl ConsensusHandler { } if parsed.rejected { - if parsed.transaction.kind.is_user_transaction() { + if parsed.transaction.is_mfp_transaction() { self.epoch_store .set_consensus_tx_status(position, ConsensusTxStatus::Rejected); num_rejected_user_transactions[author] += 1; @@ -842,7 +2144,7 @@ impl ConsensusHandler { // Skip processing rejected transactions. 
                     continue;
                 }
-                if parsed.transaction.kind.is_user_transaction() {
+                if parsed.transaction.is_mfp_transaction() {
                     self.epoch_store
                         .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
                     num_finalized_user_transactions[author] += 1;
@@ -908,6 +2210,7 @@ impl ConsensusHandler {
                 .set(num_rejected_user_transactions[i.value()] as i64);
         }
 
+        // TODO(commit-handler-rewrite): de-duplicate transactions
         let mut all_transactions = Vec::new();
         {
             // We need a set here as well, since the processed_cache is a LRU cache and can drop
@@ -971,10 +2274,12 @@ impl ConsensusHandler {
             .await
             .expect("Unrecoverable error in consensus handler");
 
+        // TODO(commit-handler-rewrite): update throughput calculator
         // update the calculated throughput
         self.throughput_calculator
             .add_transactions(timestamp, executable_transactions.len() as u64);
 
+        // TODO(commit-handler-rewrite): fail points
         fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
             let key = [commit_sub_dag_index, self.epoch_store.epoch()];
             if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
@@ -984,16 +2289,486 @@ impl ConsensusHandler {
 
         fail_point!("crash"); // for tests that produce random crashes
 
+        // TODO(commit-handler-rewrite): enqueue transactions
         self.execution_scheduler_sender.send(
             executable_transactions,
             assigned_versions,
             SchedulingSource::NonFastPath,
         );
 
-        // Check if we should send EndOfPublish after processing consensus commit
+        // TODO(commit-handler-rewrite): Check if we should send EndOfPublish after processing consensus commit
         self.send_end_of_publish_if_needed().await;
     }
 
+    fn create_authenticator_state_update(
+        &self,
+        last_committed_round: u64,
+        commit_info: &ConsensusCommitInfo,
+    ) -> Option<VerifiedExecutableTransaction> {
+        // Load all jwks that became active in the previous round, and commit them in this round.
+        // We want to delay one round because none of the transactions in the previous round could
+        // have been authenticated with the jwks that became active in that round.
+        //
+        // Because of this delay, jwks that become active in the last round of the epoch will
+        // never be committed. That is ok, because in the new epoch, the validators should
+        // immediately re-submit these jwks, and they can become active then.
+        let new_jwks = self
+            .epoch_store
+            .get_new_jwks(last_committed_round)
+            .expect("Unrecoverable error in consensus handler");
+
+        if !new_jwks.is_empty() {
+            let authenticator_state_update_transaction = authenticator_state_update_transaction(
+                &self.epoch_store,
+                commit_info.round,
+                new_jwks,
+            );
+            debug!(
+                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
+                authenticator_state_update_transaction.digest(),
+                authenticator_state_update_transaction,
+            );
+
+            Some(authenticator_state_update_transaction)
+        } else {
+            None
+        }
+    }
+
+    // Filters out rejected or deprecated transactions.
+    #[instrument(level = "trace", skip_all)]
+    fn filter_consensus_txns(
+        &mut self,
+        initial_reconfig_state: ReconfigState,
+        commit_info: &ConsensusCommitInfo,
+        consensus_commit: &impl ConsensusCommitAPI,
+    ) -> Vec<(SequencedConsensusTransactionKind, u32)> {
+        let mut transactions = Vec::new();
+        let epoch = self.epoch_store.epoch();
+        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
+        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
+        for (block, parsed_transactions) in consensus_commit.transactions() {
+            let author = block.author.value();
+            // TODO: consider only messages within 1~3 rounds of the leader?
+            self.last_consensus_stats.stats.inc_num_messages(author);
+
+            // Set the "ping" transaction status for this block. This is necessary as there might
+            // be some ping requests waiting for the ping transaction to be certified.
+            self.epoch_store.set_consensus_tx_status(
+                ConsensusPosition::ping(epoch, block),
+                ConsensusTxStatus::Finalized,
+            );
+
+            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
+                let position = ConsensusPosition {
+                    epoch,
+                    block,
+                    index: tx_index as TransactionIndex,
+                };
+
+                // Transaction has appeared in consensus output, we can increment the submission count
+                // for this tx for DoS protection.
+                if self.epoch_store.protocol_config().mysticeti_fastpath() {
+                    if let ConsensusTransactionKind::UserTransaction(tx) = &parsed.transaction.kind
+                    {
+                        let digest = tx.digest();
+                        if let Some((spam_weight, submitter_client_addrs)) = self
+                            .epoch_store
+                            .submitted_transaction_cache
+                            .increment_submission_count(digest)
+                        {
+                            if let Some(ref traffic_controller) = self.traffic_controller {
+                                debug!(
+                                    "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
+                                    submitter_client_addrs.len()
+                                );
+
+                                // Apply spam weight to all client addresses that submitted this transaction
+                                for addr in submitter_client_addrs {
+                                    traffic_controller.tally(TrafficTally::new(
+                                        Some(addr),
+                                        None,
+                                        None,
+                                        spam_weight.clone(),
+                                    ));
+                                }
+                            } else {
+                                warn!(
+                                    "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
+                                    submitter_client_addrs.len()
+                                );
+                            }
+                        }
+                    }
+                }
+
+                if parsed.rejected {
+                    // TODO(fastpath): Add metrics for rejected transactions.
+                    if matches!(
+                        parsed.transaction.kind,
+                        ConsensusTransactionKind::UserTransaction(_)
+                    ) {
+                        self.epoch_store
+                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
+                        num_rejected_user_transactions[author] += 1;
+                    }
+                    // Skip processing rejected transactions.
+                    // TODO(fastpath): Handle unlocking.
+                    continue;
+                }
+                if matches!(
+                    parsed.transaction.kind,
+                    ConsensusTransactionKind::UserTransaction(_)
+                ) {
+                    self.epoch_store
+                        .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
+                    num_finalized_user_transactions[author] += 1;
+                }
+                let kind = classify(&parsed.transaction);
+                self.metrics
+                    .consensus_handler_processed
+                    .with_label_values(&[kind])
+                    .inc();
+                self.metrics
+                    .consensus_handler_transaction_sizes
+                    .with_label_values(&[kind])
+                    .observe(parsed.serialized_len as f64);
+                // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config.
+                if matches!(
+                    &parsed.transaction.kind,
+                    ConsensusTransactionKind::CertifiedTransaction(_)
+                        | ConsensusTransactionKind::UserTransaction(_)
+                ) {
+                    self.last_consensus_stats
+                        .stats
+                        .inc_num_user_transactions(author);
+                }
+
+                // DONE(commit-handler-rewrite): ignored external transactions must not be recorded as processed.
+                if !initial_reconfig_state.should_accept_consensus_certs() {
+                    // DONE(commit-handler-rewrite): ignore txns due to !should_accept_consensus_certs(), unless they were previously deferred
+                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
+                    // processing newly-received transactions at this time).
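+                    // Every kind matched by the first arm below is dropped without being recorded
+                    // as processed; surviving messages are recorded later, in deduplicate_consensus_txns().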
+                    match &parsed.transaction.kind {
+                        ConsensusTransactionKind::UserTransaction(_)
+                        | ConsensusTransactionKind::CertifiedTransaction(_)
+                        // deprecated and ignored later, but added for exhaustive match
+                        | ConsensusTransactionKind::CapabilityNotification(_)
+                        // DONE(commit-handler-rewrite): [ssm] ignore capability notifications if !should_accept_consensus_certs()
+                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
+                        // DONE(commit-handler-rewrite): [ssm] ignore end of publish messages if !should_accept_consensus_certs()
+                        | ConsensusTransactionKind::EndOfPublish(_)
+                        // DONE(commit-handler-rewrite): [ssm] ignore execution time observations if !should_accept_consensus_certs()
+                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
+                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
+                        // DONE(commit-handler-rewrite): [ssm] ignore jwk votes if !should_accept_consensus_certs()
+                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
+                            debug!(
+                                "Ignoring consensus transaction {:?} because of end of epoch",
+                                parsed.transaction.key()
+                            );
+                            continue;
+                        }
+
+                        // These are the message types that are still processed even if !should_accept_consensus_certs()
+                        ConsensusTransactionKind::CheckpointSignature(_)
+                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
+                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
+                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
+                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
+                    }
+                }
+
+                if !initial_reconfig_state.should_accept_tx() {
+                    match &parsed.transaction.kind {
+                        // DONE(commit-handler-rewrite): [ssm] ignore dkg confirmation if !should_accept_tx()
+                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
+                        // DONE(commit-handler-rewrite): [ssm] ignore dkg message if !should_accept_tx()
+                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
+                        _ => {},
+                    }
+                }
+
+                // DONE(commit-handler-rewrite): ignore mfp user transaction if mfp is disabled
+                if parsed.transaction.is_mfp_transaction()
+                    && !self.epoch_store.protocol_config().mysticeti_fastpath()
+                {
+                    debug!(
+                        "Ignoring MFP transaction {:?} because MFP is disabled",
+                        parsed.transaction.key()
+                    );
+                    continue;
+                }
+
+                // DONE(commit-handler-rewrite): ignore certs from wrong epoch
+                if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
+                    &parsed.transaction.kind
+                {
+                    if certificate.epoch() != epoch {
+                        debug!(
+                            "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
+                            certificate.epoch(),
+                            epoch
+                        );
+                        continue;
+                    }
+                }
+
+                // Handle deprecated messages
+                match &parsed.transaction.kind {
+                    // DONE(commit-handler-rewrite): [ssm] can ignore capabilities v1 in rewrite
+                    ConsensusTransactionKind::CapabilityNotification(_)
+                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
+                    | ConsensusTransactionKind::CheckpointSignature(_) => {
+                        debug_fatal!(
+                            "BUG: saw deprecated tx {:?} for commit round {}",
+                            parsed.transaction.key(),
+                            commit_info.round
+                        );
+                        continue;
+                    }
+                    _ => {}
+                }
+
+                // DONE(commit-handler-rewrite): ignore transactions sent by validators that have already sent EOP
+                if matches!(
+                    &parsed.transaction.kind,
+                    ConsensusTransactionKind::UserTransaction(_)
+                        | ConsensusTransactionKind::CertifiedTransaction(_)
+                ) {
+                    let author_name = self
+                        .epoch_store
+                        .committee()
+                        .authority_by_index(author as u32)
+                        .unwrap();
+                    if self
+                        .epoch_store
+                        .has_received_end_of_publish_from(author_name)
+                    {
+                        // In some edge cases, consensus might resend a previously
+                        // seen certificate after EndOfPublish.
+                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
+                        // transaction is duplicate or not, we filter it out here.
+                        warn!(
+                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
+                            parsed.transaction.key(),
+                            author_name.concise(),
+                        );
+                        continue;
+                    }
+                }
+
+                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
+                transactions.push((transaction, author as u32));
+            }
+        }
+
+        // TODO(commit-handler-rewrite): update per validator metrics
+        // DONE(commit-handler-rewrite): update per validator metrics
+        // TODO and DONE are in same place because this code is shared between old and new commit handler
+        for (i, authority) in self.committee.authorities() {
+            let hostname = &authority.hostname;
+            self.metrics
+                .consensus_committed_messages
+                .with_label_values(&[hostname])
+                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
+            self.metrics
+                .consensus_committed_user_transactions
+                .with_label_values(&[hostname])
+                .set(
+                    self.last_consensus_stats
+                        .stats
+                        .get_num_user_transactions(i.value()) as i64,
+                );
+            self.metrics
+                .consensus_finalized_user_transactions
+                .with_label_values(&[hostname])
+                .set(num_finalized_user_transactions[i.value()] as i64);
+            self.metrics
+                .consensus_rejected_user_transactions
+                .with_label_values(&[hostname])
+                .set(num_rejected_user_transactions[i.value()] as i64);
+        }
+
+        transactions
+    }
+
+    fn deduplicate_consensus_txns(
+        &mut self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
+    ) -> Vec<VerifiedSequencedConsensusTransaction> {
+        // We need a set here as well, since the processed_cache is a LRU cache and can drop
+        // entries while we're iterating over the sequenced transactions.
+        let mut processed_set = HashSet::new();
+
+        let mut all_transactions = Vec::new();
+
+        // All of these TODOs are handled here in the new code, whereas in the old code, they were
+        // each handled separately. The key thing to see is that all messages are marked as processed
+        // here, except for ones that are filtered out earlier (e.g. due to !should_accept_consensus_certs()).
+
+        // DONE(commit-handler-rewrite): record_consensus_message_processed() must be called for deferred txns
+        // DONE(commit-handler-rewrite): cancelled txns must be recorded as processed
+        // DONE(commit-handler-rewrite): consensus messages must be recorded as processed
+        // DONE(commit-handler-rewrite): randomness messages must be recorded as processed
+        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
+            // TODO(consensus-handler-rewrite): the seq + 1 is probably not necessary, because we do not create a
+            // SequencedConsensusTransaction for commit prologue any more.
+            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
+            // prologue transaction, which will be the first transaction in this consensus commit batch.
+            // Therefore, the transaction sequence number starts from 1 here.
+
+    fn deduplicate_consensus_txns(
+        &mut self,
+        state: &mut CommitHandlerState,
+        commit_info: &ConsensusCommitInfo,
+        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
+    ) -> Vec<VerifiedSequencedConsensusTransaction> {
+        // We need a set here as well, since the processed_cache is an LRU cache and can drop
+        // entries while we're iterating over the sequenced transactions.
+        let mut processed_set = HashSet::new();
+
+        let mut all_transactions = Vec::new();
+
+        // All of these TODOs are handled here in the new code, whereas in the old code, they were
+        // each handled separately. The key thing to see is that all messages are marked as processed
+        // here, except for ones that are filtered out earlier (e.g. due to !should_accept_consensus_certs()).
+
+        // DONE(commit-handler-rewrite): record_consensus_message_processed() must be called for deferred txns
+        // DONE(commit-handler-rewrite): cancelled txns must be recorded as processed
+        // DONE(commit-handler-rewrite): consensus messages must be recorded as processed
+        // DONE(commit-handler-rewrite): randomness messages must be recorded as processed
+        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
+            // TODO(consensus-handler-rewrite): the seq + 1 is probably not necessary, because we do not create a
+            // SequencedConsensusTransaction for commit prologue any more.
+            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
+            // prologue transaction, which will be the first transaction in this consensus commit batch.
+            // Therefore, the transaction sequence number starts from 1 here.
+            let current_tx_index = ExecutionIndices {
+                last_committed_round: commit_info.round,
+                sub_dag_index: commit_info.sub_dag_index,
+                transaction_index: (seq + 1) as u64,
+            };
+
+            self.last_consensus_stats.index = current_tx_index;
+
+            let certificate_author = *self
+                .epoch_store
+                .committee()
+                .authority_by_index(cert_origin)
+                .unwrap();
+
+            let sequenced_transaction = SequencedConsensusTransaction {
+                certificate_author_index: cert_origin,
+                certificate_author,
+                consensus_index: current_tx_index,
+                transaction,
+            };
+
+            let Some(verified_transaction) = self
+                .epoch_store
+                .verify_consensus_transaction(sequenced_transaction)
+            else {
+                continue;
+            };
+
+            let key = verified_transaction.0.key();
+            let in_set = !processed_set.insert(key.clone());
+            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
+
+            if in_set || in_cache {
+                self.metrics.skipped_consensus_txns_cache_hit.inc();
+                continue;
+            }
+            if self
+                .epoch_store
+                .is_consensus_message_processed(&key)
+                .expect("db error")
+            {
+                self.metrics.skipped_consensus_txns.inc();
+                continue;
+            }
+
+            state.output.record_consensus_message_processed(key);
+
+            all_transactions.push(verified_transaction);
+        }
+
+        all_transactions
+    }
+
+    fn build_commit_handler_input(
+        &self,
+        transactions: Vec<VerifiedSequencedConsensusTransaction>,
+    ) -> CommitHandlerInput {
+        let epoch = self.epoch_store.epoch();
+        let mut commit_handler_input = CommitHandlerInput::default();
+
+        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
+            match transaction.transaction {
+                SequencedConsensusTransactionKind::External(consensus_transaction) => {
+                    match consensus_transaction.kind {
+                        // === User transactions ===
+                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
+                            // Safe because signatures are verified when consensus called into SuiTxValidator::validate_batch.
+                            let cert = VerifiedCertificate::new_unchecked(*cert);
+                            let transaction =
+                                VerifiedExecutableTransaction::new_from_certificate(cert);
+                            commit_handler_input.user_transactions.push(transaction);
+                        }
+                        ConsensusTransactionKind::UserTransaction(tx) => {
+                            // Safe because transactions are certified by consensus.
+                            let tx = VerifiedTransaction::new_unchecked(*tx);
+                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
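+                            // (Contrast with the CertifiedTransaction arm above: an MFP
+                            // transaction carries no certificate, so for now only the epoch is
+                            // attached when constructing the executable transaction.)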
+ let transaction = + VerifiedExecutableTransaction::new_from_consensus(tx, epoch); + commit_handler_input.user_transactions.push(transaction); + } + + // === State machines === + ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => { + commit_handler_input + .end_of_publish_transactions + .push(authority_public_key_bytes); + } + ConsensusTransactionKind::NewJWKFetched( + authority_public_key_bytes, + jwk_id, + jwk, + ) => { + commit_handler_input.new_jwks.push(( + authority_public_key_bytes, + jwk_id, + jwk, + )); + } + ConsensusTransactionKind::RandomnessDkgMessage( + authority_public_key_bytes, + items, + ) => { + commit_handler_input + .randomness_dkg_messages + .push((authority_public_key_bytes, items)); + } + ConsensusTransactionKind::RandomnessDkgConfirmation( + authority_public_key_bytes, + items, + ) => { + commit_handler_input + .randomness_dkg_confirmations + .push((authority_public_key_bytes, items)); + } + ConsensusTransactionKind::CapabilityNotificationV2( + authority_capabilities_v2, + ) => { + commit_handler_input + .capability_notifications + .push(authority_capabilities_v2); + } + ConsensusTransactionKind::ExecutionTimeObservation( + execution_time_observation, + ) => { + commit_handler_input + .execution_time_observations + .push(execution_time_observation); + } + ConsensusTransactionKind::CheckpointSignatureV2( + checkpoint_signature_message, + ) => { + commit_handler_input + .checkpoint_signature_messages + .push(*checkpoint_signature_message); + } + + // Deprecated messages, filtered earlier by filter_consensus_txns() + ConsensusTransactionKind::CheckpointSignature(_) + | ConsensusTransactionKind::RandomnessStateUpdate(_, _) + | ConsensusTransactionKind::CapabilityNotification(_) => { + unreachable!("filtered earlier") + } + } + } + // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream. 
+                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
+            }
+        }
+
+        commit_handler_input
+    }
+
     async fn send_end_of_publish_if_needed(&self) {
         if !self.epoch_store.should_send_end_of_publish() {
             return;
@@ -1124,31 +2899,26 @@ impl MysticetiConsensusHandler {
     }
 }
 
-impl ConsensusHandler {
-    fn authenticator_state_update_transaction(
-        &self,
-        round: u64,
-        mut new_active_jwks: Vec<ActiveJwk>,
-    ) -> VerifiedExecutableTransaction {
-        new_active_jwks.sort();
-
-        info!("creating authenticator state update transaction");
-        assert!(self.epoch_store.authenticator_state_enabled());
-        let transaction = VerifiedTransaction::new_authenticator_state_update(
-            self.epoch(),
-            round,
-            new_active_jwks,
-            self.epoch_store
-                .epoch_start_config()
-                .authenticator_obj_initial_shared_version()
-                .expect("authenticator state obj must exist"),
-        );
-        VerifiedExecutableTransaction::new_system(transaction, self.epoch())
-    }
-
-    fn epoch(&self) -> EpochId {
-        self.epoch_store.epoch()
-    }
+fn authenticator_state_update_transaction(
+    epoch_store: &AuthorityPerEpochStore,
+    round: u64,
+    mut new_active_jwks: Vec<ActiveJwk>,
+) -> VerifiedExecutableTransaction {
+    let epoch = epoch_store.epoch();
+    new_active_jwks.sort();
+
+    info!("creating authenticator state update transaction");
+    assert!(epoch_store.authenticator_state_enabled());
+    let transaction = VerifiedTransaction::new_authenticator_state_update(
+        epoch,
+        round,
+        new_active_jwks,
+        epoch_store
+            .epoch_start_config()
+            .authenticator_obj_initial_shared_version()
+            .expect("authenticator state obj must exist"),
+    );
+    VerifiedExecutableTransaction::new_system(transaction, epoch)
 }
 
 pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
@@ -1271,7 +3041,7 @@ impl SequencedConsensusTransactionKind {
 
     pub fn is_executable_transaction(&self) -> bool {
         match self {
-            SequencedConsensusTransactionKind::External(ext) => ext.is_executable_transaction(),
+            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
            SequencedConsensusTransactionKind::System(_) => true,
         }
     }
@@ -1835,16 +3605,6 @@ mod tests {
 
         // THEN check for no inflight or suspended transactions.
state.execution_scheduler().check_empty_for_testing(); - - // WHEN processing the same output multiple times - // THEN the consensus stats do not update - for _ in 0..2 { - consensus_handler - .handle_consensus_commit(committed_sub_dag.clone()) - .await; - let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone(); - assert_eq!(last_consensus_stats_1, last_consensus_stats_2); - } } #[tokio::test] @@ -2072,7 +3832,7 @@ mod tests { } #[tokio::test(flavor = "current_thread")] - async fn test_checkpoint_signature_dedup_v1_vs_v2() { + async fn test_checkpoint_signature_dedup() { telemetry_subscribers::init_for_testing(); let network_config = @@ -2105,20 +3865,6 @@ mod tests { SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name) }; - // Prepare V1 pair: same (authority, seq), different digests => same key - let v1_s1 = make_signed(); - let v1_s2 = make_signed(); - // Validate assumption: digests differ - assert_ne!(v1_s1.data().digest(), v1_s2.data().digest()); - let v1_a = - ConsensusTransaction::new_checkpoint_signature_message(CheckpointSignatureMessage { - summary: v1_s1, - }); - let v1_b = - ConsensusTransaction::new_checkpoint_signature_message(CheckpointSignatureMessage { - summary: v1_s2, - }); - // Prepare V2 pair: same (authority, seq), different digests => different keys let v2_s1 = make_signed(); let v2_s1_clone = v2_s1.clone(); @@ -2147,13 +3893,7 @@ mod tests { let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap()); let block = VerifiedBlock::new_for_test( TestBlock::new(100, 0) - .set_transactions(vec![ - to_tx(&v1_a), - to_tx(&v1_b), - to_tx(&v2_a), - to_tx(&v2_b), - to_tx(&v2_dup), - ]) + .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)]) .build(), ); let commit = CommittedSubDag::new( @@ -2187,10 +3927,6 @@ mod tests { use crate::consensus_handler::SequencedConsensusTransactionKey as SK; use sui_types::messages_consensus::ConsensusTransactionKey as CK; - // V1 collapses digest: both map to the same key and are processed once. - let v1_key = SK::External(CK::CheckpointSignature(state.name, 42)); - assert!(epoch_store.is_consensus_message_processed(&v1_key).unwrap()); - // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process. 
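        // (The V2 key is (authority, sequence, digest), so equal sequence numbers with
        // different digests dedup to distinct keys; the removed V1 key was only
        // (authority, sequence), which collapsed them.)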
let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a)); let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b)); @@ -2202,6 +3938,150 @@ mod tests { .unwrap()); } + #[tokio::test(flavor = "current_thread")] + async fn test_verify_consensus_transaction_filters_mismatched_authorities() { + telemetry_subscribers::init_for_testing(); + + let network_config = + sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build(); + let state = TestAuthorityBuilder::new() + .with_network_config(&network_config, 0) + .build() + .await; + + let epoch_store = state.epoch_store_for_testing().clone(); + let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee(); + + // Create a different authority than our test authority + use fastcrypto::traits::KeyPair; + let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair(); + let wrong_authority: AuthorityName = wrong_keypair.public().into(); + + // Create EndOfPublish transaction with mismatched authority + let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority); + + // Create valid EndOfPublish transaction with correct authority + let valid_eop = ConsensusTransaction::new_end_of_publish(state.name); + + // Create CheckpointSignature with mismatched authority + let epoch = epoch_store.epoch(); + let contents = + CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]); + let summary = CheckpointSummary::new( + &ProtocolConfig::get_for_max_version_UNSAFE(), + epoch, + 42, // sequence number + 10, // network_total_transactions + &contents, + None, // previous_digest + GasCostSummary::default(), + None, // end_of_epoch_data + 0, // timestamp + Vec::new(), // randomness_rounds + Vec::new(), // checkpoint commitments + ); + + // Create a signed checkpoint with the wrong authority + let mismatched_checkpoint_signed = + SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority); + let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest(); + let mismatched_checkpoint = + ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage { + summary: mismatched_checkpoint_signed, + }); + + // Create a valid checkpoint signature with correct authority + let valid_checkpoint_signed = + SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name); + let valid_checkpoint_digest = valid_checkpoint_signed.data().digest(); + let valid_checkpoint = + ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage { + summary: valid_checkpoint_signed, + }); + + let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap()); + + // Create a block with both valid and invalid transactions + let block = VerifiedBlock::new_for_test( + TestBlock::new(100, 0) + .set_transactions(vec![ + to_tx(&mismatched_eop), + to_tx(&valid_eop), + to_tx(&mismatched_checkpoint), + to_tx(&valid_checkpoint), + ]) + .build(), + ); + let commit = CommittedSubDag::new( + block.reference(), + vec![block.clone()], + block.timestamp_ms(), + CommitRef::new(10, CommitDigest::MIN), + ); + + let metrics = Arc::new(AuthorityMetrics::new(&Registry::new())); + let throughput = ConsensusThroughputCalculator::new(None, metrics.clone()); + let backpressure = BackpressureManager::new_for_tests(); + let consensus_adapter = + make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]); + let mut handler = ConsensusHandler::new( 
+            epoch_store.clone(),
+            Arc::new(CheckpointServiceNoop {}),
+            state.execution_scheduler().clone(),
+            consensus_adapter,
+            state.get_object_cache_reader().clone(),
+            Arc::new(ArcSwap::default()),
+            consensus_committee.clone(),
+            metrics,
+            Arc::new(throughput),
+            backpressure.subscribe(),
+            state.traffic_controller.clone(),
+        );
+
+        handler.handle_consensus_commit(commit).await;
+
+        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
+        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
+
+        // Check that valid transactions were processed
+        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
+        assert!(
+            epoch_store
+                .is_consensus_message_processed(&valid_eop_key)
+                .unwrap(),
+            "Valid EndOfPublish should have been processed"
+        );
+
+        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
+            state.name,
+            42,
+            valid_checkpoint_digest,
+        ));
+        assert!(
+            epoch_store
+                .is_consensus_message_processed(&valid_checkpoint_key)
+                .unwrap(),
+            "Valid CheckpointSignature should have been processed"
+        );
+
+        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
+        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
+        assert!(
+            !epoch_store.is_consensus_message_processed(&mismatched_eop_key).unwrap(),
+            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
+        );
+
+        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
+            wrong_authority,
+            42,
+            mismatched_checkpoint_digest,
+        ));
+        assert!(
+            !epoch_store.is_consensus_message_processed(&mismatched_checkpoint_key).unwrap(),
+            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
+        );
+    }
+
     fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<ConsensusTransactionKind> {
         v.into_iter().map(extract_one).collect()
     }
diff --git a/crates/sui-core/src/epoch/reconfiguration.rs b/crates/sui-core/src/epoch/reconfiguration.rs
index 0b492ebe1d6a2..7a5734013c82c 100644
--- a/crates/sui-core/src/epoch/reconfiguration.rs
+++ b/crates/sui-core/src/epoch/reconfiguration.rs
@@ -4,6 +4,7 @@ use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
+use tracing::info;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum ReconfigCertStatus {
@@ -46,7 +47,10 @@ impl ReconfigState {
     }
 
     pub fn close_all_certs(&mut self) {
-        self.status = ReconfigCertStatus::RejectAllCerts;
+        if !matches!(self.status, ReconfigCertStatus::RejectAllTx) {
+            info!("closing all certs");
+            self.status = ReconfigCertStatus::RejectAllCerts;
+        }
     }
 
     pub fn should_accept_user_certs(&self) -> bool {
@@ -71,6 +75,10 @@ impl ReconfigState {
     pub fn should_accept_tx(&self) -> bool {
         !matches!(self.status, ReconfigCertStatus::RejectAllTx)
     }
+
+    pub fn is_reject_all_tx(&self) -> bool {
+        matches!(self.status, ReconfigCertStatus::RejectAllTx)
+    }
 }
 
 pub trait ReconfigurationInitiator {
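Since the guard added to `close_all_certs` above is the behavioral change here, a reduced sketch of the transition it enforces may help; `CertStatus` and `State` are stand-ins carrying only the variants this patch touches, not the real `ReconfigState`:

```rust
/// Stand-in for ReconfigCertStatus, limited to the states the guard cares about.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CertStatus {
    AcceptAllCerts,
    RejectAllCerts,
    RejectAllTx,
}

struct State {
    status: CertStatus,
}

impl State {
    /// Mirrors the patched close_all_certs: a no-op once the stronger
    /// RejectAllTx state has been reached, so the state machine only ever
    /// moves forward during reconfiguration.
    fn close_all_certs(&mut self) {
        if self.status != CertStatus::RejectAllTx {
            self.status = CertStatus::RejectAllCerts;
        }
    }

    fn close_all_tx(&mut self) {
        self.status = CertStatus::RejectAllTx;
    }
}

fn main() {
    let mut state = State { status: CertStatus::AcceptAllCerts };
    state.close_all_tx();
    // A straggling close_all_certs no longer downgrades RejectAllTx.
    state.close_all_certs();
    assert_eq!(state.status, CertStatus::RejectAllTx);
}
```

Before the guard, a late `close_all_certs` arriving after `RejectAllTx` would have stepped the state back to `RejectAllCerts`, silently re-opening transaction acceptance via `should_accept_tx`.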
diff --git a/crates/sui-core/src/execution_cache/cache_types.rs b/crates/sui-core/src/execution_cache/cache_types.rs
index 77e84a650bdf3..27de439610602 100644
--- a/crates/sui-core/src/execution_cache/cache_types.rs
+++ b/crates/sui-core/src/execution_cache/cache_types.rs
@@ -8,7 +8,7 @@ use std::sync::Arc;
 use std::{cmp::Ordering, hash::DefaultHasher};
 
 use moka::sync::SegmentedCache as MokaCache;
-use mysten_common::debug_fatal;
+use mysten_common::{debug_fatal, fatal};
 use parking_lot::Mutex;
 use sui_types::base_types::SequenceNumber;
@@ -50,12 +50,13 @@ impl<V> CachedVersionMap<V> {
     pub fn insert(&mut self, version: SequenceNumber, value: V) {
         if !self.values.is_empty() {
             let back = self.values.back().unwrap().0;
-            assert!(
-                back < version,
-                "version must be monotonically increasing ({} < {})",
-                back,
-                version
-            );
+            if back >= version {
+                fatal!(
+                    "version must be monotonically increasing ({} >= {})",
+                    back,
+                    version
+                );
+            }
         }
         self.values.push_back((version, value));
     }
diff --git a/crates/sui-core/src/post_consensus_tx_reorder.rs b/crates/sui-core/src/post_consensus_tx_reorder.rs
index 54019346e1a06..191be12a5f339 100644
--- a/crates/sui-core/src/post_consensus_tx_reorder.rs
+++ b/crates/sui-core/src/post_consensus_tx_reorder.rs
@@ -7,6 +7,7 @@ use crate::consensus_handler::{
 use mysten_metrics::monitored_scope;
 use sui_protocol_config::ConsensusTransactionOrdering;
 use sui_types::{
+    executable_transaction::VerifiedExecutableTransaction,
     messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
     transaction::TransactionDataAPI as _,
 };
@@ -48,4 +49,23 @@ impl PostConsensusTxReorder {
             })
         })
     }
+
+    // TODO: Remove the above versions and rename these without _v2
+    pub fn reorder_v2(
+        transactions: &mut [VerifiedExecutableTransaction],
+        kind: ConsensusTransactionOrdering,
+    ) {
+        match kind {
+            ConsensusTransactionOrdering::ByGasPrice => Self::order_by_gas_price_v2(transactions),
+            ConsensusTransactionOrdering::None => (),
+        }
+    }
+
+    fn order_by_gas_price_v2(transactions: &mut [VerifiedExecutableTransaction]) {
+        let _scope = monitored_scope("ConsensusCommitHandler::order_by_gas_price");
+        transactions.sort_by_key(|tx| {
+            // Reverse order, so that transactions with higher gas prices are sorted to the front.
+            std::cmp::Reverse(tx.transaction_data().gas_price())
+        });
+    }
 }
diff --git a/crates/sui-e2e-tests/tests/coin_registry_tests.rs b/crates/sui-e2e-tests/tests/coin_registry_tests.rs
index f677fc0e576e3..866a94dd537fb 100644
--- a/crates/sui-e2e-tests/tests/coin_registry_tests.rs
+++ b/crates/sui-e2e-tests/tests/coin_registry_tests.rs
@@ -9,6 +9,19 @@ use sui_macros::sim_test;
 
 #[sim_test]
 async fn test_create_coin_registry_object() {
+    let _guard =
+        sui_protocol_config::ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
+            // The new consensus handler requires these flags, and they are irrelevant to the test
+            config.set_ignore_execution_time_observations_after_certs_closed_for_testing(true);
+            config.set_record_time_estimate_processed_for_testing(true);
+            config.set_prepend_prologue_tx_in_consensus_commit_in_checkpoints_for_testing(true);
+            config.set_consensus_checkpoint_signature_key_includes_digest_for_testing(true);
+            config.set_cancel_for_failed_dkg_early_for_testing(true);
+            config.set_use_mfp_txns_in_load_initial_object_debts_for_testing(true);
+            config.set_authority_capabilities_v2_for_testing(true);
+            config
+        });
+
     let framework = sui_framework_snapshot::load_bytecode_snapshot(95)
         .unwrap()
         .into_iter()
diff --git a/crates/sui-e2e-tests/tests/randomness_tests.rs b/crates/sui-e2e-tests/tests/randomness_tests.rs
deleted file mode 100644
index 26aa7e0627a42..0000000000000
--- a/crates/sui-e2e-tests/tests/randomness_tests.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2021, Facebook, Inc. and its affiliates
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0 - -use sui_types::supported_protocol_versions::SupportedProtocolVersions; -use sui_types::SUI_RANDOMNESS_STATE_OBJECT_ID; -use test_cluster::TestClusterBuilder; - -use sui_macros::sim_test; - -#[sim_test] -async fn test_create_randomness_state_object() { - #[cfg(msim)] - { - use sui_core::authority::framework_injection; - let framework = sui_framework_snapshot::load_bytecode_snapshot(54).unwrap(); - framework_injection::set_system_packages(framework); - } - - let test_cluster = TestClusterBuilder::new() - .with_protocol_version(31.into()) - .with_epoch_duration_ms(10000) - .with_supported_protocol_versions(SupportedProtocolVersions::new_for_testing(31, 54)) - .build() - .await; - - let handles = test_cluster.all_node_handles(); - - // no node has the randomness state object yet - for h in &handles { - h.with(|node| { - assert!(node - .state() - .get_object_cache_reader() - .get_latest_object_ref_or_tombstone(SUI_RANDOMNESS_STATE_OBJECT_ID) - .is_none()); - }); - } - - // wait until feature is enabled - test_cluster.wait_for_protocol_version(32.into()).await; - // wait until next epoch - randomness state object is created at the end of the first epoch - // in which it is supported. - test_cluster.wait_for_epoch_all_nodes(2).await; // protocol upgrade completes in epoch 1 - - for h in &handles { - h.with(|node| { - node.state() - .get_object_cache_reader() - .get_latest_object_ref_or_tombstone(SUI_RANDOMNESS_STATE_OBJECT_ID) - .expect("randomness state object should exist"); - }); - } -} diff --git a/crates/sui-e2e-tests/tests/reconfiguration_tests.rs b/crates/sui-e2e-tests/tests/reconfiguration_tests.rs index b0d77c5ab4621..8d02bbf0500df 100644 --- a/crates/sui-e2e-tests/tests/reconfiguration_tests.rs +++ b/crates/sui-e2e-tests/tests/reconfiguration_tests.rs @@ -664,6 +664,19 @@ async fn test_reconfig_with_committee_change_basic() { #[sim_test] async fn test_protocol_upgrade_to_sip_39_enabled_version() { + let _guard = + sui_protocol_config::ProtocolConfig::apply_overrides_for_testing(|_, mut config| { + // The new consensus handler requires these flags, and they are irrelevant to the test + config.set_ignore_execution_time_observations_after_certs_closed_for_testing(true); + config.set_record_time_estimate_processed_for_testing(true); + config.set_prepend_prologue_tx_in_consensus_commit_in_checkpoints_for_testing(true); + config.set_consensus_checkpoint_signature_key_includes_digest_for_testing(true); + config.set_cancel_for_failed_dkg_early_for_testing(true); + config.set_use_mfp_txns_in_load_initial_object_debts_for_testing(true); + config.set_authority_capabilities_v2_for_testing(true); + config + }); + let initial_num_validators = 10; let new_validator = ValidatorGenesisConfigBuilder::new().build(&mut OsRng); diff --git a/crates/sui-e2e-tests/tests/zklogin_tests.rs b/crates/sui-e2e-tests/tests/zklogin_tests.rs index c11d0c6d55f47..1c713df9d6ca6 100644 --- a/crates/sui-e2e-tests/tests/zklogin_tests.rs +++ b/crates/sui-e2e-tests/tests/zklogin_tests.rs @@ -14,14 +14,12 @@ use sui_types::committee::EpochId; use sui_types::crypto::Signature; use sui_types::error::{SuiError, SuiResult, UserInputError}; use sui_types::signature::GenericSignature; -use sui_types::supported_protocol_versions::SupportedProtocolVersions; use sui_types::transaction::Transaction; use sui_types::utils::load_test_vectors; use sui_types::utils::{ get_legacy_zklogin_user_address, get_zklogin_user_address, make_zklogin_tx, }; use 
sui_types::zk_login_authenticator::ZkLoginAuthenticator; -use sui_types::SUI_AUTHENTICATOR_STATE_OBJECT_ID; use test_cluster::TestCluster; use test_cluster::TestClusterBuilder; @@ -217,75 +215,6 @@ async fn test_expired_zklogin_sig() { .contains("ZKLogin expired at epoch 2")); } -#[sim_test] -async fn test_auth_state_creation() { - #[cfg(msim)] - { - use sui_core::authority::framework_injection; - let framework = sui_framework_snapshot::load_bytecode_snapshot(25).unwrap(); - framework_injection::set_system_packages(framework); - } - - // Create test cluster without auth state object in genesis - let test_cluster = TestClusterBuilder::new() - .with_protocol_version(23.into()) - .with_epoch_duration_ms(15000) - .with_supported_protocol_versions(SupportedProtocolVersions::new_for_testing(23, 25)) - .with_default_jwks() - .build() - .await; - // Wait until we are in an epoch that has zklogin enabled, but the auth state object is not - // created yet. - test_cluster.wait_for_protocol_version(24.into()).await; - // Now wait until the auth state object is created, ie. AuthenticatorStateUpdate transaction happened. - test_cluster.wait_for_authenticator_state_update().await; -} - -#[sim_test] -async fn test_create_authenticator_state_object() { - #[cfg(msim)] - { - use sui_core::authority::framework_injection; - let framework = sui_framework_snapshot::load_bytecode_snapshot(25).unwrap(); - framework_injection::set_system_packages(framework); - } - - let test_cluster = TestClusterBuilder::new() - .with_protocol_version(23.into()) - .with_epoch_duration_ms(15000) - .with_supported_protocol_versions(SupportedProtocolVersions::new_for_testing(23, 25)) - .build() - .await; - - let handles = test_cluster.all_node_handles(); - - // no node has the authenticator state object yet - for h in &handles { - h.with(|node| { - assert!(node - .state() - .get_object_cache_reader() - .get_latest_object_ref_or_tombstone(SUI_AUTHENTICATOR_STATE_OBJECT_ID) - .is_none()); - }); - } - - // wait until feature is enabled - test_cluster.wait_for_protocol_version(24.into()).await; - // wait until next epoch - authenticator state object is created at the end of the first epoch - // in which it is supported. - test_cluster.wait_for_epoch_all_nodes(2).await; // protocol upgrade completes in epoch 1 - - for h in &handles { - h.with(|node| { - node.state() - .get_object_cache_reader() - .get_latest_object_ref_or_tombstone(SUI_AUTHENTICATOR_STATE_OBJECT_ID) - .expect("auth state object should exist"); - }); - } -} - // This test is intended to look for forks caused by conflicting / repeated JWK votes from // validators. #[cfg(msim)] diff --git a/crates/sui-open-rpc/spec/openrpc.json b/crates/sui-open-rpc/spec/openrpc.json index 1bcb569586451..476575eb1e899 100644 --- a/crates/sui-open-rpc/spec/openrpc.json +++ b/crates/sui-open-rpc/spec/openrpc.json @@ -1400,6 +1400,7 @@ "uncompressed_g1_group_elements": false, "upgraded_multisig_supported": false, "use_mfp_txns_in_load_initial_object_debts": false, + "use_new_commit_handler": false, "validate_identifier_inputs": false, "variant_nodes": false, "verify_legacy_zklogin_address": false, diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index bccd73880b31b..07a483ffdf64c 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -814,6 +814,10 @@ struct FeatureFlags { // If true charge for loads into the cache (i.e., fetches from storage) in the object runtime. 
     #[serde(skip_serializing_if = "is_false")]
     object_runtime_charge_cache_load_gas: bool,
+
+    // If true, use the new commit handler.
+    #[serde(skip_serializing_if = "is_false")]
+    use_new_commit_handler: bool,
 }
 
 fn is_false(b: &bool) -> bool {
@@ -2247,6 +2251,10 @@ impl ProtocolConfig {
     pub fn object_runtime_charge_cache_load_gas(&self) -> bool {
         self.feature_flags.object_runtime_charge_cache_load_gas
     }
+
+    pub fn use_new_commit_handler(&self) -> bool {
+        self.feature_flags.use_new_commit_handler
+    }
 }
 
 #[cfg(not(msim))]
@@ -4267,10 +4275,46 @@ impl ProtocolConfig {
         });
     }
 
+    pub fn set_prepend_prologue_tx_in_consensus_commit_in_checkpoints_for_testing(
+        &mut self,
+        val: bool,
+    ) {
+        self.feature_flags
+            .prepend_prologue_tx_in_consensus_commit_in_checkpoints = val;
+    }
+
     pub fn enable_accumulators_for_testing(&mut self) {
         self.feature_flags.enable_accumulators = true;
         self.feature_flags.allow_private_accumulator_entrypoints = true;
     }
+
+    pub fn set_ignore_execution_time_observations_after_certs_closed_for_testing(
+        &mut self,
+        val: bool,
+    ) {
+        self.feature_flags
+            .ignore_execution_time_observations_after_certs_closed = val;
+    }
+
+    pub fn set_consensus_checkpoint_signature_key_includes_digest_for_testing(
+        &mut self,
+        val: bool,
+    ) {
+        self.feature_flags
+            .consensus_checkpoint_signature_key_includes_digest = val;
+    }
+
+    pub fn set_cancel_for_failed_dkg_early_for_testing(&mut self, val: bool) {
+        self.feature_flags.cancel_for_failed_dkg_early = val;
+    }
+
+    pub fn set_use_mfp_txns_in_load_initial_object_debts_for_testing(&mut self, val: bool) {
+        self.feature_flags.use_mfp_txns_in_load_initial_object_debts = val;
+    }
+
+    pub fn set_authority_capabilities_v2_for_testing(&mut self, val: bool) {
+        self.feature_flags.authority_capabilities_v2 = val;
+    }
 }
 
 type OverrideFn = dyn Fn(ProtocolVersion, ProtocolConfig) -> ProtocolConfig + Send;
diff --git a/crates/sui-types/src/messages_consensus.rs b/crates/sui-types/src/messages_consensus.rs
index 2079544e856d2..222c55265a099 100644
--- a/crates/sui-types/src/messages_consensus.rs
+++ b/crates/sui-types/src/messages_consensus.rs
@@ -433,19 +433,7 @@ pub enum ConsensusTransactionKind {
     CheckpointSignatureV2(Box<CheckpointSignatureMessage>),
 }
 
-impl ConsensusTransactionKind {
-    pub fn is_dkg(&self) -> bool {
-        matches!(
-            self,
-            ConsensusTransactionKind::RandomnessDkgMessage(_, _)
-                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
-        )
-    }
-
-    pub fn is_user_transaction(&self) -> bool {
-        matches!(self, ConsensusTransactionKind::UserTransaction(_))
-    }
-}
+impl ConsensusTransactionKind {}
 
 #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[allow(clippy::large_enum_variant)]
@@ -740,15 +728,26 @@ impl ConsensusTransaction {
         }
     }
 
-    pub fn is_executable_transaction(&self) -> bool {
-        matches!(self.kind, ConsensusTransactionKind::CertifiedTransaction(_))
-            || matches!(self.kind, ConsensusTransactionKind::UserTransaction(_))
+    pub fn is_dkg(&self) -> bool {
+        matches!(
+            self.kind,
+            ConsensusTransactionKind::RandomnessDkgMessage(_, _)
+                | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
+        )
     }
 
-    pub fn is_user_transaction(&self) -> bool {
+    pub fn is_mfp_transaction(&self) -> bool {
         matches!(self.kind, ConsensusTransactionKind::UserTransaction(_))
     }
 
+    pub fn is_user_transaction(&self) -> bool {
+        matches!(
+            self.kind,
+            ConsensusTransactionKind::UserTransaction(_)
+                | ConsensusTransactionKind::CertifiedTransaction(_)
+        )
+    }
+
     pub fn is_end_of_publish(&self) -> bool {
         matches!(self.kind, ConsensusTransactionKind::EndOfPublish(_))
     }
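The predicate shuffle above is easy to misread in diff form: the old `ConsensusTransactionKind::is_user_transaction` matched only the Mysticeti fastpath variant (that meaning now lives in `is_mfp_transaction`), while the new `is_user_transaction` takes over the meaning of the removed `is_executable_transaction`. A condensed stand-in (hypothetical `Kind` enum, payloads elided) spelling out the resulting truth table:

```rust
enum Kind {
    UserTransaction,      // Mysticeti fastpath (MFP) submission
    CertifiedTransaction, // classic certificate
    EndOfPublish,
}

// Narrow predicate: only MFP submissions (what is_user_transaction used to mean).
fn is_mfp_transaction(k: &Kind) -> bool {
    matches!(k, Kind::UserTransaction)
}

// Broad predicate: anything carrying a user transaction
// (what is_executable_transaction used to mean, now named is_user_transaction).
fn is_user_transaction(k: &Kind) -> bool {
    matches!(k, Kind::UserTransaction | Kind::CertifiedTransaction)
}

fn main() {
    assert!(is_mfp_transaction(&Kind::UserTransaction));
    assert!(!is_mfp_transaction(&Kind::CertifiedTransaction));
    assert!(is_user_transaction(&Kind::CertifiedTransaction));
    assert!(!is_user_transaction(&Kind::EndOfPublish));
}
```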
diff --git a/crates/sui-types/src/transaction.rs b/crates/sui-types/src/transaction.rs
index 903ad0e10763b..441138778302e 100644
--- a/crates/sui-types/src/transaction.rs
+++ b/crates/sui-types/src/transaction.rs
@@ -2143,8 +2143,8 @@ impl TransactionData {
     }
 
     pub fn uses_randomness(&self) -> bool {
-        self.shared_input_objects()
-            .iter()
+        self.kind()
+            .shared_input_objects()
             .any(|obj| obj.id() == SUI_RANDOMNESS_STATE_OBJECT_ID)
     }
 
From bf106f48a6d706e3e8b6e86ffc1a1664028e0330 Mon Sep 17 00:00:00 2001
From: Mark Logan <103447440+mystenmark@users.noreply.github.com>
Date: Fri, 26 Sep 2025 14:34:22 -0700
Subject: [PATCH 2/2] [Cherrypick] Two fixes for commit handler v2 (#23757)

- Fix bug in deferred transaction loading
- Fix upgrade tests on antithesis by requiring the epoch flag
---
 .../src/authority/epoch_start_configuration.rs |  3 ++-
 crates/sui-core/src/consensus_handler.rs       | 18 ++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/crates/sui-core/src/authority/epoch_start_configuration.rs b/crates/sui-core/src/authority/epoch_start_configuration.rs
index 8d70bf3cdf569..3605c5bb2d321 100644
--- a/crates/sui-core/src/authority/epoch_start_configuration.rs
+++ b/crates/sui-core/src/authority/epoch_start_configuration.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use enum_dispatch::enum_dispatch;
+use mysten_common::in_test_configuration;
 use serde::{Deserialize, Serialize};
 use sui_config::NodeConfig;
 use sui_types::accumulator_root::get_accumulator_root_obj_initial_shared_version;
@@ -101,7 +102,7 @@ impl EpochFlag {
     fn default_flags_impl() -> Vec<EpochFlag> {
         let mut flags = vec![EpochFlag::DataQuarantineFromBeginningOfEpoch];
 
-        if std::env::var("SUI_USE_NEW_COMMIT_HANDLER").is_ok() {
+        if std::env::var("SUI_USE_NEW_COMMIT_HANDLER").is_ok() || in_test_configuration() {
             flags.push(EpochFlag::UseCommitHandlerV2);
         }
 
diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs
index 067c10f2a7952..4597aa49a5e40 100644
--- a/crates/sui-core/src/consensus_handler.rs
+++ b/crates/sui-core/src/consensus_handler.rs
@@ -1481,6 +1481,20 @@ impl ConsensusHandler {
         txns.reserve(user_transactions.len());
         randomness_txns.reserve(user_transactions.len());
 
+        // There may be randomness transactions in `txns`, which were deferred due to congestion.
+        // They must be placed back into `randomness_txns`.
+        let mut txns: Vec<_> = txns
+            .into_iter()
+            .filter_map(|tx| {
+                if tx.transaction_data().uses_randomness() {
+                    randomness_txns.push(tx);
+                    None
+                } else {
+                    Some(tx)
+                }
+            })
+            .collect();
+
         for txn in user_transactions {
             if txn.transaction_data().uses_randomness() {
                 randomness_txns.push(txn);
@@ -1899,6 +1913,10 @@ impl ConsensusHandler {
         let name = self.epoch_store.name;
         let authority_index = self.epoch_store.committee().authority_index(&name).unwrap();
         authority_index < 2
+            && self
+                .epoch_store
+                .epoch_start_config()
+                .use_commit_handler_v2()
     } else if self.epoch_store.protocol_config().use_new_commit_handler() {
         true
     } else {