Skip to content

Commit 51a04d8

Browse files
jkrvivianmuXxer
authored andcommitted
upstream(core): Introduce indexes pruner to authority store (#8536)
# Description of change - Upstream range: [v1.44.3, v1.45.3) - Port commit: - MystenLabs/sui@5bc3504 - Description: Make indexes prunable. ## Links to any relevant issues Part of #6153 ## How the change has been tested - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [ ] Patch-specific tests (correctness, functionality coverage) - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have checked that new and existing unit tests pass locally with my changes
1 parent 1e30dfb commit 51a04d8

File tree

5 files changed

+284
-64
lines changed

5 files changed

+284
-64
lines changed

crates/iota-config/src/node.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,8 @@ pub struct AuthorityStorePruningConfig {
987987
/// may result in some old versions that will never be pruned.
988988
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
989989
pub enable_compaction_filter: bool,
990+
#[serde(skip_serializing_if = "Option::is_none")]
991+
pub num_epochs_to_retain_for_indexes: Option<u64>,
990992
}
991993

992994
fn default_num_latest_epoch_dbs_to_retain() -> usize {
@@ -1026,6 +1028,7 @@ impl Default for AuthorityStorePruningConfig {
10261028
num_epochs_to_retain_for_checkpoints: if cfg!(msim) { Some(2) } else { None },
10271029
smooth: true,
10281030
enable_compaction_filter: cfg!(test) || cfg!(msim),
1031+
num_epochs_to_retain_for_indexes: None,
10291032
}
10301033
}
10311034
}

crates/iota-core/src/authority.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2827,6 +2827,7 @@ impl AuthorityState {
28272827
store.perpetual_tables.clone(),
28282828
checkpoint_store.clone(),
28292829
rest_index.clone(),
2830+
indexes.clone(),
28302831
config.authority_store_pruning_config.clone(),
28312832
epoch_store.committee().authority_exists(&name),
28322833
epoch_store.epoch_start_state().epoch_duration_ms(),

crates/iota-core/src/authority/authority_store_pruner.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use super::authority_store_tables::{AuthorityPerpetualTables, AuthorityPrunerTab
4141
use crate::{
4242
authority::authority_store_types::{StoreObject, StoreObjectWrapper},
4343
checkpoints::{CheckpointStore, CheckpointWatermark},
44+
jsonrpc_index::IndexStore,
4445
rest_index::RestIndexStore,
4546
};
4647

@@ -58,6 +59,7 @@ static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
5859
.collect()
5960
});
6061
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
62+
pub const MIN_EPOCHS_TO_RETAIN_FOR_INDEXES: u64 = 7;
6163

6264
/// The `AuthorityStorePruner` manages the pruning process for object stores
6365
/// within the `AuthorityStore`. It includes a cancellation handle that can be
@@ -75,6 +77,7 @@ pub struct AuthorityStorePruningMetrics {
7577
pub num_pruned_objects: IntCounter,
7678
pub num_pruned_tombstones: IntCounter,
7779
pub last_pruned_effects_checkpoint: IntGauge,
80+
pub last_pruned_indexes_transaction: IntGauge,
7881
pub num_epochs_to_retain_for_objects: IntGauge,
7982
pub num_epochs_to_retain_for_checkpoints: IntGauge,
8083
}
@@ -109,6 +112,12 @@ impl AuthorityStorePruningMetrics {
109112
registry
110113
)
111114
.unwrap(),
115+
last_pruned_indexes_transaction: register_int_gauge_with_registry!(
116+
"last_pruned_indexes_transaction",
117+
"Last pruned indexes transaction",
118+
registry
119+
)
120+
.unwrap(),
112121
num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
113122
"num_epochs_to_retain_for_objects",
114123
"Number of epochs to retain for objects",
@@ -552,6 +561,32 @@ impl AuthorityStorePruner {
552561
Ok(())
553562
}
554563

564+
fn prune_indexes(
565+
indexes: Option<&IndexStore>,
566+
config: &AuthorityStorePruningConfig,
567+
epoch_duration_ms: u64,
568+
metrics: &AuthorityStorePruningMetrics,
569+
) -> anyhow::Result<()> {
570+
if let (Some(mut epochs_to_retain), Some(indexes)) =
571+
(config.num_epochs_to_retain_for_indexes, indexes)
572+
{
573+
if epochs_to_retain < MIN_EPOCHS_TO_RETAIN_FOR_INDEXES {
574+
warn!("num_epochs_to_retain_for_indexes is too low. Resetting it to 7");
575+
epochs_to_retain = MIN_EPOCHS_TO_RETAIN_FOR_INDEXES;
576+
}
577+
let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
578+
if let Some(cut_time_ms) =
579+
u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
580+
{
581+
let transaction_id = indexes.prune(cut_time_ms)?;
582+
metrics
583+
.last_pruned_indexes_transaction
584+
.set(transaction_id as i64);
585+
}
586+
}
587+
Ok(())
588+
}
589+
555590
/// Identifies and compacts the next eligible SST file in the
556591
/// `AuthorityStore` that meets the specified conditions for manual
557592
/// compaction. This function checks each SST file's metadata, including
@@ -650,6 +685,7 @@ impl AuthorityStorePruner {
650685
perpetual_db: Arc<AuthorityPerpetualTables>,
651686
checkpoint_store: Arc<CheckpointStore>,
652687
rest_index: Option<Arc<RestIndexStore>>,
688+
jsonrpc_index: Option<Arc<IndexStore>>,
653689
pruner_db: Option<Arc<AuthorityPrunerTables>>,
654690
metrics: Arc<AuthorityStorePruningMetrics>,
655691
archive_readers: ArchiveReaderBalancer,
@@ -671,6 +707,8 @@ impl AuthorityStorePruner {
671707
tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
672708
let mut checkpoints_prune_interval =
673709
tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
710+
let mut indexes_prune_interval =
711+
tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
674712

675713
let perpetual_db_for_compaction = perpetual_db.clone();
676714
if let Some(delay_days) = config.periodic_compaction_threshold_days {
@@ -719,6 +757,11 @@ impl AuthorityStorePruner {
719757
error!("Failed to prune checkpoints: {:?}", err);
720758
}
721759
},
760+
_ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
761+
if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
762+
error!("Failed to prune indexes: {:?}", err);
763+
}
764+
}
722765
_ = &mut recv => break,
723766
}
724767
}
@@ -732,6 +775,7 @@ impl AuthorityStorePruner {
732775
perpetual_db: Arc<AuthorityPerpetualTables>,
733776
checkpoint_store: Arc<CheckpointStore>,
734777
rest_index: Option<Arc<RestIndexStore>>,
778+
jsonrpc_index: Option<Arc<IndexStore>>,
735779
mut pruning_config: AuthorityStorePruningConfig,
736780
is_validator: bool,
737781
epoch_duration_ms: u64,
@@ -759,6 +803,7 @@ impl AuthorityStorePruner {
759803
perpetual_db,
760804
checkpoint_store,
761805
rest_index,
806+
jsonrpc_index,
762807
pruner_db,
763808
AuthorityStorePruningMetrics::new(registry),
764809
archive_readers,

0 commit comments

Comments
 (0)