Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iota.log.*
npm-debug.log*
yarn-debug.log*
yarn-error.log*
congestion_audit.jsonl

# misc
*.key
Expand Down
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iota-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tokio = { workspace = true, features = ["full", "tracing", "test-util"] }
tokio-stream.workspace = true
tracing.workspace = true
twox-hash = "1.6"
ordered-float = { version = "5.0", features = ["serde"] }

# internal dependencies
consensus-config.workspace = true
Expand Down
183 changes: 158 additions & 25 deletions crates/iota-core/src/congestion_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use iota_types::{
};
use moka::{ops::compute::Op, sync::Cache};
use serde::Deserialize;
use crate::model_updater::{
build_cp_update_batch, build_train_tx_batch, HttpModelUpdater, ObjectCheckpointStats,
ObjectSnapshot, RawTxItem, ModelUpdater,
};
use tracing::info;

use crate::execution_cache::TransactionCacheRead;
Expand Down Expand Up @@ -139,9 +143,128 @@ pub struct CongestionTracker {
reference_gas_price: u64,
/// Key-value cache for storing congestion info of objects.
object_congestion_info: Cache<ObjectID, CongestionInfo>,
/// HTTP client for posting model updates/training batches.
model_updater: HttpModelUpdater,
}

impl CongestionTracker {
/// Compose and send model update and training batches for the given checkpoint.
fn inform_model(
&self,
checkpoint: &VerifiedCheckpoint,
congestion_txs_data: &[TxData],
clearing_txs_data: &[TxData],
) {
// Single pass over both congested and clearing txs to build:
// - touched set
// - per-checkpoint stats (counts + flags)
// - raw training items and per-object min clearing map
let mut touched: std::collections::HashSet<ObjectID> = std::collections::HashSet::new();
let mut stats: HashMap<ObjectID, ObjectCheckpointStats> = HashMap::new();
let mut raw_txs: Vec<RawTxItem> =
Vec::with_capacity(congestion_txs_data.len() + clearing_txs_data.len());
let mut per_obj_min_clearing: HashMap<ObjectID, u64> = HashMap::new();
let mut per_obj_max_congestion: HashMap<ObjectID, u64> = HashMap::new();

for (is_congested, tx) in congestion_txs_data
.iter()
.map(|t| (true, t))
.chain(clearing_txs_data.iter().map(|t| (false, t)))
{
// touched set
touched.extend(tx.objects.iter().cloned());

// stats per object
for oid in &tx.objects {
let entry = stats.entry(*oid).or_default();
if is_congested {
entry.was_congested = true;
entry.congested_tx_count += 1;
} else {
entry.was_cleared = true;
entry.clearing_tx_count += 1;
}
}

// raw training items
raw_txs.push(RawTxItem {
tx_digest: tx.digest.to_string(),
is_congested,
gas_price: tx.gas_price,
gas_price_feedback: if is_congested { tx.gas_price_feedback } else { None },
touched_objects: tx.objects.clone(),
});

// per-object min clearing for clearing txs
if !is_congested {
for oid in &tx.objects {
per_obj_min_clearing
.entry(*oid)
.and_modify(|m| *m = (*m).min(tx.gas_price))
.or_insert(tx.gas_price);
}
} else {
// Track per-object max congestion (using feedback if present, otherwise gas price)
let cong_req = tx.gas_price_feedback.unwrap_or(tx.gas_price);
for oid in &tx.objects {
per_obj_max_congestion
.entry(*oid)
.and_modify(|m| *m = (*m).max(cong_req))
.or_insert(cong_req);
}
}
}

// Build per-object required price in checkpoint:
// - prefer lowest clearing if present; otherwise use highest congestion observed.
let mut per_obj_required_in_cp: HashMap<ObjectID, u64> = HashMap::new();
for oid in &touched {
if let Some(c) = per_obj_min_clearing.get(oid) {
per_obj_required_in_cp.insert(*oid, *c);
} else if let Some(max_cong) = per_obj_max_congestion.get(oid) {
per_obj_required_in_cp.insert(*oid, *max_cong);
}
}

// Build snapshots from current cache using the touched set
let mut snapshots: HashMap<ObjectID, ObjectSnapshot> = HashMap::new();
for oid in &touched {
if let Some(info) = self.get_congestion_info(*oid) {
snapshots.insert(
*oid,
ObjectSnapshot {
latest_congestion_time: Some(info.latest_congestion_time),
highest_congestion_gas_price: info.highest_congestion_gas_price,
latest_clearing_time: info.latest_clearing_time,
lowest_clearing_gas_price: info
.lowest_clearing_gas_price
.unwrap_or(self.reference_gas_price),
hotness: info.hotness,
},
);
}
}

// Post update batch
let update_batch = build_cp_update_batch(
checkpoint.timestamp_ms,
self.reference_gas_price,
touched.iter().cloned(),
&snapshots,
&stats,
);
self.model_updater.post_update(update_batch);

// Post train batch (if any)
if let Some(train_batch) = build_train_tx_batch(
checkpoint.timestamp_ms,
self.reference_gas_price,
&raw_txs,
&per_obj_required_in_cp,
) {
self.model_updater.post_train_tx(train_batch);
}
}
/// Create a new `CongestionTracker`. The cache capacity will be
/// set to `CONGESTION_TRACKER_CACHE_CAPACITY`, which is `10_000`.
pub fn new(reference_gas_price: u64) -> Self {
Expand All @@ -157,6 +280,7 @@ impl CongestionTracker {
Self {
reference_gas_price,
object_congestion_info: Cache::new(CONGESTION_TRACKER_CACHE_CAPACITY),
model_updater: HttpModelUpdater::default(),
}
}

Expand Down Expand Up @@ -200,34 +324,39 @@ impl CongestionTracker {
continue;
}

if let Some(CongestedObjects(congested_objects)) =
// Compute the set of mutated shared objects for this transaction once,
// and only use those for congestion/clearing accounting and model input.
let mutated_objects: Vec<ObjectID> = effects
.input_shared_objects()
.into_iter()
.filter_map(|object| match object {
InputSharedObject::Mutate((id, _, _)) => Some(id),
_ => None,
})
.collect();

if let Some(CongestedObjects(_congested_objects)) =
effects.status().get_congested_objects()
{
congestion_txs_data.push(TxData {
checkpoint: checkpoint.sequence_number,
digest: *effects.transaction_digest(),
objects: congested_objects.clone(),
gas_price,
gas_price_feedback: Some(
effects
.status()
.get_feedback_suggested_gas_price()
.unwrap_or(self.reference_gas_price),
),
sui_prediction: sui_prediction,
ogd_prediction: ogd_prediction,
cleread: false,
});
// Only consider transactions that actually mutate shared objects.
if !mutated_objects.is_empty() {
congestion_txs_data.push(TxData {
checkpoint: checkpoint.sequence_number,
digest: *effects.transaction_digest(),
objects: mutated_objects.clone(),
gas_price,
gas_price_feedback: Some(
effects
.status()
.get_feedback_suggested_gas_price()
.unwrap_or(self.reference_gas_price),
),
sui_prediction: sui_prediction,
ogd_prediction: ogd_prediction,
cleread: false,
});
}
} else {
let mutated_objects: Vec<ObjectID> = effects
.input_shared_objects()
.into_iter()
.filter_map(|object| match object {
InputSharedObject::Mutate((id, _, _)) => Some(id),
_ => None,
})
.collect();

// Only push to clearing_txs_data if there are mutated objects
if !mutated_objects.is_empty() {
clearing_txs_data.push(TxData {
Expand Down Expand Up @@ -292,6 +421,9 @@ impl CongestionTracker {
);
}

// Inform model right before dumping hotness CSV
self.inform_model(checkpoint, &congestion_txs_data, &clearing_txs_data);

if !self.get_all_hotness().is_empty() {
info!(
"Hotness after checkpoint {}: {:?}",
Expand Down Expand Up @@ -1257,6 +1389,7 @@ mod tests {
"obj1 should be removed from cache"
);
let hotness = tracker.get_hotness_for_object(&obj2).unwrap_or(0.0);
println!("hotness for obj2: {}", hotness);
assert!(
hotness == 1.25 * HOTNESS_ADJUSTMENT_FACTOR,
"hotness for obj2 should be 1.25"
Expand Down
1 change: 1 addition & 0 deletions crates/iota-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod authority_client;
pub mod authority_server;
pub mod checkpoints;
pub mod congestion_tracker;
pub mod model_updater;
pub mod connection_monitor;
pub mod consensus_adapter;
pub mod consensus_handler;
Expand Down
Loading
Loading