Changes from 4 commits
1 change: 1 addition & 0 deletions .gitignore
@@ -67,6 +67,7 @@ iota.log.*
npm-debug.log*
yarn-debug.log*
yarn-error.log*
congestion_audit.jsonl

# misc
*.key
1 change: 1 addition & 0 deletions crates/iota-core/Cargo.toml
@@ -57,6 +57,7 @@ tokio = { workspace = true, features = ["full", "tracing", "test-util"] }
tokio-stream.workspace = true
tracing.workspace = true
twox-hash = "1.6"
ordered-float = { version = "5.0", features = ["serde"] }

# internal dependencies
consensus-config.workspace = true
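The new `ordered-float` dependency (with its `serde` feature) is presumably there to give `f64` scores such as hotness a total order plus serialization support; that purpose is an inference from the diff, not stated in it. A minimal std-only stand-in shows the problem the crate solves: bare `f64` is not `Ord`, so it cannot be sorted or used as an ordered-map key.

```rust
use std::cmp::Ordering;

/// Minimal stand-in for what `ordered_float::OrderedFloat` provides:
/// a total order over f64 (via `f64::total_cmp`) so float scores can
/// be sorted, compared for equality, or used as ordered-map keys.
#[derive(Clone, Copy, Debug)]
struct TotalF64(f64);

impl PartialEq for TotalF64 {
    fn eq(&self, other: &Self) -> bool {
        self.0.total_cmp(&other.0) == Ordering::Equal
    }
}
impl Eq for TotalF64 {}
impl PartialOrd for TotalF64 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for TotalF64 {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

fn main() {
    // `vec![2.5_f64, f64::NAN, 1.25].sort()` would not compile: f64 is not Ord.
    let mut hotness = vec![TotalF64(2.5), TotalF64(f64::NAN), TotalF64(1.25)];
    hotness.sort();
    assert_eq!(hotness[0].0, 1.25); // NaN sorts last under total_cmp
    println!("min hotness: {}", hotness[0].0);
}
```

The real crate additionally implements `Serialize`/`Deserialize` behind the `serde` feature, which is why the feature flag appears in the manifest line above.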
109 changes: 109 additions & 0 deletions crates/iota-core/src/congestion_tracker.rs
@@ -19,6 +19,10 @@ use iota_types::{
};
use moka::{ops::compute::Op, sync::Cache};
use serde::Deserialize;
use crate::model_updater::{
    build_cp_update_batch, build_train_tx_batch, HttpModelUpdater, ModelUpdater,
    ObjectCheckpointStats, ObjectSnapshot, RawTxItem,
};
use tracing::info;

use crate::execution_cache::TransactionCacheRead;
@@ -139,9 +143,109 @@ pub struct CongestionTracker {
reference_gas_price: u64,
/// Key-value cache for storing congestion info of objects.
object_congestion_info: Cache<ObjectID, CongestionInfo>,
/// HTTP client for posting model updates/training batches.
model_updater: HttpModelUpdater,
}

impl CongestionTracker {
/// Compose and send model update and training batches for the given checkpoint.
fn inform_model(
&self,
checkpoint: &VerifiedCheckpoint,
congestion_txs_data: &[TxData],
clearing_txs_data: &[TxData],
) {
// 1) Build touched set
let mut touched: std::collections::HashSet<ObjectID> = std::collections::HashSet::new();
for tx in congestion_txs_data {
Contributor: Here you don't differentiate between write shared objects, read shared objects, and owned objects. That means that read-only or owned objects would affect the prediction, which shouldn't be the case.

Contributor: For instance, check the filter at line 327.

Contributor: @salaheldinsoliman I think my comment was wrong here: you are using the same congestion_txs_data and cleared_txs_data from process_checkpoint_effect(), so there is actually no need to include the mutate filter for congested objects; in fact, we can only have congestion for write shared objects. I suggest reverting ddb1243.

touched.extend(tx.objects.iter().cloned());
}
for tx in clearing_txs_data {
touched.extend(tx.objects.iter().cloned());
}

// 2) Build snapshots from current cache
let mut snapshots: HashMap<ObjectID, ObjectSnapshot> = HashMap::new();
for oid in &touched {
if let Some(info) = self.get_congestion_info(*oid) {
snapshots.insert(
*oid,
ObjectSnapshot {
latest_congestion_time: Some(info.latest_congestion_time),
highest_congestion_gas_price: info.highest_congestion_gas_price,
latest_clearing_time: info.latest_clearing_time,
lowest_clearing_gas_price: info.lowest_clearing_gas_price.unwrap_or(0),
hotness: info.hotness,
},
);
}
}

// 3) Build per-checkpoint stats
Contributor: Both 2) and 3) are "per-checkpoint stats". The difference is that 2) is retrieved directly from congestion_info, while 3) is computed in this function. The stats in 2) provide richer information, as they also include gas prices. I'm wondering whether the stats in 3) are really needed and whether they actually contribute to improving the NN prediction.

Contributor Author: Good point 👍 Step (2) is a post-checkpoint snapshot from the cache (latest_* times, gas-price extremes, hotness), while step (3) adds per-checkpoint intensity: how many congested/clearing txs touched the object. The booleans in (3) are redundant (derivable from (2) by comparing latest_*_time to this checkpoint), but the counts are not, and they provide extra signal for the predictor. I think we should keep the counts for now, WDYT?

Contributor: Sure, there is no harm in keeping them.

let mut stats: HashMap<ObjectID, ObjectCheckpointStats> = HashMap::new();
for tx in congestion_txs_data {
for oid in &tx.objects {
let entry = stats.entry(*oid).or_default();
entry.was_congested = true;
entry.congested_tx_count += 1;
}
}
for tx in clearing_txs_data {
for oid in &tx.objects {
let entry = stats.entry(*oid).or_default();
entry.was_cleared = true;
entry.clearing_tx_count += 1;
}
}

// 4) Post update batch
let update_batch = build_cp_update_batch(
checkpoint.timestamp_ms,
self.reference_gas_price,
touched.iter().cloned(),
&snapshots,
&stats,
);
self.model_updater.post_update(update_batch);

// 5) Build raw tx items and per-object min clearing for training
let mut raw_txs: Vec<RawTxItem> = Vec::with_capacity(
congestion_txs_data.len() + clearing_txs_data.len(),
);
let mut per_obj_min_clearing: HashMap<ObjectID, u64> = HashMap::new();
for tx in congestion_txs_data {
raw_txs.push(RawTxItem {
tx_digest: tx.digest.to_string(),
is_congested: true,
gas_price: tx.gas_price,
gas_price_feedback: tx.gas_price_feedback,
touched_objects: tx.objects.clone(),
});
}
for tx in clearing_txs_data {
raw_txs.push(RawTxItem {
tx_digest: tx.digest.to_string(),
is_congested: false,
gas_price: tx.gas_price,
gas_price_feedback: None,
touched_objects: tx.objects.clone(),
});
for oid in &tx.objects {
Contributor: Your per_obj_min_clearing vector corresponds to get_prediction_suggested_gas_price computed for all objects in the checkpoint. However, your code doesn't handle the rare but possible case where no clearing data are present and all transactions are canceled for a given checkpoint.

Contributor Author: Nice catch. What I did now is basically: a second accumulator that tracks, per object, the highest congestion requirement seen in the checkpoint (using gas_price_feedback if present, otherwise the gas price). After the single pass over transactions, I merge the two into a per_obj_required_in_cp map: it prefers the lowest clearing price when available, and otherwise falls back to that highest congestion value.

Contributor: Here I would suggest reusing the existing congestion tracker logic. Specifically, use the get_suggested_gas_price_for_objects() function directly, which already does what you want. I would remove per_obj_min_clearing and per_obj_max_congestion and use something like:

let price = self.get_suggested_gas_price_for_objects(std::iter::once(*oid));

to store data into per_obj_required_in_cp.


per_obj_min_clearing
.entry(*oid)
.and_modify(|m| *m = (*m).min(tx.gas_price))
.or_insert(tx.gas_price);
}
}
if let Some(train_batch) = build_train_tx_batch(
checkpoint.timestamp_ms,
self.reference_gas_price,
&raw_txs,
&per_obj_min_clearing,
) {
self.model_updater.post_train_tx(train_batch);
}
}
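The per-object fallback the author describes in the thread above (take the lowest clearing price when any transaction cleared the object, otherwise fall back to the highest congestion requirement) can be sketched with std-only types. `merge_required_prices` and the `u64` object ids are hypothetical stand-ins for illustration, not the PR's actual API:

```rust
use std::collections::HashMap;

type ObjectId = u64; // stand-in for iota_types' ObjectID in this sketch

/// Prefer the lowest clearing gas price observed for an object in this
/// checkpoint; when nothing cleared (e.g. every tx was cancelled),
/// fall back to the highest congestion requirement instead.
fn merge_required_prices(
    min_clearing: &HashMap<ObjectId, u64>,
    max_congestion: &HashMap<ObjectId, u64>,
) -> HashMap<ObjectId, u64> {
    let mut required = min_clearing.clone();
    for (oid, price) in max_congestion {
        // Only inserts when no clearing price exists for this object.
        required.entry(*oid).or_insert(*price);
    }
    required
}

fn main() {
    let min_clearing = HashMap::from([(1, 100), (2, 250)]);
    let max_congestion = HashMap::from([(2, 900), (3, 400)]);
    let required = merge_required_prices(&min_clearing, &max_congestion);
    assert_eq!(required[&1], 100); // only cleared
    assert_eq!(required[&2], 250); // clearing price wins over congestion max
    assert_eq!(required[&3], 400); // never cleared: congestion fallback
}
```

The reviewer's alternative, calling get_suggested_gas_price_for_objects() per object, would replace both accumulators with the tracker's existing price logic.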
/// Create a new `CongestionTracker`. The cache capacity will be
/// set to `CONGESTION_TRACKER_CACHE_CAPACITY`, which is `10_000`.
pub fn new(reference_gas_price: u64) -> Self {
@@ -157,6 +261,7 @@ impl CongestionTracker {
Self {
reference_gas_price,
object_congestion_info: Cache::new(CONGESTION_TRACKER_CACHE_CAPACITY),
model_updater: HttpModelUpdater::default(),
}
}

@@ -292,6 +397,9 @@ impl CongestionTracker {
);
}

// Inform model right before dumping hotness CSV
self.inform_model(checkpoint, &congestion_txs_data, &clearing_txs_data);

if !self.get_all_hotness().is_empty() {
info!(
"Hotness after checkpoint {}: {:?}",
@@ -1257,6 +1365,7 @@ mod tests {
"obj1 should be removed from cache"
);
let hotness = tracker.get_hotness_for_object(&obj2).unwrap_or(0.0);
println!("hotness for obj2: {}", hotness);
assert!(
hotness == 1.25 * HOTNESS_ADJUSTMENT_FACTOR,
"hotness for obj2 should be 1.25"
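The first review thread argues that only shared objects taken by mutable reference can actually be congested, so owned and read-only inputs should not feed the predictor. A hedged std-only sketch of such a filter (the enum and field names are illustrative, not the PR's types):

```rust
/// Illustrative input kinds; the real code would derive this from the
/// transaction's shared-object inputs.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum InputKind {
    Owned,
    SharedRead,
    SharedWrite,
}

#[derive(Clone, Copy, Debug)]
struct TouchedObject {
    id: u64,
    kind: InputKind,
}

/// Keep only shared objects taken by mutable reference, which per the
/// review are the only inputs that can experience congestion.
fn writable_shared(objs: &[TouchedObject]) -> Vec<u64> {
    objs.iter()
        .filter(|o| o.kind == InputKind::SharedWrite)
        .map(|o| o.id)
        .collect()
}

fn main() {
    let inputs = [
        TouchedObject { id: 1, kind: InputKind::Owned },
        TouchedObject { id: 2, kind: InputKind::SharedRead },
        TouchedObject { id: 3, kind: InputKind::SharedWrite },
    ];
    assert_eq!(writable_shared(&inputs), vec![3]);
}
```

As the third comment in that thread notes, the filter may be unnecessary here because congestion_txs_data and cleared_txs_data already come from process_checkpoint_effect(), where only write shared objects can appear as congested.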
1 change: 1 addition & 0 deletions crates/iota-core/src/lib.rs
@@ -11,6 +11,7 @@ pub mod authority_client;
pub mod authority_server;
pub mod checkpoints;
pub mod congestion_tracker;
pub mod model_updater;
pub mod connection_monitor;
pub mod consensus_adapter;
pub mod consensus_handler;