diff --git a/.gitignore b/.gitignore index 98f2e7e1f98..0b8608ac7cf 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ iota.log.* npm-debug.log* yarn-debug.log* yarn-error.log* +congestion_audit.jsonl # misc *.key diff --git a/Cargo.lock b/Cargo.lock index c5bdcc74a71..669879c6eae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5861,6 +5861,7 @@ dependencies = [ "num_cpus", "object_store 0.10.2", "once_cell", + "ordered-float 5.1.0", "parking_lot 0.12.3", "pprof", "pretty_assertions", @@ -10385,6 +10386,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" +dependencies = [ + "num-traits", + "rand 0.8.5", + "serde", +] + [[package]] name = "ouroboros" version = "0.17.2" @@ -11678,6 +11690,7 @@ dependencies = [ "libc", "rand_chacha 0.3.1", "rand_core 0.6.4", + "serde", ] [[package]] @@ -11736,6 +11749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom 0.2.15", + "serde", ] [[package]] @@ -14294,7 +14308,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float", + "ordered-float 2.10.1", ] [[package]] diff --git a/crates/iota-core/Cargo.toml b/crates/iota-core/Cargo.toml index 7c9f44bff73..6181d5f365c 100644 --- a/crates/iota-core/Cargo.toml +++ b/crates/iota-core/Cargo.toml @@ -57,6 +57,7 @@ tokio = { workspace = true, features = ["full", "tracing", "test-util"] } tokio-stream.workspace = true tracing.workspace = true twox-hash = "1.6" +ordered-float = { version = "5.0", features = ["serde"] } # internal dependencies consensus-config.workspace = true diff --git a/crates/iota-core/src/congestion_tracker.rs b/crates/iota-core/src/congestion_tracker.rs index 0f8fd83417e..e47a783c2a3 100644 --- a/crates/iota-core/src/congestion_tracker.rs +++ b/crates/iota-core/src/congestion_tracker.rs @@ -19,6 +19,10 @@ use iota_types::{ }; use moka::{ops::compute::Op, sync::Cache}; use serde::Deserialize; +use crate::model_updater::{ + build_cp_update_batch, build_train_tx_batch, HttpModelUpdater, ObjectCheckpointStats, + ObjectSnapshot, RawTxItem, ModelUpdater, +}; use tracing::info; use crate::execution_cache::TransactionCacheRead; @@ -139,9 +143,106 @@ pub struct CongestionTracker { reference_gas_price: u64, /// Key-value cache for storing congestion info of objects. object_congestion_info: Cache, + /// HTTP client for posting model updates/training batches. + model_updater: HttpModelUpdater, } impl CongestionTracker { + /// Compose and send model update and training batches for the given checkpoint. + fn inform_model( + &self, + checkpoint: &VerifiedCheckpoint, + congestion_txs_data: &[TxData], + clearing_txs_data: &[TxData], + ) { + // Single pass over both congested and clearing txs to build: + // - touched set + // - per-checkpoint stats (counts + flags) + // - raw training items and per-object min clearing map + let mut touched: std::collections::HashSet = std::collections::HashSet::new(); + let mut stats: HashMap = HashMap::new(); + let mut raw_txs: Vec = + Vec::with_capacity(congestion_txs_data.len() + clearing_txs_data.len()); + + for (is_congested, tx) in congestion_txs_data + .iter() + .map(|t| (true, t)) + .chain(clearing_txs_data.iter().map(|t| (false, t))) + { + // touched set + touched.extend(tx.objects.iter().cloned()); + + // stats per object + for oid in &tx.objects { + let entry = stats.entry(*oid).or_default(); + if is_congested { + entry.was_congested = true; + entry.congested_tx_count += 1; + } else { + entry.was_cleared = true; + entry.clearing_tx_count += 1; + } + } + + // raw training items + raw_txs.push(RawTxItem { + tx_digest: tx.digest.to_string(), + is_congested, + gas_price: tx.gas_price, + gas_price_feedback: if is_congested { tx.gas_price_feedback } else { None }, + touched_objects: tx.objects.clone(), + }); + + } + + // Build per-object required price in checkpoint using + // get_suggested_gas_price_for_objects() + let mut per_obj_required_in_cp: HashMap = HashMap::new(); + for oid in &touched { + if let Some(price) = self.get_suggested_gas_price_for_objects(std::iter::once(*oid)) { + per_obj_required_in_cp.insert(*oid, price); + } + } + + // Build snapshots from current cache using the touched set + let mut snapshots: HashMap = HashMap::new(); + for oid in &touched { + if let Some(info) = self.get_congestion_info(*oid) { + snapshots.insert( + *oid, + ObjectSnapshot { + latest_congestion_time: Some(info.latest_congestion_time), + highest_congestion_gas_price: info.highest_congestion_gas_price, + latest_clearing_time: info.latest_clearing_time, + lowest_clearing_gas_price: info + .lowest_clearing_gas_price + .unwrap_or(self.reference_gas_price), + hotness: info.hotness, + }, + ); + } + } + + // Post update batch + let update_batch = build_cp_update_batch( + checkpoint.timestamp_ms, + self.reference_gas_price, + touched.iter().cloned(), + &snapshots, + &stats, + ); + self.model_updater.post_update(update_batch); + + // Post train batch (if any) + if let Some(train_batch) = build_train_tx_batch( + checkpoint.timestamp_ms, + self.reference_gas_price, + &raw_txs, + &per_obj_required_in_cp, + ) { + self.model_updater.post_train_tx(train_batch); + } + } /// Create a new `CongestionTracker`. The cache capacity will be /// set to `CONGESTION_TRACKER_CACHE_CAPACITY`, which is `10_000`. pub fn new(reference_gas_price: u64) -> Self { @@ -157,6 +258,7 @@ impl CongestionTracker { Self { reference_gas_price, object_congestion_info: Cache::new(CONGESTION_TRACKER_CACHE_CAPACITY), + model_updater: HttpModelUpdater::default(), } } @@ -292,6 +394,9 @@ impl CongestionTracker { ); } + // Inform model right before dumping hotness CSV + self.inform_model(checkpoint, &congestion_txs_data, &clearing_txs_data); + if !self.get_all_hotness().is_empty() { info!( "Hotness after checkpoint {}: {:?}", @@ -1257,6 +1362,7 @@ mod tests { "obj1 should be removed from cache" ); let hotness = tracker.get_hotness_for_object(&obj2).unwrap_or(0.0); + println!("hotness for obj2: {}", hotness); assert!( hotness == 1.25 * HOTNESS_ADJUSTMENT_FACTOR, "hotness for obj2 should be 1.25" diff --git a/crates/iota-core/src/lib.rs b/crates/iota-core/src/lib.rs index 9b9f6b63b6b..73fa138c848 100644 --- a/crates/iota-core/src/lib.rs +++ b/crates/iota-core/src/lib.rs @@ -11,6 +11,7 @@ pub mod authority_client; pub mod authority_server; pub mod checkpoints; pub mod congestion_tracker; +pub mod model_updater; pub mod connection_monitor; pub mod consensus_adapter; pub mod consensus_handler; diff --git a/crates/iota-core/src/model_updater.rs b/crates/iota-core/src/model_updater.rs new file mode 100644 index 00000000000..879a4b90a41 --- /dev/null +++ b/crates/iota-core/src/model_updater.rs @@ -0,0 +1,481 @@ +// model posting / training logic for congestion tracking. + +use serde::Serialize; +use std::{ + collections::HashMap, + fs::{File, OpenOptions}, + io::Write, + path::PathBuf, + sync::{Arc, Mutex}, +}; +use iota_types::base_types::ObjectID; + +/// Default base URL for the predictor service. +pub const DEFAULT_PREDICTOR_BASE_URL: &str = "http://predictor:9666"; // docker-compose private network only; without compose use http://0.0.0.0:9666 +/// Default JSONL audit log path (created/appended in current working dir). +pub const AUDIT_LOG_PATH: &str = "congestion_audit.jsonl"; +/// Minimum reference gas price used in required-price lower bound. +pub const MIN_REFERENCE_GAS_PRICE: u64 = 1000; + +/// Rows describing per-object state used by the predictor's /update endpoint. +/// Note: This mirrors the shape currently produced inside CongestionTracker. +#[derive(Debug, Clone, Serialize)] +pub struct CpUpdateRow { + pub checkpoint_ms: u64, + pub reference_gas_price: u64, + pub object_id: String, + + // CongestionInfo snapshot (post-update if present) + pub latest_congestion_time: Option, + pub highest_congestion_gas_price: u64, + pub latest_clearing_time: Option, + pub lowest_clearing_gas_price: u64, + pub hotness: f64, + + // Per-checkpoint flags & counts for the object + pub was_touched_in_cp: bool, + pub was_congested_in_cp: bool, + pub was_cleared_in_cp: bool, + pub congested_tx_count_in_cp: u32, + pub clearing_tx_count_in_cp: u32, + + // Convenience normalized helpers + pub hotness_over_ref: f64, + pub highest_congestion_over_ref: f64, + pub lowest_clearing_over_ref: f64, +} + +#[derive(Debug, Clone, Serialize)] +pub struct CpUpdateBatch { + pub checkpoint_ms: u64, + pub rows: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub struct TrainTxItem { + pub checkpoint_ms: u64, + pub reference_gas_price: u64, + pub required_price_in_cp: u64, + pub object_ids: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub struct TrainTxBatch { + pub items: Vec, +} + +/// Snapshot of per-object state needed to compose update rows. +#[derive(Debug, Clone)] +pub struct ObjectSnapshot { + pub latest_congestion_time: Option, + pub highest_congestion_gas_price: u64, + pub latest_clearing_time: Option, + pub lowest_clearing_gas_price: u64, + pub hotness: f64, +} + +/// Per-checkpoint object stats derived from txs in this checkpoint. +#[derive(Debug, Clone, Default)] +pub struct ObjectCheckpointStats { + pub was_congested: bool, + pub was_cleared: bool, + pub congested_tx_count: u32, + pub clearing_tx_count: u32, +} + +/// Build a CpUpdateBatch from touched object ids, snapshots and per-checkpoint stats. +pub fn build_cp_update_batch( + checkpoint_ms: u64, + reference_gas_price: u64, + touched: impl IntoIterator, + snapshots: &HashMap, + stats: &HashMap, +) -> CpUpdateBatch { + let mut rows = Vec::new(); + for oid in touched { + if let Some(s) = snapshots.get(&oid) { + let st = stats.get(&oid).cloned().unwrap_or_default(); + let hot = s.hotness; + let highest_cong = s.highest_congestion_gas_price; + let lowest_clear = s.lowest_clearing_gas_price; + rows.push(CpUpdateRow { + checkpoint_ms, + reference_gas_price, + object_id: oid.to_string(), + + latest_congestion_time: s.latest_congestion_time, + highest_congestion_gas_price: highest_cong, + latest_clearing_time: s.latest_clearing_time, + lowest_clearing_gas_price: lowest_clear, + hotness: hot, + + was_touched_in_cp: true, + was_congested_in_cp: st.was_congested, + was_cleared_in_cp: st.was_cleared, + congested_tx_count_in_cp: st.congested_tx_count, + clearing_tx_count_in_cp: st.clearing_tx_count, + + hotness_over_ref: hot / reference_gas_price as f64, + highest_congestion_over_ref: if highest_cong > 0 { + highest_cong as f64 / reference_gas_price as f64 + } else { + 0.0 + }, + lowest_clearing_over_ref: if lowest_clear > 0 { + lowest_clear as f64 / reference_gas_price as f64 + } else { + 0.0 + }, + }); + } else { + let st = stats.get(&oid).cloned().unwrap_or_default(); + rows.push(CpUpdateRow { + checkpoint_ms, + reference_gas_price, + object_id: oid.to_string(), + + latest_congestion_time: None, + highest_congestion_gas_price: 0, + latest_clearing_time: None, + lowest_clearing_gas_price: 0, + hotness: 0.0, + + was_touched_in_cp: true, + was_congested_in_cp: st.was_congested, + was_cleared_in_cp: st.was_cleared, + congested_tx_count_in_cp: st.congested_tx_count, + clearing_tx_count_in_cp: st.clearing_tx_count, + + hotness_over_ref: 0.0, + highest_congestion_over_ref: 0.0, + lowest_clearing_over_ref: 0.0, + }); + } + } + CpUpdateBatch { checkpoint_ms, rows } +} + +/// Build a TrainTxBatch from raw txs and per-object min clearing prices for this checkpoint. +pub fn build_train_tx_batch( + checkpoint_ms: u64, + reference_gas_price: u64, + raw_txs: &[RawTxItem], + per_object_required_in_cp: &HashMap, +) -> Option { + let mut items = Vec::new(); + for tx in raw_txs { + let oids: Vec = tx.touched_objects.iter().map(|o| o.to_string()).collect(); + if oids.is_empty() { + continue; + } + + let required = if tx.is_congested { + tx.gas_price_feedback + .unwrap_or(tx.gas_price) + .max(MIN_REFERENCE_GAS_PRICE) + } else { + // For non-congested txs, use the per-object required price in this checkpoint + // (lowest clearing if present, otherwise highest congestion for that object), + // falling back to MIN_REFERENCE_GAS_PRICE if absent. + let mut req = MIN_REFERENCE_GAS_PRICE; + for oid in &tx.touched_objects { + if let Some(val) = per_object_required_in_cp.get(oid) { + req = req.max(*val); + } + } + req + }; + + items.push(TrainTxItem { + checkpoint_ms, + reference_gas_price, + required_price_in_cp: required, + object_ids: oids, + }); + } + if items.is_empty() { None } else { Some(TrainTxBatch { items }) } +} + +/// Trait abstraction for sending model-related updates. +/// +/// Implementations should be resilient: never panic on send errors and prefer +/// logging/metrics over propagating failures. +pub trait ModelUpdater: Send + Sync { + /// Post a batch of per-object updates to the predictor. Should send even if empty. + fn post_update(&self, batch: CpUpdateBatch); + + /// Post a batch of tx-level training items to the predictor. + fn post_train_tx(&self, batch: TrainTxBatch); +} + +/// No-op implementation useful for tests or when disabled by config. +pub struct NoopModelUpdater; + +impl ModelUpdater for NoopModelUpdater { + fn post_update(&self, batch: CpUpdateBatch) { + println!( + "[congestion/noop] skip POST /update (rows={})", + batch.rows.len() + ); + } + + fn post_train_tx(&self, batch: TrainTxBatch) { + println!( + "[congestion/noop] skip POST /train_tx (items={})", + batch.items.len() + ); + } +} + +/// HTTP implementation using reqwest, with async-or-blocking fallback depending +/// on the presence of a Tokio runtime. +pub struct HttpModelUpdater { + base_url: String, +} + +impl HttpModelUpdater { + pub fn new>(base_url: S) -> Self { + Self { + base_url: base_url.into(), + } + } + + fn update_url(&self) -> String { + format!("{}/update", self.base_url.trim_end_matches('/')) + } + + fn train_tx_url(&self) -> String { + format!("{}/train_tx", self.base_url.trim_end_matches('/')) + } +} + +impl Default for HttpModelUpdater { + fn default() -> Self { + Self::new(DEFAULT_PREDICTOR_BASE_URL) + } +} + +impl ModelUpdater for HttpModelUpdater { + fn post_update(&self, batch: CpUpdateBatch) { + let url = self.update_url(); + let rows = batch.rows.len(); + + if let Ok(_h) = tokio::runtime::Handle::try_current() { + // Prefer async if a runtime exists. + tokio::spawn(async move { + let client = reqwest::Client::new(); + match client.post(&url).json(&batch).send().await { + Ok(resp) if resp.status().is_success() => { + println!("[congestion] POST /update ok (rows={rows})"); + } + Ok(resp) => { + println!( + "[congestion] POST /update failed: status={} (rows={rows})", + resp.status() + ); + } + Err(e) => { + println!("[congestion] POST /update error: {e} (rows={rows})"); + } + } + }); + } else { + // Fallback to a blocking client on a background thread. + std::thread::spawn(move || { + let client = reqwest::blocking::Client::new(); + match client.post(&url).json(&batch).send() { + Ok(resp) if resp.status().is_success() => { + println!("[congestion/blocking] POST /update ok (rows={rows})"); + } + Ok(resp) => { + println!( + "[congestion/blocking] POST /update failed: status={} (rows={rows})", + resp.status() + ); + } + Err(e) => { + println!("[congestion/blocking] POST /update error: {e} (rows={rows})"); + } + } + }); + } + } + + fn post_train_tx(&self, batch: TrainTxBatch) { + let url = self.train_tx_url(); + let items = batch.items.len(); + + if let Ok(_h) = tokio::runtime::Handle::try_current() { + tokio::spawn(async move { + let client = reqwest::Client::new(); + match client.post(&url).json(&batch).send().await { + Ok(resp) if resp.status().is_success() => { + println!("[congestion] POST /train_tx ok (items={items})"); + } + Ok(resp) => { + println!( + "[congestion] POST /train_tx failed: status={} (items={items})", + resp.status() + ); + } + Err(e) => { + println!("[congestion] POST /train_tx error: {e} (items={items})"); + } + } + }); + } else { + std::thread::spawn(move || { + let client = reqwest::blocking::Client::new(); + match client.post(&url).json(&batch).send() { + Ok(resp) if resp.status().is_success() => { + println!( + "[congestion/blocking] POST /train_tx ok (items={items})" + ); + } + Ok(resp) => { + println!( + "[congestion/blocking] POST /train_tx failed: status={} (items={items})", + resp.status() + ); + } + Err(e) => { + println!( + "[congestion/blocking] POST /train_tx error: {e} (items={items})" + ); + } + } + }); + } + } +} + +// ------------------------------- +// Audit JSONL support +// ------------------------------- + +#[derive(Debug, Clone, Serialize)] +pub struct TxAuditRow { + pub checkpoint_id: u64, + pub reference_gas_price: u64, + pub tx_digest: String, + pub is_congested: bool, + pub gas_price: u64, + pub gas_price_feedback: Option, + pub touched_objects: Vec, + pub required_price_in_cp: u64, + pub overpay: u64, +} + +/// Raw tx representation used to derive audit rows. +#[derive(Debug, Clone)] +pub struct RawTxItem { + pub tx_digest: String, + pub is_congested: bool, + pub gas_price: u64, + pub gas_price_feedback: Option, + pub touched_objects: Vec, +} + +/// Helper for stable default of the object-id to skip in audit. +pub fn default_skip_audit_object_id() -> ObjectID { + ObjectID::from_single_byte(0x6) +} + +/// Build audit rows based on raw tx records and per-object min clearing price in the checkpoint. +pub fn build_tx_audit_rows( + raw_txs: &[RawTxItem], + per_object_min_clearing_in_cp: &HashMap, + reference_gas_price: u64, + checkpoint_id: u64, + skip_object: Option, +) -> Vec { + let mut out = Vec::with_capacity(raw_txs.len()); + for tx in raw_txs { + if let Some(skip) = skip_object { + if tx.touched_objects.len() == 1 && tx.touched_objects[0] == skip { + continue; + } + } + + // required price: congested -> feedback (or gas price) ; + // non-congested -> max per-object min clearing price observed in CP. + let required = if tx.is_congested { + tx.gas_price_feedback + .unwrap_or(tx.gas_price) + .max(MIN_REFERENCE_GAS_PRICE) + } else { + let mut req = MIN_REFERENCE_GAS_PRICE; + for oid in &tx.touched_objects { + if let Some(min_clear) = per_object_min_clearing_in_cp.get(oid) { + if *min_clear > MIN_REFERENCE_GAS_PRICE { + req = req.max(*min_clear); + } + } + } + req + }; + + let overpay = tx.gas_price.saturating_sub(required); + out.push(TxAuditRow { + checkpoint_id, + reference_gas_price, + tx_digest: tx.tx_digest.clone(), + is_congested: tx.is_congested, + gas_price: tx.gas_price, + gas_price_feedback: tx.gas_price_feedback, + touched_objects: tx + .touched_objects + .iter() + .map(|o| o.to_string()) + .collect(), + required_price_in_cp: required, + overpay, + }); + } + out +} + +/// Lightweight JSONL writer with an internal mutex; safe to share. +pub struct AuditLogger { + writer: Option>>, +} + +impl AuditLogger { + /// Opens (or creates) a JSONL file at `path` for append. + pub fn new_with_path(path: PathBuf) -> Self { + let writer = match OpenOptions::new().create(true).append(true).open(&path) { + Ok(f) => Some(Arc::new(Mutex::new(f))), + Err(e) => { + eprintln!("[congestion/audit] failed to open {:?}: {e}", path); + None + } + }; + Self { writer } + } + + /// Opens (or creates) the default audit file `AUDIT_LOG_PATH` for append. + pub fn new_default() -> Self { + Self::new_with_path(PathBuf::from(AUDIT_LOG_PATH)) + } + + /// Writes each row as a serialized JSON line; logs errors and continues. + pub fn write_rows(&self, rows: &[TxAuditRow]) { + if let Some(w) = &self.writer { + let mut guard = w.lock().unwrap(); + for row in rows { + match serde_json::to_string(row) { + Ok(line) => { + if let Err(e) = writeln!(guard, "{}", line) { + eprintln!("[congestion/audit] write error: {e}"); + break; + } + } + Err(e) => { + eprintln!("[congestion/audit] serialize error: {e}"); + } + } + } + let _ = guard.flush(); + } + } +} diff --git a/dev-tools/iota-private-network/docker-compose.yaml b/dev-tools/iota-private-network/docker-compose.yaml index 5a3f0cab0cc..1df37e0bc72 100644 --- a/dev-tools/iota-private-network/docker-compose.yaml +++ b/dev-tools/iota-private-network/docker-compose.yaml @@ -264,16 +264,20 @@ services: networks: iota-network: ipv4_address: 10.0.0.35 + working_dir: /opt/iota/logs volumes: - ./data/fullnode-1:/opt/iota/db:rw - ./configs/fullnodes/fullnode.yaml:/opt/iota/config/fullnode.yaml:ro - ./configs/genesis/genesis.blob:/opt/iota/config/genesis.blob:ro - ./results:/iota/crates/iota-core/src/results + - ./logs/fullnode-1:/opt/iota/logs:rw expose: - "9000" ports: - "127.0.0.1:9000:9000/tcp" - "127.0.0.1:9184:9184/tcp" + depends_on: + - predictor fullnode-2: <<: *common-fullnode @@ -282,13 +286,17 @@ services: networks: iota-network: ipv4_address: 10.0.0.36 + working_dir: /opt/iota/logs volumes: - ./data/fullnode-2:/opt/iota/db:rw - ./configs/fullnodes/backup.yaml:/opt/iota/config/fullnode.yaml:ro - ./configs/genesis/genesis.blob:/opt/iota/config/genesis.blob:ro + - ./logs/fullnode-2:/opt/iota/logs:rw ports: - "127.0.0.1:9001:9000/tcp" - "127.0.0.1:9185:9184/tcp" + depends_on: + - predictor fullnode-3: <<: *common-fullnode @@ -297,13 +305,17 @@ services: networks: iota-network: ipv4_address: 10.0.0.37 + working_dir: /opt/iota/logs volumes: - ./data/fullnode-3:/opt/iota/db:rw - ./configs/fullnodes/fullnode.yaml:/opt/iota/config/fullnode.yaml:ro - ./configs/genesis/genesis.blob:/opt/iota/config/genesis.blob:ro + - ./logs/fullnode-3:/opt/iota/logs:rw ports: - "127.0.0.1:9002:9000/tcp" - "127.0.0.1:9186:9184/tcp" + depends_on: + - predictor fullnode-4: <<: *common-fullnode @@ -312,13 +324,33 @@ services: networks: iota-network: ipv4_address: 10.0.0.38 + working_dir: /opt/iota/logs volumes: - ./data/fullnode-4:/opt/iota/db:rw - ./configs/fullnodes/fullnode.yaml:/opt/iota/config/fullnode.yaml:ro - ./configs/genesis/genesis.blob:/opt/iota/config/genesis.blob:ro + - ./logs/fullnode-4:/opt/iota/logs:rw ports: - "127.0.0.1:9003:9000/tcp" - "127.0.0.1:9187:9184/tcp" + depends_on: + - predictor + + predictor: + image: iota-predictor:latest + container_name: predictor + hostname: predictor + networks: + iota-network: + ipv4_address: 10.0.0.44 + ports: + - "127.0.0.1:9666:9666/tcp" + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:9666/health"] + interval: 10s + timeout: 5s + retries: 10 + restart: unless-stopped indexer-1: image: iotaledger/iota-indexer