Skip to content
82 changes: 26 additions & 56 deletions crates/starfish/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures::{Stream, StreamExt, ready, stream, task};
use iota_macros::fail_point_async;
use parking_lot::RwLock;
use starfish_config::AuthorityIndex;
use tokio::sync::{Mutex, broadcast, mpsc::Sender, oneshot};
use tokio::sync::{Mutex, broadcast, mpsc::Sender};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

Expand All @@ -31,16 +31,13 @@ use crate::{
commit::{CommitAPI as _, CommitRange, TrustedCommit},
commit_vote_monitor::CommitVoteMonitor,
context::Context,
cordial_knowledge::{
AdditionalPartsForBundle, ConnectionKnowledgeMessage::TakeAdditionalPartForBundle,
CordialKnowledgeHandle,
},
cordial_knowledge::CordialKnowledgeHandle,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
encoder::ShardEncoder,
error::{ConsensusError, ConsensusResult},
network::{
BlockBundle, BlockBundleStream, NetworkService, SerializedBlock, SerializedBlockBundle,
BlockBundleStream, NetworkService, SerializedBlock, SerializedBlockBundle,
SerializedBlockBundleParts, SerializedHeaderAndTransactions, SerializedTransactions,
},
shard_reconstructor::TransactionMessage,
Expand Down Expand Up @@ -550,15 +547,13 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
}

// 13. Report useful info for cordial and connection knowledge
self.cordial_knowledge
.report_useful_authors(
peer,
&serialized_block_bundle_parts,
&additional_block_headers,
&missing_ancestors,
block_round,
)
.await?;
self.cordial_knowledge.report_useful_authors(
peer,
&serialized_block_bundle_parts,
&additional_block_headers,
&missing_ancestors,
block_round,
)?;

// 14. schedule the fetching of missing ancestors (if any) from this peer
if !missing_ancestors.is_empty() {
Expand Down Expand Up @@ -618,52 +613,27 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
self.subscription_counter.clone(),
);
let context = self.context.clone();
let connection_knowledge_sender = self.cordial_knowledge.connection_knowledge_sender(peer);
let connection_knowledge = self.cordial_knowledge.connection_knowledge(peer);
// Return a stream of blocks that first yields missed blocks as requested, then
// new blocks.
Ok(Box::pin(missed_blocks.chain({
broadcasted_blocks.filter_map(move |block| {
let connection_knowledge_sender = connection_knowledge_sender.clone();
let context = context.clone();
let ts = block.timestamp_ms();
let connection_knowledge = connection_knowledge.clone();
async move {
let (tx, rx) = oneshot::channel();
let ts = block.timestamp_ms();

let msg = TakeAdditionalPartForBundle {
round_upper_bound_exclusive: block.round(),
respond_to: tx,
let block_bundle = {
let mut conn = connection_knowledge.write();
conn.create_bundle(block)
};

// Send message asynchronously to corresponding ConnectionKnowledge task
if let Err(e) = connection_knowledge_sender.send(vec![msg]).await {
warn!("Failed to send TakeAdditionalPartForBundle to Connection Knowledge: {e}");
return None;
}

// Await response from ConnectionKnowledge
let block_bundle = match rx.await {
Ok(AdditionalPartsForBundle {
headers,
shards,
useful_headers_authors_from_peer,
useful_shards_authors_from_peer,
}) => BlockBundle {
verified_block: block,
verified_headers: headers,
serialized_shards: shards,
useful_headers_authors: useful_headers_authors_from_peer,
useful_shards_authors: useful_shards_authors_from_peer,
},
Err(_) => {
warn!("Connection Knowledge oneshot dropped before response");
// Construct bundle (fill in your actual vars)
return None;
}
};


let now = context.clock.timestamp_utc_ms();
context.metrics.node_metrics.delay_in_sending_blocks.observe((now - ts) as f64);
context
.metrics
.node_metrics
.delay_in_sending_blocks
.observe((now - ts) as f64);

match SerializedBlockBundle::try_from(block_bundle) {
Ok(serialized_block_bundle) => Some(serialized_block_bundle),
Expand Down Expand Up @@ -2493,8 +2463,7 @@ mod tests {
}

// Inject useful info
let connection_knowledge_sender =
cordial_knowledge.connection_knowledge_sender(to_whom_authority);
let connection_knowledge = cordial_knowledge.connection_knowledge(to_whom_authority);
let msg = ConnectionKnowledgeMessage::UsefulAuthors {
useful_headers_to_peer: BTreeMap::from([
(AuthorityIndex::new_for_test(2), GENESIS_ROUND),
Expand All @@ -2508,10 +2477,11 @@ mod tests {
(AuthorityIndex::new_for_test(1), GENESIS_ROUND),
(AuthorityIndex::new_for_test(3), GENESIS_ROUND),
]),
useful_shards_from_peers: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)],
useful_shards_from_peer: vec![None, Some(GENESIS_ROUND), None, Some(GENESIS_ROUND)],
};
let _ = connection_knowledge_sender.send(vec![msg]).await;

{
connection_knowledge.write().process_one_message(msg);
}
// WHEN
// Call handle_subscribe_block_bundles_request with last_received = 2
let last_received_round = 2;
Expand Down
Loading
Loading