Skip to content

Commit 9ca6bf7

Browse files
committed
upstream(core): Remove executed_in_epoch table (#8550)
# Description of change - Upstream range: [v1.44.3, v1.45.3) - Port commit: - MystenLabs/sui@efdbe99 - Description: The table is replaced with: - An in-memory "dirty set" which holds executed but un-checkpointed transaction digests. Transactions are removed from the dirty set by CheckpointExecutor. - An additional bounded cache intended to lessen the number of db reads by CheckpointBuilder - Last-resort reads go to the `executed_transactions_to_checkpoint` table. ## Links to any relevant issues Part of #6153 ## How the change has been tested - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [ ] Patch-specific tests (correctness, functionality coverage) - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have checked that new and existing unit tests pass locally with my changes
1 parent 2f31e14 commit 9ca6bf7

File tree

6 files changed

+167
-96
lines changed

6 files changed

+167
-96
lines changed

crates/iota-core/src/authority.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3225,9 +3225,7 @@ impl AuthorityState {
32253225
);
32263226

32273227
if cfg!(debug_assertions) {
3228-
cur_epoch_store
3229-
.check_all_executed_transactions_in_checkpoint()
3230-
.expect("failed to check all executed transactions in checkpoint");
3228+
cur_epoch_store.check_all_executed_transactions_in_checkpoint();
32313229
}
32323230

32333231
self.get_reconfig_api()

crates/iota-core/src/authority/authority_per_epoch_store.rs

Lines changed: 52 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use futures::{
2121
FutureExt,
2222
future::{Either, join_all, select},
2323
};
24-
use iota_common::sync::{notify_once::NotifyOnce, notify_read::NotifyRead};
24+
use iota_common::{
25+
fatal,
26+
sync::{notify_once::NotifyOnce, notify_read::NotifyRead},
27+
};
2528
use iota_config::node::ExpensiveSafetyCheckConfig;
2629
use iota_execution::{self, Executor};
2730
use iota_macros::{fail_point, fail_point_arg};
@@ -537,6 +540,8 @@ pub struct AuthorityEpochTables {
537540
transaction_cert_signatures: DBMap<TransactionDigest, AuthorityStrongQuorumSignInfo>,
538541

539542
/// Transactions that were executed in the current epoch.
543+
#[allow(dead_code)]
544+
#[deprecated]
540545
executed_in_epoch: DBMap<TransactionDigest, ()>,
541546

542547
#[allow(dead_code)]
@@ -824,15 +829,6 @@ impl AuthorityEpochTables {
824829
.safe_iter()
825830
.collect::<Result<_, _>>()?)
826831
}
827-
828-
fn get_all_user_signatures_for_checkpoints(
829-
&self,
830-
) -> IotaResult<HashMap<TransactionDigest, Vec<GenericSignature>>> {
831-
Ok(self
832-
.user_signatures_for_checkpoints
833-
.safe_iter()
834-
.collect::<Result<_, _>>()?)
835-
}
836832
}
837833

838834
pub(crate) const MUTEX_TABLE_SIZE: usize = 1024;
@@ -1303,18 +1299,15 @@ impl AuthorityPerEpochStore {
13031299
tx_digest: &TransactionDigest,
13041300
) -> IotaResult {
13051301
let tables = self.tables()?;
1306-
let mut batch = self.tables()?.executed_in_epoch.batch();
13071302

1308-
batch.insert_batch(&tables.executed_in_epoch, [(tx_digest, ())])?;
1309-
1310-
if !matches!(tx_key, TransactionKey::Digest(_)) {
1311-
batch.insert_batch(&tables.transaction_key_to_digest, [(tx_key, tx_digest)])?;
1312-
}
1313-
batch.write()?;
1303+
self.consensus_output_cache
1304+
.insert_executed_in_epoch(*tx_digest);
13141305

13151306
if !matches!(tx_key, TransactionKey::Digest(_)) {
1307+
tables.transaction_key_to_digest.insert(tx_key, tx_digest)?;
13161308
self.executed_digests_notify_read.notify(tx_key, tx_digest);
13171309
}
1310+
13181311
Ok(())
13191312
}
13201313

@@ -1331,9 +1324,10 @@ impl AuthorityPerEpochStore {
13311324
}
13321325

13331326
pub fn revert_executed_transaction(&self, tx_digest: &TransactionDigest) -> IotaResult {
1327+
self.consensus_output_cache
1328+
.remove_reverted_transaction(tx_digest);
13341329
let tables = self.tables()?;
13351330
let mut batch = tables.effects_signatures.batch();
1336-
batch.delete_batch(&tables.executed_in_epoch, [*tx_digest])?;
13371331
batch.delete_batch(&tables.effects_signatures, [*tx_digest])?;
13381332
batch.write()?;
13391333
Ok(())
@@ -1356,14 +1350,30 @@ impl AuthorityPerEpochStore {
13561350
Ok(())
13571351
}
13581352

1359-
pub fn transactions_executed_in_cur_epoch<'a>(
1353+
pub fn transactions_executed_in_cur_epoch(
13601354
&self,
1361-
digests: impl IntoIterator<Item = &'a TransactionDigest>,
1355+
digests: &[TransactionDigest],
13621356
) -> IotaResult<Vec<bool>> {
1363-
Ok(self
1364-
.tables()?
1365-
.executed_in_epoch
1366-
.multi_contains_keys(digests)?)
1357+
let tables = self.tables()?;
1358+
Ok(do_fallback_lookup(
1359+
digests,
1360+
|digest| {
1361+
if self
1362+
.consensus_output_cache
1363+
.executed_in_current_epoch(digest)
1364+
{
1365+
CacheResult::Hit(true)
1366+
} else {
1367+
CacheResult::Miss
1368+
}
1369+
},
1370+
|digests| {
1371+
tables
1372+
.executed_transactions_to_checkpoint
1373+
.multi_contains_keys(digests)
1374+
.expect("db error")
1375+
},
1376+
))
13671377
}
13681378

13691379
pub fn get_effects_signature(
@@ -1522,6 +1532,9 @@ impl AuthorityPerEpochStore {
15221532
quarantine.update_highest_executed_checkpoint(seq, self, &mut batch)?;
15231533
batch.write()?;
15241534

1535+
self.consensus_output_cache
1536+
.remove_executed_in_epoch(digests);
1537+
15251538
Ok(())
15261539
}
15271540

@@ -4348,32 +4361,23 @@ impl AuthorityPerEpochStore {
43484361
self.signature_verifier.clear_signature_cache();
43494362
}
43504363

4351-
pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) -> IotaResult<()> {
4352-
let tables = self.tables().unwrap();
4353-
4354-
info!("Verifying that all executed transactions are in a checkpoint");
4355-
4356-
let mut executed_iter = tables.executed_in_epoch.safe_iter();
4357-
let mut checkpointed_iter = tables.executed_transactions_to_checkpoint.safe_iter();
4364+
pub(crate) fn check_all_executed_transactions_in_checkpoint(&self) {
4365+
let uncheckpointed_transactions = self
4366+
.consensus_output_cache
4367+
.get_uncheckpointed_transactions();
43584368

4359-
// verify that the two iterators (which are both sorted) are identical
4360-
loop {
4361-
let executed = executed_iter.next().transpose()?;
4362-
let checkpointed = checkpointed_iter.next().transpose()?;
4363-
match (executed, checkpointed) {
4364-
(Some((left, ())), Some((right, _))) => {
4365-
if left != right {
4366-
panic!(
4367-
"Executed transactions and checkpointed transactions do not match: {left:?} {right:?}"
4368-
);
4369-
}
4370-
}
4371-
(None, None) => break Ok(()),
4372-
(left, right) => panic!(
4373-
"Executed transactions and checkpointed transactions do not match: {left:?} {right:?}"
4374-
),
4375-
}
4369+
if uncheckpointed_transactions.is_empty() {
4370+
info!("Verified that all executed transactions are in a checkpoint");
4371+
return;
43764372
}
4373+
4374+
// TODO: should this be debug_fatal? Its potentially very serious in that it
4375+
// could indicate that we broke the checkpoint inclusion guarantee, but
4376+
// we won't be able to do anything about it if it happens.
4377+
fatal!(
4378+
"The following transactions were neither reverted nor checkpointed: {:?}",
4379+
uncheckpointed_transactions
4380+
);
43774381
}
43784382
}
43794383

crates/iota-core/src/authority/consensus_quarantine.rs

Lines changed: 72 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use iota_types::{
1717
messages_consensus::{TimestampMs, VersionedDkgConfirmation},
1818
signature::GenericSignature,
1919
};
20+
use moka::{policy::EvictionPolicy, sync::SegmentedCache as MokaCache};
2021
use parking_lot::Mutex;
22+
use rand::seq::SliceRandom;
2123
use tracing::{debug, info};
2224
use typed_store::{Map, rocks::DBBatch};
2325

@@ -316,6 +318,9 @@ pub(crate) struct ConsensusOutputCache {
316318
pub(super) user_signatures_for_checkpoints:
317319
Mutex<HashMap<TransactionDigest, Vec<GenericSignature>>>,
318320

321+
executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
322+
executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
323+
319324
metrics: Arc<EpochMetrics>,
320325
}
321326

@@ -333,28 +338,31 @@ impl ConsensusOutputCache {
333338
.get_all_deferred_transactions_v2()
334339
.expect("load deferred transactions v2 cannot fail");
335340

336-
if !epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch() {
337-
let shared_version_assignments = Self::get_all_shared_version_assignments(tables);
338-
339-
let user_signatures_for_checkpoints = tables
340-
.get_all_user_signatures_for_checkpoints()
341-
.expect("load user signatures for checkpoints cannot fail");
341+
assert!(
342+
epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
343+
"This version of iota-node can only run after data quarantining has been enabled. Please run the latest version to the end of the current epoch and retry"
344+
);
342345

343-
Self {
344-
shared_version_assignments: shared_version_assignments.into_iter().collect(),
345-
deferred_transactions: Mutex::new(deferred_transactions),
346-
deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
347-
user_signatures_for_checkpoints: Mutex::new(user_signatures_for_checkpoints),
348-
metrics,
349-
}
346+
let executed_in_epoch_cache_capacity = if cfg!(msim) {
347+
// Ensure that we test under conditions of constant, frequent,
348+
// and rare cache evictions.
349+
*[2, 100, 50000].choose(&mut rand::thread_rng()).unwrap()
350350
} else {
351-
Self {
352-
shared_version_assignments: Default::default(),
353-
deferred_transactions: Mutex::new(deferred_transactions),
354-
deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
355-
user_signatures_for_checkpoints: Default::default(),
356-
metrics,
357-
}
351+
50_000
352+
};
353+
354+
Self {
355+
shared_version_assignments: Default::default(),
356+
deferred_transactions: Mutex::new(deferred_transactions),
357+
deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
358+
user_signatures_for_checkpoints: Default::default(),
359+
executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
360+
executed_in_epoch_cache: MokaCache::builder(8)
361+
// most queries should be for recent transactions
362+
.max_capacity(executed_in_epoch_cache_capacity)
363+
.eviction_policy(EvictionPolicy::lru())
364+
.build(),
365+
metrics,
358366
}
359367
}
360368

@@ -414,17 +422,50 @@ impl ConsensusOutputCache {
414422
.sub(removed_count as i64);
415423
}
416424

417-
// Used to read pre-existing shared object versions from the database after a
418-
// crash. TODO: remove this once all nodes have upgraded to
419-
// data-quarantining.
420-
fn get_all_shared_version_assignments(
421-
tables: &AuthorityEpochTables,
422-
) -> Vec<(TransactionKey, Vec<(ObjectID, SequenceNumber)>)> {
423-
tables
424-
.assigned_shared_object_versions
425-
.safe_iter()
426-
.collect::<Result<Vec<_>, _>>()
427-
.expect("db error")
425+
pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
426+
self.executed_in_epoch
427+
.read()
428+
.contains_key(digest) ||
429+
// we use get instead of contains key to mark the entry as read
430+
self.executed_in_epoch_cache.get(digest).is_some()
431+
}
432+
433+
// Called by execution
434+
pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
435+
assert!(
436+
self.executed_in_epoch
437+
.read()
438+
.insert(tx_digest, ())
439+
.is_none(),
440+
"transaction already executed"
441+
);
442+
self.executed_in_epoch_cache.insert(tx_digest, ());
443+
}
444+
445+
// CheckpointExecutor calls this (indirectly) in order to prune the in-memory
446+
// cache of executed transactions. By the time this is called, the
447+
// transaction digests will have been committed to
448+
// the `executed_transactions_to_checkpoint` table.
449+
pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
450+
let executed_in_epoch = self.executed_in_epoch.read();
451+
for tx_digest in tx_digests {
452+
executed_in_epoch.remove(tx_digest);
453+
}
454+
}
455+
456+
pub fn remove_reverted_transaction(&self, tx_digest: &TransactionDigest) {
457+
// reverted transactions are not guaranteed to have been executed
458+
self.executed_in_epoch.read().remove(tx_digest);
459+
}
460+
461+
/// At reconfig time, all checkpointed transactions must have been removed
462+
/// from self.executed_in_epoch
463+
pub fn get_uncheckpointed_transactions(&self) -> Vec<TransactionDigest> {
464+
self.executed_in_epoch
465+
.write() // exclusive lock to ensure consistent view
466+
.iter()
467+
.map(|e| *e.key())
468+
.collect()
428469
}
429470
}
430471

crates/iota-core/src/authority/epoch_start_configuration.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub trait EpochStartConfigTrait {
5252
// is a collision in the value of some variant, the branch which has been
5353
// released should take precedence. In this case, the picked-from branch is
5454
// inconsistent with the released branch, and must be fixed.
55-
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
55+
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
5656
pub enum EpochFlag {
5757
// When switching between different cache types mid-epoch, partial checkpoint transactions
5858
// might already be on disk. During lock initialization, we check if there is any existing
@@ -63,21 +63,36 @@ pub enum EpochFlag {
6363
// This flag indicates whether data quarantining has been enabled from the
6464
// beginning of the epoch.
6565
DataQuarantineFromBeginningOfEpoch = 1,
66+
67+
// Used for `test_epoch_flag_upgrade`.
68+
#[cfg(msim)]
69+
DummyFlag = 2,
6670
}
6771

6872
impl EpochFlag {
6973
pub fn default_flags_for_new_epoch(config: &NodeConfig) -> Vec<Self> {
7074
Self::default_flags_impl(config.execution_cache)
7175
}
7276

77+
// Return flags that are mandatory for the current version of the code. This is
78+
// used so that `test_epoch_flag_upgrade` can still work correctly even when
79+
// there are no optional flags.
80+
pub fn mandatory_flags() -> Vec<Self> {
81+
vec![EpochFlag::DataQuarantineFromBeginningOfEpoch]
82+
}
83+
7384
/// For situations in which there is no config available (e.g. setting up a
7485
/// downloaded snapshot).
7586
pub fn default_for_no_config() -> Vec<Self> {
7687
Self::default_flags_impl(Default::default())
7788
}
7889

7990
fn default_flags_impl(cache_type: ExecutionCacheType) -> Vec<Self> {
80-
let mut new_flags = vec![EpochFlag::DataQuarantineFromBeginningOfEpoch];
91+
let mut new_flags = vec![
92+
EpochFlag::DataQuarantineFromBeginningOfEpoch,
93+
#[cfg(msim)]
94+
EpochFlag::DummyFlag,
95+
];
8196

8297
// Load cache type from env
8398
if matches!(cache_type.cache_type(), ExecutionCacheType::WritebackCache) {
@@ -97,6 +112,10 @@ impl fmt::Display for EpochFlag {
97112
EpochFlag::DataQuarantineFromBeginningOfEpoch => {
98113
write!(f, "DataQuarantineFromBeginningOfEpoch")
99114
}
115+
#[cfg(msim)]
116+
EpochFlag::DummyFlag => {
117+
write!(f, "DummyFlag")
118+
}
100119
}
101120
}
102121
}

crates/iota-core/src/checkpoints/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1823,7 +1823,7 @@ impl CheckpointBuilder {
18231823

18241824
let existing_effects = self
18251825
.epoch_store
1826-
.transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1826+
.transactions_executed_in_cur_epoch(effect.dependencies())?;
18271827

18281828
for (dependency, effects_signature_exists) in
18291829
effect.dependencies().iter().zip(existing_effects.iter())

0 commit comments

Comments
 (0)