diff --git a/crates/starfish/core/src/authority_service.rs b/crates/starfish/core/src/authority_service.rs index fc4d880c99c..00033f6fb5c 100644 --- a/crates/starfish/core/src/authority_service.rs +++ b/crates/starfish/core/src/authority_service.rs @@ -16,7 +16,7 @@ use futures::{Stream, StreamExt, ready, stream, task}; use iota_macros::fail_point_async; use parking_lot::RwLock; use starfish_config::AuthorityIndex; -use tokio::sync::{Mutex, broadcast, mpsc::Sender, oneshot}; +use tokio::sync::{Mutex, broadcast, mpsc::Sender}; use tokio_util::sync::ReusableBoxFuture; use tracing::{debug, info, warn}; @@ -31,16 +31,13 @@ use crate::{ commit::{CommitAPI as _, CommitRange, TrustedCommit}, commit_vote_monitor::CommitVoteMonitor, context::Context, - cordial_knowledge::{ - AdditionalPartsForBundle, ConnectionKnowledgeMessage::TakeAdditionalPartForBundle, - CordialKnowledgeHandle, - }, + cordial_knowledge::CordialKnowledgeHandle, core_thread::CoreThreadDispatcher, dag_state::DagState, encoder::ShardEncoder, error::{ConsensusError, ConsensusResult}, network::{ - BlockBundle, BlockBundleStream, NetworkService, SerializedBlock, SerializedBlockBundle, + BlockBundleStream, NetworkService, SerializedBlock, SerializedBlockBundle, SerializedBlockBundleParts, SerializedHeaderAndTransactions, SerializedTransactions, }, shard_reconstructor::TransactionMessage, @@ -550,15 +547,13 @@ impl NetworkService for AuthorityService { } // 13. Report useful info for cordial and connection knowledge - self.cordial_knowledge - .report_useful_authors( - peer, - &serialized_block_bundle_parts, - &additional_block_headers, - &missing_ancestors, - block_round, - ) - .await?; + self.cordial_knowledge.report_useful_authors( + peer, + &serialized_block_bundle_parts, + &additional_block_headers, + &missing_ancestors, + block_round, + )?; // 14. 
schedule the fetching of missing ancestors (if any) from this peer if !missing_ancestors.is_empty() { @@ -618,52 +613,27 @@ impl NetworkService for AuthorityService { self.subscription_counter.clone(), ); let context = self.context.clone(); - let connection_knowledge_sender = self.cordial_knowledge.connection_knowledge_sender(peer); + let connection_knowledge = self.cordial_knowledge.connection_knowledge(peer); // Return a stream of blocks that first yields missed blocks as requested, then // new blocks. Ok(Box::pin(missed_blocks.chain({ broadcasted_blocks.filter_map(move |block| { - let connection_knowledge_sender = connection_knowledge_sender.clone(); let context = context.clone(); - let ts = block.timestamp_ms(); + let connection_knowledge = connection_knowledge.clone(); async move { - let (tx, rx) = oneshot::channel(); + let ts = block.timestamp_ms(); - let msg = TakeAdditionalPartForBundle { - round_upper_bound_exclusive: block.round(), - respond_to: tx, + let block_bundle = { + let mut conn = connection_knowledge.write(); + conn.create_bundle(block) }; - // Send message asynchronously to corresponding ConnectionKnowledge task - if let Err(e) = connection_knowledge_sender.send(vec![msg]).await { - warn!("Failed to send TakeAdditionalPartForBundle to Connection Knowledge: {e}"); - return None; - } - - // Await response from ConnectionKnowledge - let block_bundle = match rx.await { - Ok(AdditionalPartsForBundle { - headers, - shards, - useful_headers_authors_from_peer, - useful_shards_authors_from_peer, - }) => BlockBundle { - verified_block: block, - verified_headers: headers, - serialized_shards: shards, - useful_headers_authors: useful_headers_authors_from_peer, - useful_shards_authors: useful_shards_authors_from_peer, - }, - Err(_) => { - warn!("Connection Knowledge oneshot dropped before response"); - // Construct bundle (fill in your actual vars) - return None; - } - }; - - let now = context.clock.timestamp_utc_ms(); - 
context.metrics.node_metrics.delay_in_sending_blocks.observe((now - ts) as f64); + context + .metrics + .node_metrics + .delay_in_sending_blocks + .observe((now - ts) as f64); match SerializedBlockBundle::try_from(block_bundle) { Ok(serialized_block_bundle) => Some(serialized_block_bundle), @@ -2493,8 +2463,7 @@ mod tests { } // Inject useful info - let connection_knowledge_sender = - cordial_knowledge.connection_knowledge_sender(to_whom_authority); + let connection_knowledge = cordial_knowledge.connection_knowledge(to_whom_authority); let msg = ConnectionKnowledgeMessage::UsefulAuthors { useful_headers_to_peer: BTreeMap::from([ (AuthorityIndex::new_for_test(2), GENESIS_ROUND), @@ -2508,10 +2477,11 @@ mod tests { (AuthorityIndex::new_for_test(1), GENESIS_ROUND), (AuthorityIndex::new_for_test(3), GENESIS_ROUND), ]), - useful_shards_from_peers: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], + useful_shards_from_peer: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], }; - let _ = connection_knowledge_sender.send(vec![msg]).await; - + { + connection_knowledge.write().process_one_message(msg); + } // WHEN // Call handle_subscribe_block_bundles_request with last_received = 2 let last_received_round = 2; diff --git a/crates/starfish/core/src/cordial_knowledge.rs b/crates/starfish/core/src/cordial_knowledge.rs index b4ad2a39582..194d33baa61 100644 --- a/crates/starfish/core/src/cordial_knowledge.rs +++ b/crates/starfish/core/src/cordial_knowledge.rs @@ -2,8 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ + cmp::max, collections::{BTreeMap, BTreeSet}, sync::Arc, + time::Duration, }; use ahash::{AHashMap, AHashSet}; @@ -13,24 +15,24 @@ use starfish_config::AuthorityIndex; use tokio::{ sync::{ Mutex, - mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel}, - oneshot, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, }, task::JoinError, }; -use tracing::{debug, warn}; +use tracing::debug; use 
crate::{ BlockHeaderAPI, BlockRef, Round, VerifiedBlockHeader, - block_header::{BlockHeaderDigest, GENESIS_ROUND}, + block_header::{BlockHeaderDigest, VerifiedBlock}, context::Context, dag_state::DagState, error::{ConsensusError, ConsensusResult}, - network::SerializedBlockBundleParts, + network::{BlockBundle, SerializedBlockBundleParts}, }; /// Maximum round gap to consider a peer's useful shards/headers as still -/// relevant. 40 rounds correspond to around 2 seconds +/// relevant. 40 rounds correspond to at least 2 second due to the minimum block +/// delay const MAX_ROUND_GAP_FOR_USEFUL_PARTS: Round = 40; /// Represents a subset of authorities using a bitmask. @@ -72,17 +74,17 @@ impl SubsetAuthorities { } /// Manages the global cordial knowledge state. -/// Receives high-level updates from DAG state and Authority service and +/// Receives high-level updates from DagState and AuthorityService and /// notifies per-connection tasks. pub(crate) struct CordialKnowledge { context: Arc, /// Receives high-level updates from DAG state (new headers, new own shards, - /// evictions) and Authority Service + /// evictions) and AuthorityService cordial_knowledge_receiver: UnboundedReceiver, /// Keeps track of the last round for which each peer's shards were /// considered useful to us. This is a global knowledge and is shared with /// all connection tasks. Initialized to None for all authorities and - /// updated over time once Authority Service reports useful shards from + /// updated over time once AuthorityService reports useful shards from /// peers. last_useful_shards_from_peer_round: Vec>, /// Keeps track of the most recent DAG cordial @@ -95,14 +97,14 @@ pub(crate) struct CordialKnowledge { /// tuple of (ancestors, who knows the block header). cordial_knowledge: Vec>>, - /// Per-connection message channels. They are used to notify each - /// connection task about updates from cordial knowledge. - connections: Vec>>, + /// Sender of updates for ConnectionKnowledges. 
Updates are produced by + /// CordialKnowledge + dissemination_sender: UnboundedSender>>, } /// High-level messages sent to the CordialKnowledge task. /// NewHeader, NewShard, EvictBelow are received from DAG state. -/// UsefulShardsFromPeers is received from Authority Service. +/// UsefulShardsFromPeers is received from AuthorityService. #[derive(Debug)] pub enum CordialKnowledgeMessage { /// A new verified block header to integrate into cordial knowledge. @@ -112,11 +114,12 @@ pub enum CordialKnowledgeMessage { /// Evict old rounds globally. EvictBelow(Vec), /// Update internal state about shards from which authorities are useful for - /// us + /// the local node UsefulShardsFromPeers(BTreeMap), } impl CordialKnowledgeMessage { + /// Outputs the type of CordialKnowledgeMessage in a string slice format fn type_label(&self) -> &'static str { match self { CordialKnowledgeMessage::NewHeader(_) => "New header", @@ -131,25 +134,26 @@ impl CordialKnowledgeMessage { /// shutdown. pub struct CordialKnowledgeHandle { cordial_knowledge_sender: UnboundedSender, - connection_knowledge_senders: Vec>>, - connection_handles: Mutex>>>, - join_handle: Mutex>>, + connection_knowledges: Vec>>, + cordial_knowledge_handle: Mutex>>, + dissemination_handle: Mutex>>, } impl CordialKnowledgeHandle { - /// Get a specific sender to send messages to the respected - /// ConnectionKnowledge task. - pub fn connection_knowledge_sender( + /// Outputs specific ConnectionKnowledge corresponding to a given + /// AuthorityIndex. + pub fn connection_knowledge( &self, authority_index: AuthorityIndex, - ) -> Sender> { - self.connection_knowledge_senders[authority_index].clone() + ) -> Arc> { + self.connection_knowledges[authority_index].clone() } + /// Gracefully stop the CordialKnowledge background task and all connection /// tasks. 
pub async fn stop(&self) -> Result<(), JoinError> { // Stop main CordialKnowledge loop - let mut guard = self.join_handle.lock().await; + let mut guard = self.cordial_knowledge_handle.lock().await; if let Some(main_handle) = guard.take() { main_handle.abort(); @@ -159,26 +163,23 @@ impl CordialKnowledgeHandle { Err(e) => return Err(e), } } + // Stop DisseminationWorker loop + let mut guard = self.dissemination_handle.lock().await; - // --- Stop all per-connection tasks --- - let mut conn_guard = self.connection_handles.lock().await; - for handle_opt in conn_guard.iter_mut() { - if let Some(handle) = handle_opt.take() { - handle.abort(); - match handle.await { - Ok(_) => (), - Err(e) if e.is_cancelled() => (), - Err(e) => return Err(e), - } + if let Some(dissemination_handle) = guard.take() { + dissemination_handle.abort(); + match dissemination_handle.await { + Ok(_) => (), + Err(e) if e.is_cancelled() => (), + Err(e) => return Err(e), } } Ok(()) } - - // Report from Authority Service useful information about headers and + /// Report from AuthorityService useful information about headers and /// shards to global knowledge and connection knowledge. 
- pub async fn report_useful_authors( + pub fn report_useful_authors( &self, peer: AuthorityIndex, serialized_block_bundle_parts: &SerializedBlockBundleParts, @@ -186,7 +187,6 @@ impl CordialKnowledgeHandle { missing_ancestors: &BTreeSet, block_round: Round, ) -> ConsensusResult<()> { - let connection_knowledge_sender = &self.connection_knowledge_senders[peer]; let cordial_knowledge_sender = &self.cordial_knowledge_sender; // Extract authorities this peer has useful headers from let useful_headers_authors_from_peer = additional_block_headers @@ -233,33 +233,121 @@ impl CordialKnowledgeHandle { useful_headers_to_peer, useful_shards_to_peer, useful_headers_from_peer, - useful_shards_from_peers: vec![], + useful_shards_from_peer: vec![None; self.connection_knowledges.len()], }; - connection_knowledge_sender - .send(vec![connection_knowledge_message]) - .await - .map_err(|_err| ConsensusError::Shutdown)?; - + { + let mut connection_knowledge_guard = self.connection_knowledges[peer].write(); + connection_knowledge_guard.process_one_message(connection_knowledge_message); + } // Notify global cordial knowledge about useful shards from this peer - let cordial_knowledge_message = - CordialKnowledgeMessage::UsefulShardsFromPeers(useful_shard_authors); - cordial_knowledge_sender - .send(cordial_knowledge_message) - .map_err(|_err| ConsensusError::Shutdown)?; + if !useful_shard_authors.is_empty() { + let cordial_knowledge_message = + CordialKnowledgeMessage::UsefulShardsFromPeers(useful_shard_authors); + cordial_knowledge_sender + .send(cordial_knowledge_message) + .map_err(|_err| ConsensusError::Shutdown)?; + } Ok(()) } } +/// Struct to disseminate information from CordialKnowledge to each +/// ConnectionKnowledge. It allows to reduce contention from a single runner of +/// CordialKnowledge. Otherwise, acquiring write locks for each update in +/// CordialKnowledge could take a significant time. 
For instance, for 150 +/// validators and 20 blocks per second, we could expect almost 10000 updates in +/// CordialKnowledge. Each update requires 150 write locks for +/// CordialKnowledge. One write lock could take 200ns, which results in +/// 300ms spent only for write locks. +pub struct DisseminationWorker { + context: Arc, + /// Each Connection Knowledge corresponds to one peer. Upon reception of a + /// message from CordialKnowledge, we propagate the respected + /// information for each connection. + connection_knowledges: Vec>>, + dissemination_receiver: UnboundedReceiver>>, +} + +impl DisseminationWorker { + fn new( + context: Arc, + connection_knowledges: Vec>>, + dissemination_receiver: UnboundedReceiver>>, + ) -> Self { + Self { + context, + connection_knowledges, + dissemination_receiver, + } + } + /// The dissemination worker makes dissemination to ConnectionKnowledge + /// structs in batches. It waits for + /// TIME_TO_BATCH_CONNECTION_KNOWLEDGE_MSGS, then drain the channel of + /// messages and disseminate them. With this approach, one acquire locks + /// in a predicted way while loosing some reactiveness. 
Instead of + /// potentially 10000 write locks, it could be up to 1 sec / + /// TIME_TO_BATCH_CONNECTION_KNOWLEDGE_MSGS + async fn run(mut self) { + const TIME_TO_BATCH_CONNECTION_KNOWLEDGE_MSGS: Duration = Duration::from_millis(5); + debug!("Dissemination Worker loop started"); + loop { + // Step 1: Wait for the first message in async + let first_batch_msgs = match self.dissemination_receiver.recv().await { + Some(batch) => batch, + None => { + debug!("Dissemination channel closed, worker exiting"); + break; + } + }; + let mut num_batches = 1; + + // Step 2: Initialize aggregation and add first batch + let mut aggregated: Vec> = first_batch_msgs; + + // Step 3: Drain the channel in sync and aggregate messages + while let Ok(batch) = self.dissemination_receiver.try_recv() { + for (i, msgs) in batch.into_iter().enumerate() { + aggregated[i].extend(msgs); + } + num_batches += 1; + } + + self.context + .metrics + .node_metrics + .cordial_knowledge_worker_batch_size + .observe(num_batches as f64); + + // Step 4: Process everything + for (connection_knowledge, msgs) in self + .connection_knowledges + .iter() + .zip(aggregated.into_iter()) + { + if !msgs.is_empty() { + let mut guard = connection_knowledge.write(); + guard.process_vec_messages(msgs); + } + } + + // Step 5: Sleep for short time before checking again + tokio::time::sleep(TIME_TO_BATCH_CONNECTION_KNOWLEDGE_MSGS).await; + } + } +} + impl CordialKnowledge { /// Create a new CordialKnowledge instance along with its associated /// channels. 
- pub fn new( + fn new( context: Arc, + dag_state: Arc>, ) -> ( Self, + Vec>>, UnboundedSender, - Vec>>, + UnboundedReceiver>>, ) { let num_authorities = context.committee.size(); @@ -269,30 +357,31 @@ impl CordialKnowledge { UnboundedReceiver, ) = unbounded_channel(); - // Bounded per-connection channels for controlled flow - let mut connections = Vec::new(); - let mut receivers = Vec::new(); + let (dissemination_sender, dissemination_receiver) = unbounded_channel(); + + let mut connection_knowledges = Vec::with_capacity(num_authorities); + + for peer_index in 0..num_authorities { + let peer = AuthorityIndex::from(peer_index as u8); + let connection_knowledge = + ConnectionKnowledge::new(context.clone(), peer, dag_state.clone()); - for _ in 0..num_authorities { - let (connection_sender, connection_receiver): ( - Sender>, - Receiver>, - ) = channel(512); + let connection_knowledge = Arc::new(RwLock::new(connection_knowledge)); - connections.push(connection_sender); - receivers.push(connection_receiver); + connection_knowledges.push(connection_knowledge); } ( Self { context, - connections, cordial_knowledge_receiver, + dissemination_sender, cordial_knowledge: vec![BTreeMap::new(); num_authorities], last_useful_shards_from_peer_round: vec![None; num_authorities], }, + connection_knowledges, cordial_knowledge_sender, - receivers, + dissemination_receiver, ) } @@ -304,59 +393,49 @@ impl CordialKnowledge { dag_state: Arc>, ) -> Arc { // Build main CordialKnowledge and associated channels - let (cordial_knowledge, sender, receivers) = CordialKnowledge::new(context.clone()); - let num_authorities = context.committee.size(); - - let connection_knowledge_sender = cordial_knowledge.connections.clone(); - - // Spawn one ConnectionKnowledge task per authority - let mut connection_handles = Vec::with_capacity(num_authorities); - - for (authority_index, receiver) in receivers.into_iter().enumerate() { - let connection_knowledge = ConnectionKnowledge::new( - context.clone(), - 
dag_state.clone(), - authority_index, - receiver, - ); - - // Spawn async run() for each peer connection - let task_handle = tokio::spawn(async move { - connection_knowledge.run().await; - }); - - connection_handles.push(Some(task_handle)); - } - + let ( + cordial_knowledge, + connection_knowledges, + cordial_knowledge_sender, + dissemination_receiver, + ) = CordialKnowledge::new(context.clone(), dag_state.clone()); // Spawn the main CordialKnowledge loop - let join_handle = tokio::spawn(async move { + let cordial_knowledge_handle = tokio::spawn(async move { cordial_knowledge.run().await; }); + let dissemination_worker = DisseminationWorker::new( + context.clone(), + connection_knowledges.clone(), + dissemination_receiver, + ); + let dissemination_handle = tokio::spawn(async move { + dissemination_worker.run().await; + }); + dag_state .write() - .set_cordial_knowledge_sender(sender.clone()); + .set_cordial_knowledge_sender(cordial_knowledge_sender.clone()); // Return handle with all pieces assembled Arc::new(CordialKnowledgeHandle { - cordial_knowledge_sender: sender, - connection_knowledge_senders: connection_knowledge_sender, - connection_handles: Mutex::new(connection_handles), - join_handle: Mutex::new(Some(join_handle)), + cordial_knowledge_sender, + connection_knowledges, + cordial_knowledge_handle: Mutex::new(Some(cordial_knowledge_handle)), + dissemination_handle: Mutex::new(Some(dissemination_handle)), }) } /// Main async loop: receives high-level updates (headers, shards, /// evictions) from DAG state and updates global knowledge + notifies /// per-connection tasks. 
- pub async fn run(mut self) { + async fn run(mut self) { debug!("Cordial Knowledge main loop started"); loop { match self.cordial_knowledge_receiver.recv().await { Some(msg) => { - // Handle the first received message - self.process_message(msg).await; + self.process_message(msg); // Report the buffer size after processing the first message let buffer_size = self.cordial_knowledge_receiver.len() + 1; @@ -377,7 +456,7 @@ impl CordialKnowledge { } /// Processes a single high-level cordial knowledge message. - async fn process_message(&mut self, cordial_knowledge_message: CordialKnowledgeMessage) { + fn process_message(&mut self, cordial_knowledge_message: CordialKnowledgeMessage) { // Report the type of message self.context .metrics @@ -387,117 +466,122 @@ impl CordialKnowledge { .inc(); // Handle the cordial knowledge message depending on its type - match cordial_knowledge_message { - CordialKnowledgeMessage::NewHeader(header) => { - self.update_cordial_knowledge(&header).await; - } - CordialKnowledgeMessage::NewShard(block_ref) => { - self.handle_new_shard(block_ref).await; - } - CordialKnowledgeMessage::EvictBelow(round) => { - self.handle_evict_below(round).await; - } + let vec_connection_knowledge_msgs = match cordial_knowledge_message { + CordialKnowledgeMessage::NewHeader(header) => self.update_cordial_knowledge(&header), + CordialKnowledgeMessage::NewShard(block_ref) => self.prepare_new_shard_msgs(block_ref), + CordialKnowledgeMessage::EvictBelow(round) => self.handle_evict_below(round), CordialKnowledgeMessage::UsefulShardsFromPeers(useful_shards_from_peer) => { self.handle_useful_shards_from(useful_shards_from_peer) - .await; } }; + if let Some(vec_msgs) = vec_connection_knowledge_msgs { + let _ = self.dissemination_sender.send(vec_msgs); + } } // Helper function to update authority rounds if the new round is greater fn update_authority_rounds_if_greater( target: &mut [Option], updates: BTreeMap, - ) { + ) -> bool { + let mut changed = false; for 
(authority, new_round) in updates { if let Some(existing_round) = &mut target[authority.value()] { if new_round > *existing_round { *existing_round = new_round; + changed = true; } } else { target[authority.value()] = Some(new_round); + changed = true; } } + changed } /// Update global knowledge about shards from which authors will be useful /// for us - async fn handle_useful_shards_from( + fn handle_useful_shards_from( &mut self, useful_shards_from_peer: BTreeMap, - ) { - Self::update_authority_rounds_if_greater( + ) -> Option>> { + if Self::update_authority_rounds_if_greater( &mut self.last_useful_shards_from_peer_round, useful_shards_from_peer, - ); - self.disseminate_useful_info_to_connection_tasks().await; + ) { + self.prepare_useful_shards_from_peers_msgs() + } else { + None + } } - /// Disseminate updated useful info to all connection tasks. - async fn disseminate_useful_info_to_connection_tasks(&mut self) { - for connection_sender in &self.connections { + /// Prepare useful authors message for each connection knowledge. + fn prepare_useful_shards_from_peers_msgs( + &mut self, + ) -> Option>> { + let mut vec_msgs: Vec> = + Vec::with_capacity(self.cordial_knowledge.len()); + for index in 0..self.cordial_knowledge.len() { + if index == self.context.own_index.value() { + vec_msgs.push(vec![]); + continue; + } let msg = ConnectionKnowledgeMessage::UsefulAuthors { - useful_shards_from_peers: self.last_useful_shards_from_peer_round.clone(), + useful_shards_from_peer: self.last_useful_shards_from_peer_round.clone(), useful_headers_from_peer: BTreeMap::new(), useful_headers_to_peer: BTreeMap::new(), useful_shards_to_peer: BTreeMap::new(), }; - if let Err(e) = connection_sender.send(vec![msg]).await { - warn!("Failed to send useful info to connection task: {}", e); - } + vec_msgs.push(vec![msg]); } + Some(vec_msgs) } /// Called when a new own shard (created locally) is added to dag state. 
- async fn handle_new_shard(&mut self, block_ref: BlockRef) { - for (index, tx) in self.connections.iter().enumerate() { + fn prepare_new_shard_msgs( + &mut self, + block_ref: BlockRef, + ) -> Option>> { + let mut vec_msgs: Vec> = + Vec::with_capacity(self.cordial_knowledge.len()); + for index in 0..self.cordial_knowledge.len() { + // Don't send own shard to the author of the block and local node if index == block_ref.author.value() || index == self.context.own_index.value() { + vec_msgs.push(vec![]); continue; } let msg = ConnectionKnowledgeMessage::NewShard { block_ref }; - let _ = tx.send(vec![msg]).await; + vec_msgs.push(vec![msg]); } + Some(vec_msgs) } /// Called when older rounds should be pruned globally. - async fn handle_evict_below(&mut self, rounds: Vec) { + fn handle_evict_below( + &mut self, + rounds: Vec, + ) -> Option>> { // Evict locally for (index, btree_map) in &mut self.cordial_knowledge.iter_mut().enumerate() { let split_round = rounds[index]; *btree_map = btree_map.split_off(&split_round); - self.context - .metrics - .node_metrics - .cordial_knowledge_rounds - .with_label_values(&[&index.to_string()]) - .set(btree_map.len() as i64); } - let largest_round = self.cordial_knowledge[self.context.own_index] - .keys() - .max() - .cloned() - .unwrap_or(GENESIS_ROUND); - let useful_shards_from_peer_count = self - .last_useful_shards_from_peer_round - .iter() - .flatten() - .filter(|&&r| r + MAX_ROUND_GAP_FOR_USEFUL_PARTS >= largest_round) - .count(); - self.context - .metrics - .node_metrics - .cordial_knowledge_useful_shards - .set(useful_shards_from_peer_count as i64); - // Notify per-connection tasks about eviction - self.notify_connection_tasks_for_eviction(rounds).await; + // Prepare message for per-connection knowledge about eviction + self.prepare_evict_msgs(rounds) } #[inline] - async fn notify_connection_tasks_for_eviction(&self, rounds: Vec) { - for tx in &self.connections { + fn prepare_evict_msgs( + &self, + rounds: Vec, + ) -> Option>> { + 
let mut vec_msgs: Vec> = + Vec::with_capacity(self.cordial_knowledge.len()); + for _ in 0..self.cordial_knowledge.len() { let msg = ConnectionKnowledgeMessage::EvictBelow(rounds.clone()); - let _ = tx.send(vec![msg]).await; + vec_msgs.push(vec![msg]); } + Some(vec_msgs) } /// Update cordial knowledge for exactly one new header. @@ -508,7 +592,10 @@ impl CordialKnowledge { /// marked as known by the block author as well. /// At the end, we notify all connections about new /// knowledge changes. - async fn update_cordial_knowledge(&mut self, header: &VerifiedBlockHeader) { + fn update_cordial_knowledge( + &mut self, + header: &VerifiedBlockHeader, + ) -> Option>> { let block_ref = header.reference(); let block_author = block_ref.author.value(); let block_round = block_ref.round; @@ -527,7 +614,7 @@ impl CordialKnowledge { // Already recorded — nothing else to do. if round_map.contains_key(&block_digest) { - return; + return None; } // Insert block into cordial knowledge @@ -594,19 +681,7 @@ impl CordialKnowledge { } } } - - // 5) Send all accumulated knowledge messages - self.send_connection_knowledge_messages(vec_knowledge_msgs) - .await; - } - - /// Send accumulated connection knowledge messages to all connection tasks. - async fn send_connection_knowledge_messages(&self, msgs: Vec>) { - for (index, msg) in msgs.into_iter().enumerate() { - if !msg.is_empty() { - let _ = self.connections[index].send(msg).await; - } - } + Some(vec_knowledge_msgs) } } @@ -626,13 +701,7 @@ pub enum ConnectionKnowledgeMessage { useful_headers_to_peer: BTreeMap, useful_shards_to_peer: BTreeMap, useful_headers_from_peer: BTreeMap, - useful_shards_from_peers: Vec>, - }, - /// Take useful headers and shards for authorities, up to the given round - /// (exclusive). 
- TakeAdditionalPartForBundle { - round_upper_bound_exclusive: Round, - respond_to: oneshot::Sender, + useful_shards_from_peer: Vec>, }, /// Global eviction (prune below round) EvictBelow(Vec), @@ -642,9 +711,8 @@ pub enum ConnectionKnowledgeMessage { /// Receives updates from the global cordial knowledge pub struct ConnectionKnowledge { context: Arc, + peer: AuthorityIndex, dag_state: Arc>, - /// Index of the peer authority this connection knowledge is for - peer_index: usize, /// Keeps track of which headers are not known by the peer yet. headers_not_known: Vec>>, /// Keeps track of which shards are not known by the peer yet. @@ -661,40 +729,33 @@ pub struct ConnectionKnowledge { /// Last rounds for (potentially) useful headers that could be received from /// this peer last_useful_headers_from_peer_round: Vec>, - /// Receives updates from the global cordial knowledge - receiver: Receiver>, -} - -/// Additional parts (headers, shards, useful_headers_authors, -/// useful_shards_authors) to include in a block bundle for a peer. 
-#[derive(Debug)] -pub(crate) struct AdditionalPartsForBundle { - pub headers: Vec, - pub shards: Vec, - pub useful_headers_authors_from_peer: BTreeSet, - pub useful_shards_authors_from_peer: BTreeSet, } impl ConnectionKnowledge { pub fn new( context: Arc, + peer: AuthorityIndex, dag_state: Arc>, - peer_index: usize, - receiver: Receiver>, ) -> Self { let num_authorities = context.committee.size(); Self { dag_state, + peer, last_useful_headers_to_peer_round: vec![None; num_authorities], last_useful_shards_to_peer_round: vec![None; num_authorities], last_useful_headers_from_peer_round: vec![None; num_authorities], last_useful_shards_from_peer_round: vec![None; num_authorities], context, - peer_index, headers_not_known: vec![BTreeMap::new(); num_authorities], shards_not_known: vec![BTreeMap::new(); num_authorities], - receiver, + } + } + + /// Processes a vector of ConnectionKnowledge messages + fn process_vec_messages(&mut self, msgs: Vec) { + for msg in msgs { + self.process_one_message(msg); } } /// Take useful block refs (headers or shards) for the given authorities @@ -800,27 +861,10 @@ impl ConnectionKnowledge { } } - /// Async task loop — receives messages and dispatches to processing - /// logic. - pub async fn run(mut self) { - debug!("Connection Knowledge started for peer {}", self.peer_index); - - while let Some(knowledge_msgs) = self.receiver.recv().await { - for knowledge_msg in knowledge_msgs { - self.process_message(knowledge_msg).await; - } - } - - debug!( - "Connection Knowledge loop ended for peer {}", - self.peer_index - ); - } - /// Processes a batch of knowledge updates for this connection. 
/// The only async message is `TakeAdditionalPartForBundle`, which awaits /// and provides the additional parts for the bundle - async fn process_message(&mut self, message: ConnectionKnowledgeMessage) { + pub fn process_one_message(&mut self, message: ConnectionKnowledgeMessage) { match message { ConnectionKnowledgeMessage::NewHeader { block_ref } => { self.handle_new_header(block_ref); @@ -841,7 +885,7 @@ impl ConnectionKnowledge { useful_headers_to_peer, useful_shards_to_peer, useful_headers_from_peer, - useful_shards_from_peers: useful_shards_from_peer, + useful_shards_from_peer, } => { self.handle_useful_authors( useful_headers_to_peer, @@ -850,16 +894,6 @@ impl ConnectionKnowledge { useful_shards_from_peer, ); } - ConnectionKnowledgeMessage::TakeAdditionalPartForBundle { - round_upper_bound_exclusive, - respond_to, - } => { - self.handle_take_additional_parts_for_bundle( - round_upper_bound_exclusive, - respond_to, - ) - .await; - } } } @@ -879,10 +913,17 @@ impl ConnectionKnowledge { self.handle_useful_shards_from(useful_shards_from_peer); } - /// Update last useful shards from peer rounds by copying the given vector - /// from Cordial Knowledge. + /// Update last useful shards from peer rounds fn handle_useful_shards_from(&mut self, useful_shards_from_peer_round: Vec>) { - self.last_useful_shards_from_peer_round = useful_shards_from_peer_round; + for (index, opt_round) in useful_shards_from_peer_round.into_iter().enumerate() { + if let Some(new_round) = opt_round { + if let Some(old_round) = &mut self.last_useful_shards_from_peer_round[index] { + *old_round = max(*old_round, new_round); + } else { + self.last_useful_shards_from_peer_round[index] = Some(new_round); + } + } + } } /// Update last rounds of useful headers from peer. Iterate over the given @@ -918,16 +959,10 @@ impl ConnectionKnowledge { ); } - /// Handles taking additional parts (headers, shards) for a block bundle - /// to send to the peer. 
In addition, it returns from which authors - /// the peer can send additional headers and shards to the peer. - /// This is an async function because it reads from the DAG state and - /// sends the response back via oneshot channel. - async fn handle_take_additional_parts_for_bundle( - &mut self, - round_upper_bound_exclusive: Round, - respond_to: oneshot::Sender, - ) { + /// Used by AuthorityService to create a block bundle + /// to send to the peer. + pub fn create_bundle(&mut self, block: VerifiedBlock) -> BlockBundle { + let round_upper_bound_exclusive = block.round(); // 1. Own headers and shards for round up to round_upper_bound_exclusive should // be marked as known let own_index = self.context.own_index; @@ -942,14 +977,15 @@ impl ConnectionKnowledge { .last_useful_headers_to_peer_round .iter() .enumerate() - .filter_map(|(i, &opt_round)| { - opt_round - .filter(|&r| { - r.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) - >= round_upper_bound_exclusive - }) - .map(|_| i) + .filter(|(_authority_index, &opt_round)| { + if let Some(round) = opt_round { + round.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) + >= round_upper_bound_exclusive + } else { + false + } }) + .map(|(authority_index, _opt_round)| authority_index) .collect(); let useful_headers_block_refs_to_peer = self.take_useful_header_block_refs_round( @@ -972,20 +1008,22 @@ impl ConnectionKnowledge { .last_useful_shards_to_peer_round .iter() .enumerate() - .filter_map(|(i, &opt_round)| { - opt_round - .filter(|&r| { - r.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) - >= round_upper_bound_exclusive - }) - .map(|_| i) + .filter(|(_authority_index, &opt_round)| { + if let Some(round) = opt_round { + round.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) + >= round_upper_bound_exclusive + } else { + false + } }) + .map(|(authority_index, _opt_round)| authority_index) .collect(); + let useful_shards_block_refs_to_peer = self.take_useful_shard_block_refs_round( round_upper_bound_exclusive, 
&useful_shards_authors_to_peer, ); - let useful_shards: Vec = { + let useful_shards_to_peer: Vec = { let dag_state_read = self.dag_state.read(); dag_state_read .get_cached_shards(&useful_shards_block_refs_to_peer) @@ -1002,14 +1040,15 @@ impl ConnectionKnowledge { .last_useful_headers_from_peer_round .iter() .enumerate() - .filter_map(|(index, &opt_round)| { - opt_round - .filter(|&r| { - r.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) - >= round_upper_bound_exclusive - }) - .map(|_| AuthorityIndex::from(index as u8)) + .filter(|(_authority_index, &opt_round)| { + if let Some(round) = opt_round { + round.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) + >= round_upper_bound_exclusive + } else { + false + } }) + .map(|(authority_index, _opt_round)| AuthorityIndex::from(authority_index as u8)) .collect::>(); // 5. Get useful shard authors from peer @@ -1017,25 +1056,46 @@ impl ConnectionKnowledge { .last_useful_shards_from_peer_round .iter() .enumerate() - .filter_map(|(index, &opt_round)| { - opt_round - .filter(|&r| { - r.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) - >= round_upper_bound_exclusive - }) - .map(|_| AuthorityIndex::from(index as u8)) + .filter(|(_authority_index, &opt_round)| { + if let Some(round) = opt_round { + round.saturating_add(MAX_ROUND_GAP_FOR_USEFUL_PARTS) + >= round_upper_bound_exclusive + } else { + false + } }) + .map(|(authority_index, _opt_round)| AuthorityIndex::from(authority_index as u8)) .collect::>(); - // 6. 
Build a response message and send it back - let message = AdditionalPartsForBundle { - headers: useful_headers_to_peer, - shards: useful_shards, - useful_headers_authors_from_peer, - useful_shards_authors_from_peer, - }; + // Report useful authors + let peer_hostname = self.context.authority_hostname(self.peer); + for author in &useful_headers_authors_from_peer { + let author_hostname = self.context.authority_hostname(*author); + self.context + .metrics + .node_metrics + .cordial_knowledge_useful_headers_authors + .with_label_values(&[peer_hostname, author_hostname]) + .inc(); + } - respond_to.send(message).ok(); + for author in &useful_shards_authors_from_peer { + let author_hostname = self.context.authority_hostname(*author); + self.context + .metrics + .node_metrics + .cordial_knowledge_useful_shards_authors + .with_label_values(&[author_hostname]) + .inc(); + } + + BlockBundle { + verified_block: block, + verified_headers: useful_headers_to_peer, + serialized_shards: useful_shards_to_peer, + useful_headers_authors: useful_headers_authors_from_peer, + useful_shards_authors: useful_shards_authors_from_peer, + } } /// Handles adding a new header to the set of potentially unknown headers @@ -1098,7 +1158,8 @@ mod tests { use super::*; use crate::{ - block_header::{VerifiedBlock, VerifiedOwnShard}, + TestBlockHeader, + block_header::{GENESIS_ROUND, VerifiedBlock, VerifiedOwnShard}, context::Context, dag_state::DagState, storage::mem_store::MemStore, @@ -1173,8 +1234,7 @@ mod tests { } // Report useful info to connection knowledge corresponding to to_whom_index - let connection_knowledge_sender = - cordial_knowledge.connection_knowledge_senders[to_whom_index].clone(); + let connection_knowledge = cordial_knowledge.connection_knowledges[to_whom_index].clone(); // Inject useful info for connection knowledge of peer 1 (B) // A says that C and D are useful for headers and shards when receiving from B // B says that A and C are useful for headers and shards when sending 
from A @@ -1191,9 +1251,11 @@ mod tests { (AuthorityIndex::new_for_test(1), GENESIS_ROUND), (AuthorityIndex::new_for_test(3), GENESIS_ROUND), ]), - useful_shards_from_peers: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], + useful_shards_from_peer: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], }; - let _ = connection_knowledge_sender.send(vec![msg]).await; + { + connection_knowledge.write().process_one_message(msg); + } // get all blocks of D. They will be injected to dag state at final_round let d_blocks = all_blocks @@ -1236,19 +1298,19 @@ mod tests { }; dag_state.write().add_shard(shard_for_core); } - sleep(std::time::Duration::from_millis(1)).await; // give some time for cordial knowledge to update + sleep(std::time::Duration::from_millis(10)).await; // give some time for cordial knowledge to update // By default, for MAX_ROUND_GAP_FOR_USEFUL_PARTS rounds, all unknown // shards/headers are useful - let (tx, rx) = oneshot::channel(); - let msg = ConnectionKnowledgeMessage::TakeAdditionalPartForBundle { - round_upper_bound_exclusive: round + 1, - respond_to: tx, + let block_bundle = { + connection_knowledge + .write() + .create_bundle(all_blocks[round as usize + 1][our_index].clone()) }; - let _ = connection_knowledge_sender.send(vec![msg]).await; - let additional_parts = rx.await.unwrap(); - let AdditionalPartsForBundle { - headers, shards, .. - } = additional_parts; + let BlockBundle { + verified_headers: headers, + serialized_shards: shards, + .. + } = block_bundle; // In rounds 1..final_round, A should not know any of D's blocks, so no headers // or shards should be sent to B. 
if round < final_round - 1 { @@ -1298,13 +1360,12 @@ mod tests { let (context, key_pairs) = Context::new_for_test(validators); let protocol_keypairs = key_pairs.iter().map(|kp| kp.1.clone()).collect(); let context = Arc::new(context); - let final_round: Round = 6; + let final_round: Round = MAX_ROUND_GAP_FOR_USEFUL_PARTS / 2; let store = Arc::new(MemStore::new()); let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone()))); let cordial_knowledge = CordialKnowledge::start(context.clone(), dag_state.clone()); // Report useful info to connection knowledge corresponding to to_whom_index - let connection_knowledge_sender = - cordial_knowledge.connection_knowledge_senders[to_whom_index].clone(); + let connection_knowledge = cordial_knowledge.connection_knowledges[to_whom_index].clone(); // Inject useful info let msg = ConnectionKnowledgeMessage::UsefulAuthors { useful_headers_to_peer: BTreeMap::from([ @@ -1319,9 +1380,11 @@ mod tests { (AuthorityIndex::new_for_test(1), GENESIS_ROUND), (AuthorityIndex::new_for_test(3), GENESIS_ROUND), ]), - useful_shards_from_peers: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], + useful_shards_from_peer: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)], }; - let _ = connection_knowledge_sender.send(vec![msg]).await; + { + connection_knowledge.write().process_one_message(msg); + } // Build DAG with blocks from all validators up to final_round and add to // dag_state let mut dag_builder = @@ -1331,20 +1394,21 @@ mod tests { .build() .persist_layers(dag_state.clone()); sleep(std::time::Duration::from_millis(1)).await; - - let (tx, rx) = oneshot::channel(); - let msg = ConnectionKnowledgeMessage::TakeAdditionalPartForBundle { - round_upper_bound_exclusive: final_round + 1, - respond_to: tx, + // create dummy own verified block for next round to create a bundle + let verified_block = VerifiedBlock::new_for_test( + TestBlockHeader::new(final_round + 1, our_index.value() as u8).build(), + ); + let 
bundle = { + connection_knowledge + .write() + .create_bundle(verified_block.clone()) }; - let _ = connection_knowledge_sender.send(vec![msg]).await; - let additional_parts = rx.await.unwrap(); - let AdditionalPartsForBundle { - headers, - shards: _, - useful_headers_authors_from_peer, - useful_shards_authors_from_peer, - } = additional_parts; + let BlockBundle { + verified_headers: headers, + useful_headers_authors: useful_headers_authors_from_peer, + useful_shards_authors: useful_shards_authors_from_peer, + .. + } = bundle; // Only headers and shards from authorities 2 and 3 should be included assert_eq!(headers.len(), 2); assert!( @@ -1361,14 +1425,19 @@ BTreeSet::from([1, 3].map(AuthorityIndex::new_for_test)) ); // Repeat the request, should get no headers this time - let (tx, rx) = oneshot::channel(); - let msg = ConnectionKnowledgeMessage::TakeAdditionalPartForBundle { - round_upper_bound_exclusive: final_round + 1, - respond_to: tx, + // create dummy own verified block for next round to create a bundle + let verified_block = VerifiedBlock::new_for_test( + TestBlockHeader::new(final_round + 1, our_index.value() as u8).build(), + ); + let bundle = { + connection_knowledge + .write() + .create_bundle(verified_block.clone()) }; - let _ = connection_knowledge_sender.send(vec![msg]).await; - let additional_parts = rx.await.unwrap(); - let AdditionalPartsForBundle { headers, .. } = additional_parts; + let BlockBundle { + verified_headers: headers, + .. 
+ } = bundle; assert_eq!(headers.len(), 0); // Add more rounds to DAG @@ -1382,19 +1451,22 @@ // Make a request for a last round, should get no headers, no shards and no // useful authorities as the last useful rounds are beyond // MAX_ROUND_GAP_FOR_USEFUL_PARTS from last_round - let (tx, rx) = oneshot::channel(); - let msg = ConnectionKnowledgeMessage::TakeAdditionalPartForBundle { - round_upper_bound_exclusive: last_round + 1, - respond_to: tx, + // create dummy own verified block for next round to create a bundle + let verified_block = VerifiedBlock::new_for_test( + TestBlockHeader::new(last_round + 1, our_index.value() as u8).build(), + ); + let bundle = { + connection_knowledge + .write() + .create_bundle(verified_block.clone()) }; - let _ = connection_knowledge_sender.send(vec![msg]).await; - let additional_parts = rx.await.unwrap(); - let AdditionalPartsForBundle { - headers, - shards, - useful_headers_authors_from_peer, - useful_shards_authors_from_peer, - } = additional_parts; + let BlockBundle { + verified_headers: headers, + serialized_shards: shards, + useful_headers_authors: useful_headers_authors_from_peer, + useful_shards_authors: useful_shards_authors_from_peer, + .. 
+ } = bundle; assert!(headers.is_empty()); assert!(shards.is_empty()); assert!(useful_headers_authors_from_peer.is_empty()); diff --git a/crates/starfish/core/src/metrics.rs b/crates/starfish/core/src/metrics.rs index 257befebade..822f6d57ca8 100644 --- a/crates/starfish/core/src/metrics.rs +++ b/crates/starfish/core/src/metrics.rs @@ -132,14 +132,15 @@ pub(crate) struct NodeMetrics { pub(crate) highest_accepted_round: IntGauge, pub(crate) accepted_block_time_drift_ms: IntCounterVec, pub(crate) accepted_block_headers: IntCounterVec, + pub(crate) cordial_knowledge_useful_headers_authors: IntCounterVec, + pub(crate) cordial_knowledge_useful_shards_authors: IntCounterVec, pub(crate) dag_state_recent_transactions: IntGauge, pub(crate) dag_state_recent_headers: IntGauge, pub(crate) dag_state_recent_shards: IntGauge, pub(crate) dag_state_recent_refs: IntGauge, pub(crate) cordial_knowledge_buffer_size: IntGauge, pub(crate) cordial_knowledge_processed_messages: IntCounterVec, - pub(crate) cordial_knowledge_rounds: IntGaugeVec, - pub(crate) cordial_knowledge_useful_shards: IntGauge, + pub(crate) cordial_knowledge_worker_batch_size: Histogram, pub(crate) dag_state_store_read_count: IntCounterVec, pub(crate) dag_state_store_write_count: IntCounter, pub(crate) fetch_block_headers_scheduler_inflight: IntGauge, @@ -454,10 +455,11 @@ impl NodeMetrics { "Size of the cordial knowledge buffer received", registry, ).unwrap(), - cordial_knowledge_useful_shards: register_int_gauge_with_registry!( - "cordial_knowledge_useful_shards", - "The number of authorities with useful shards", - registry, + cordial_knowledge_worker_batch_size: register_histogram_with_registry!( + "cordial_knowledge_worker_batch_size", + "Size of connection knowledge message batches processed by the worker", + exponential_buckets(1.0, 1.4, 20).unwrap(), + registry, ).unwrap(), cordial_knowledge_processed_messages: register_int_counter_vec_with_registry!( "cordial_knowledge_processed_messages", @@ -465,12 +467,6 @@ 
impl NodeMetrics { &["type"], registry, ).unwrap(), - cordial_knowledge_rounds: register_int_gauge_vec_with_registry!( - "cordial_knowledge_rounds", - "Number of rounds in DAG of Cordial Knowledge stored per authority", - &["authority"], - registry, - ).unwrap(), dag_state_store_read_count: register_int_counter_vec_with_registry!( "dag_state_store_read_count", "Number of times DagState needs to read from store per operation type", @@ -499,6 +495,18 @@ impl NodeMetrics { &["peer", "type"], registry, ).unwrap(), + cordial_knowledge_useful_headers_authors: register_int_counter_vec_with_registry!( + "cordial_knowledge_useful_headers_authors", + "Useful authors for pushing headers to the local node", + &["peer", "author"], + registry, + ).unwrap(), + cordial_knowledge_useful_shards_authors: register_int_counter_vec_with_registry!( + "cordial_knowledge_useful_shards_authors", + "Useful authors for pushing shards to the local node", + &["author"], + registry, + ).unwrap(), synchronizer_requested_blocks_by_peer: register_int_counter_vec_with_registry!( "synchronizer_requested_blocks_by_peer", "Number of requested blocks per peer authority via the synchronizer and also by block authority",