-
Notifications
You must be signed in to change notification settings - Fork 54
feat(iota-core): update neural network model server from congestion tracker module #8993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: protocol-research/ogd-predictor-experiments
Are you sure you want to change the base?
Changes from 4 commits
ec2ec7a
058b4e1
d61cf93
e9ef14b
ddb1243
f312ef8
a9360db
12cace1
8d6cd4d
5cc2aad
b546779
57fd3a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,7 @@ iota.log.* | |
| npm-debug.log* | ||
| yarn-debug.log* | ||
| yarn-error.log* | ||
| congestion_audit.jsonl | ||
|
|
||
| # misc | ||
| *.key | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,10 @@ use iota_types::{ | |
| }; | ||
| use moka::{ops::compute::Op, sync::Cache}; | ||
| use serde::Deserialize; | ||
| use crate::model_updater::{ | ||
| build_cp_update_batch, build_train_tx_batch, HttpModelUpdater, ObjectCheckpointStats, | ||
| ObjectSnapshot, RawTxItem, ModelUpdater, | ||
| }; | ||
| use tracing::info; | ||
|
|
||
| use crate::execution_cache::TransactionCacheRead; | ||
|
|
@@ -139,9 +143,109 @@ pub struct CongestionTracker { | |
| reference_gas_price: u64, | ||
| /// Key-value cache for storing congestion info of objects. | ||
| object_congestion_info: Cache<ObjectID, CongestionInfo>, | ||
| /// HTTP client for posting model updates/training batches. | ||
| model_updater: HttpModelUpdater, | ||
| } | ||
|
|
||
| impl CongestionTracker { | ||
| /// Compose and send model update and training batches for the given checkpoint. | ||
| fn inform_model( | ||
| &self, | ||
| checkpoint: &VerifiedCheckpoint, | ||
| congestion_txs_data: &[TxData], | ||
| clearing_txs_data: &[TxData], | ||
| ) { | ||
| // 1) Build touched set | ||
| let mut touched: std::collections::HashSet<ObjectID> = std::collections::HashSet::new(); | ||
| for tx in congestion_txs_data { | ||
| touched.extend(tx.objects.iter().cloned()); | ||
| } | ||
| for tx in clearing_txs_data { | ||
| touched.extend(tx.objects.iter().cloned()); | ||
| } | ||
|
|
||
| // 2) Build snapshots from current cache | ||
| let mut snapshots: HashMap<ObjectID, ObjectSnapshot> = HashMap::new(); | ||
| for oid in &touched { | ||
| if let Some(info) = self.get_congestion_info(*oid) { | ||
| snapshots.insert( | ||
| *oid, | ||
| ObjectSnapshot { | ||
| latest_congestion_time: Some(info.latest_congestion_time), | ||
| highest_congestion_gas_price: info.highest_congestion_gas_price, | ||
| latest_clearing_time: info.latest_clearing_time, | ||
| lowest_clearing_gas_price: info.lowest_clearing_gas_price.unwrap_or(0), | ||
vekkiokonio marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| hotness: info.hotness, | ||
| }, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // 3) Build per-checkpoint stats | ||
|
||
| let mut stats: HashMap<ObjectID, ObjectCheckpointStats> = HashMap::new(); | ||
| for tx in congestion_txs_data { | ||
| for oid in &tx.objects { | ||
| let entry = stats.entry(*oid).or_default(); | ||
| entry.was_congested = true; | ||
| entry.congested_tx_count += 1; | ||
| } | ||
| } | ||
| for tx in clearing_txs_data { | ||
| for oid in &tx.objects { | ||
| let entry = stats.entry(*oid).or_default(); | ||
| entry.was_cleared = true; | ||
| entry.clearing_tx_count += 1; | ||
| } | ||
| } | ||
|
|
||
| // 4) Post update batch | ||
| let update_batch = build_cp_update_batch( | ||
| checkpoint.timestamp_ms, | ||
| self.reference_gas_price, | ||
| touched.iter().cloned(), | ||
| &snapshots, | ||
| &stats, | ||
| ); | ||
| self.model_updater.post_update(update_batch); | ||
|
|
||
| // 5) Build raw tx items and per-object min clearing for training | ||
| let mut raw_txs: Vec<RawTxItem> = Vec::with_capacity( | ||
| congestion_txs_data.len() + clearing_txs_data.len(), | ||
| ); | ||
| let mut per_obj_min_clearing: HashMap<ObjectID, u64> = HashMap::new(); | ||
| for tx in congestion_txs_data { | ||
vekkiokonio marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| raw_txs.push(RawTxItem { | ||
| tx_digest: tx.digest.to_string(), | ||
| is_congested: true, | ||
| gas_price: tx.gas_price, | ||
| gas_price_feedback: tx.gas_price_feedback, | ||
| touched_objects: tx.objects.clone(), | ||
| }); | ||
| } | ||
| for tx in clearing_txs_data { | ||
| raw_txs.push(RawTxItem { | ||
| tx_digest: tx.digest.to_string(), | ||
| is_congested: false, | ||
| gas_price: tx.gas_price, | ||
| gas_price_feedback: None, | ||
| touched_objects: tx.objects.clone(), | ||
| }); | ||
| for oid in &tx.objects { | ||
|
||
| per_obj_min_clearing | ||
| .entry(*oid) | ||
| .and_modify(|m| *m = (*m).min(tx.gas_price)) | ||
| .or_insert(tx.gas_price); | ||
| } | ||
| } | ||
| if let Some(train_batch) = build_train_tx_batch( | ||
| checkpoint.timestamp_ms, | ||
| self.reference_gas_price, | ||
| &raw_txs, | ||
| &per_obj_min_clearing, | ||
| ) { | ||
| self.model_updater.post_train_tx(train_batch); | ||
| } | ||
| } | ||
| /// Create a new `CongestionTracker`. The cache capacity will be | ||
| /// set to `CONGESTION_TRACKER_CACHE_CAPACITY`, which is `10_000`. | ||
| pub fn new(reference_gas_price: u64) -> Self { | ||
|
|
@@ -157,6 +261,7 @@ impl CongestionTracker { | |
| Self { | ||
| reference_gas_price, | ||
| object_congestion_info: Cache::new(CONGESTION_TRACKER_CACHE_CAPACITY), | ||
| model_updater: HttpModelUpdater::default(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -292,6 +397,9 @@ impl CongestionTracker { | |
| ); | ||
| } | ||
|
|
||
| // Inform model right before dumping hotness CSV | ||
| self.inform_model(checkpoint, &congestion_txs_data, &clearing_txs_data); | ||
|
|
||
| if !self.get_all_hotness().is_empty() { | ||
| info!( | ||
| "Hotness after checkpoint {}: {:?}", | ||
|
|
@@ -1257,6 +1365,7 @@ mod tests { | |
| "obj1 should be removed from cache" | ||
| ); | ||
| let hotness = tracker.get_hotness_for_object(&obj2).unwrap_or(0.0); | ||
| println!("hotness for obj2: {}", hotness); | ||
| assert!( | ||
| hotness == 1.25 * HOTNESS_ADJUSTMENT_FACTOR, | ||
| "hotness for obj2 should be 1.25" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you don't differentiate between write shared objects, read shared objects and owned objects. That means that read-only or owned objects would affect the prediction, which shouldn't be the case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For instance, check the filter at line 327
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@salaheldinsoliman I think my comment was wrong here: you are using the same `congestion_txs_data` and `cleared_txs_data` from `process_checkpoint_effect()`, so there is actually no need to include the mutate filter for congested objects — in fact, we can only have congestion for write shared objects. I suggest reverting ddb1243.