From f17fa0d3b9da7f5473752c2b6866532dad81be8b Mon Sep 17 00:00:00 2001 From: atacann Date: Thu, 2 Oct 2025 12:33:43 +0200 Subject: [PATCH 01/18] do not give metrics error if fetching stopped tasks fails (cherry picked from commit cf52a5571ee46991f944139a0076e403ed98a6ff) --- core/src/operator.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/operator.rs b/core/src/operator.rs index e808a59e7..3d2e13a4d 100644 --- a/core/src/operator.rs +++ b/core/src/operator.rs @@ -20,7 +20,7 @@ use crate::errors::BridgeError; use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc; use crate::metrics::L1SyncStatusProvider; -use crate::rpc::clementine::EntityStatus; +use crate::rpc::clementine::{EntityStatus, StoppedTasks}; use crate::task::entity_metric_publisher::{ EntityMetricPublisher, ENTITY_METRIC_PUBLISHER_INTERVAL, }; @@ -219,7 +219,16 @@ where } pub async fn get_current_status(&self) -> Result { - let stopped_tasks = self.background_tasks.get_stopped_tasks().await?; + let stopped_tasks = match self.background_tasks.get_stopped_tasks().await { + Ok(stopped_tasks) => stopped_tasks, + Err(e) => { + tracing::error!("Failed to get stopped tasks: {:?}", e); + StoppedTasks { + stopped_tasks: vec![format!("Stopped tasks fetch failed {:?}", e)], + } + } + }; + // Determine if automation is enabled let automation_enabled = cfg!(feature = "automation"); From 16cd48f953dbce5baa421e1360575b2e323665d9 Mon Sep 17 00:00:00 2001 From: atacann Date: Thu, 2 Oct 2025 18:10:49 +0200 Subject: [PATCH 02/18] init better logging (cherry picked from commit ca4847d2d284e91e0dbbb4724e7a8d2076c10c22) --- core/src/aggregator.rs | 2 +- core/src/errors.rs | 54 ++++++++++++++++++++------- core/src/operator.rs | 12 +++--- core/src/rpc/aggregator.rs | 58 +++++++++++++++++++---------- core/src/rpc/operator.rs | 15 ++++---- core/src/rpc/parser/verifier.rs | 17 +++++++-- core/src/rpc/verifier.rs | 65 +++++++++++++++++++++++---------- core/src/utils.rs | 32 +++++++++++++++- core/src/verifier.rs | 9 +++-- 9 files changed, 190 insertions(+), 74 deletions(-) diff --git a/core/src/aggregator.rs b/core/src/aggregator.rs index 57c7fba4e..df15402ef 100644 --- a/core/src/aggregator.rs +++ b/core/src/aggregator.rs @@ -413,7 +413,7 @@ impl Aggregator { debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)), ) .await - .wrap_err(Status::internal("Operator key retrieval failed"))? + .wrap_err(Status::internal(format!("Operator {} key retrieval failed", operator_xonly_pk)))? .into_inner(); // A send error means that all receivers are closed, diff --git a/core/src/errors.rs b/core/src/errors.rs index 8edc9ac7c..dc84aab2c 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -191,6 +191,9 @@ pub trait ErrorExt: Sized { /// returns the first [`tonic::Status`] error. If it can't find one, it will /// return an Status::internal with the Display representation of the error. fn into_status(self) -> tonic::Status; + /// Converts the error into a tonic::Status::internal with the Display representation of the error. + /// Always returns the full error chain as the message. + fn into_full_internal_status(self) -> tonic::Status; } /// Extension traits for results to easily convert them to eyre::Report and @@ -200,6 +203,7 @@ pub trait ResultExt: Sized { fn map_to_eyre(self) -> Result; fn map_to_status(self) -> Result; + fn map_to_full_internal_status(self) -> Result; } impl> ErrorExt for T { @@ -212,6 +216,9 @@ impl> ErrorExt for T { fn into_status(self) -> tonic::Status { self.into().into() } + fn into_full_internal_status(self) -> tonic::Status { + self.into().into_full_internal_status() + } } impl> ResultExt for Result { @@ -224,26 +231,45 @@ impl> ResultExt for Result { fn map_to_status(self) -> Result { self.map_err(ErrorExt::into_status) } + + fn map_to_full_internal_status(self) -> Result { + self.map_err(ErrorExt::into_full_internal_status) + } +} + +impl BridgeError { + fn into_full_internal_status(self) -> tonic::Status { + tonic::Status::internal(match self { + BridgeError::Eyre(report) => report + .chain() + .map(|e| e.to_string()) + .collect::>() + .join(" | "), + _ => self.to_string(), + }) + } } impl From for tonic::Status { fn from(val: BridgeError) -> Self { - let eyre_report = val.into_eyre(); + val.into_full_internal_status() + // TODO: maybe uncomment later, first check how this works out + // let eyre_report = val.into_eyre(); - // eyre::Report can cast any error in the chain to a Status, so we use its downcast method to get the first Status. - eyre_report.downcast::().unwrap_or_else(|report| { - // We don't want this case to happen, all casts to Status should contain a Status that contains a user-facing error message. - tracing::error!( - "Returning internal error on RPC call, full error: {:?}", - report - ); + // // eyre::Report can cast any error in the chain to a Status, so we use its downcast method to get the first Status. + // eyre_report.downcast::().unwrap_or_else(|report| { + // // We don't want this case to happen, all casts to Status should contain a Status that contains a user-facing error message. + // tracing::error!( + // "Returning internal error on RPC call, full error: {:?}", + // report + // ); - let mut status = tonic::Status::internal(report.to_string()); - status.set_source(Into::into( - Into::>::into(report), - )); - status - }) + // let mut status = tonic::Status::internal(report.to_string()); + // status.set_source(Into::into( + // Into::>::into(report), + // )); + // status + // }) } } diff --git a/core/src/operator.rs b/core/src/operator.rs index 3d2e13a4d..80fe169e0 100644 --- a/core/src/operator.rs +++ b/core/src/operator.rs @@ -481,13 +481,14 @@ where pub async fn deposit_sign( &self, mut deposit_data: DepositData, - ) -> Result, BridgeError> { + ) -> Result>, BridgeError> { self.citrea_client .check_nofn_correctness(deposit_data.get_nofn_xonly_pk()?) .await?; let mut tweak_cache = TweakCache::default(); let (sig_tx, sig_rx) = mpsc::channel(constants::DEFAULT_CHANNEL_SIZE); + let monitor_err_sender = sig_tx.clone(); let deposit_blockhash = self .rpc @@ -513,14 +514,15 @@ where Some(&mut tweak_cache), )?; - if sig_tx.send(sig).await.is_err() { - break; - } + sig_tx + .send(Ok(sig)) + .await + .wrap_err("Failed to send signature in operator deposit sign")?; } Ok::<(), BridgeError>(()) }); - monitor_standalone_task(handle, "Operator deposit sign"); + monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender); Ok(sig_rx) } diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index cc1e305de..c06a59ad1 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -109,6 +109,7 @@ async fn nonce_aggregator( + Send + 'static, agg_nonce_sender: Sender, + needed_nofn_sigs: usize, ) -> Result<(AggregatedNonce, AggregatedNonce), BridgeError> { let mut total_sigs = 0; @@ -146,8 +147,13 @@ async fn nonce_aggregator( ); } - if total_sigs == 0 { - tracing::warn!("Sighash stream returned 0 signatures"); + if total_sigs != needed_nofn_sigs { + let err_msg = format!( + "Expected {} nofn signatures, got {} from sighash stream", + needed_nofn_sigs, total_sigs + ); + tracing::error!(err_msg); + return Err(eyre::eyre!(err_msg).into()); } // aggregate nonces for the movetx signature let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async { @@ -199,6 +205,7 @@ async fn nonce_distributor( )>, partial_sig_sender: Sender<(Vec, AggNonceQueueItem)>, ) -> Result<(), BridgeError> { + let mut nonce_count = 0; let mut sig_count = 0; let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) = partial_sig_streams.into_iter().unzip(); @@ -207,11 +214,11 @@ async fn nonce_distributor( let handle_1 = tokio::spawn(async move { while let Some(queue_item) = agg_nonce_receiver.recv().await { - sig_count += 1; + nonce_count += 1; - tracing::trace!( + tracing::warn!( "Received aggregated nonce {} in nonce_distributor", - sig_count + nonce_count ); let agg_nonce_wrapped = clementine::VerifierDepositSignParams { @@ -241,7 +248,7 @@ async fn nonce_distributor( tracing::trace!( "Sent aggregated nonce {} to verifiers in nonce_distributor", - sig_count + nonce_count ); } @@ -276,7 +283,9 @@ async fn nonce_distributor( )) .await?; - tracing::trace!( + sig_count += 1; + + tracing::warn!( "Received partial signature {} from verifiers in nonce_distributor", sig_count ); @@ -290,11 +299,12 @@ async fn nonce_distributor( }) })?; - tracing::trace!( + tracing::warn!( "Sent partial signature {} to signature_aggregator in nonce_distributor", sig_count ); } + tracing::warn!("Finished tasks in nonce_distributor handle 2"); Ok::<(), BridgeError>(()) }); @@ -307,6 +317,8 @@ async fn nonce_distributor( .wrap_err("Task crashed while receiving partial sigs")? .wrap_err("Error while receiving partial sigs")?; + tracing::warn!("Finished tasks in nonce_distributor"); + Ok(()) } @@ -1307,6 +1319,8 @@ impl ClementineAggregator for AggregatorServer { let verifiers_public_keys = deposit_data.get_verifiers(); + let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data); + // Create sighash stream for transaction signing let sighash_stream = Box::pin(create_nofn_sighash_stream( self.db.clone(), @@ -1326,6 +1340,7 @@ impl ClementineAggregator for AggregatorServer { nonce_streams, sighash_stream, agg_nonce_sender, + needed_nofn_sigs, )); // Start the nonce distribution pipe. @@ -1403,34 +1418,39 @@ impl ClementineAggregator for AggregatorServer { .await?; tracing::debug!("Pipeline tasks completed"); - + let verifiers_ids = verifiers.ids(); // send operators sigs to verifiers after all verifiers have signed - timed_request( + let deposit_finalize_futures = timed_request( SEND_OPERATOR_SIGS_TIMEOUT, "Sending operator signatures to verifiers", async { let send_operator_sigs: Vec<_> = deposit_finalize_sender .iter() - .map(|tx| async { + .zip(verifiers_ids.iter()) + .zip(deposit_finalize_futures.into_iter()) + .map(|((tx, id), dep_fin_fut)| async { for one_op_sigs in all_op_sigs.iter() { for sig in one_op_sigs.iter() { let deposit_finalize_param: VerifierDepositFinalizeParams = sig.into(); - tx.send(deposit_finalize_param).await.wrap_err_with(|| { - eyre::eyre!(AggregatorError::OutputStreamEndedEarly { - stream_name: "deposit_finalize_sender".into(), - }) - })?; + let send = tx.send(deposit_finalize_param).await; + match send { + Ok(()) => (), + Err(e) => { + // check exact error by awaiting the future + dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?; + return Err(BridgeError::from(eyre::eyre!(format!("Verifier {} deposit finalize stream sending returned error: {:?}", id.clone(), e)))); + } + } } } - Ok::<(), BridgeError>(()) + Ok::<_, BridgeError>(dep_fin_fut) }) .collect(); - try_join_all(send_operator_sigs).await?; - Ok(()) + try_join_all(send_operator_sigs).await }, ) .await?; diff --git a/core/src/rpc/operator.rs b/core/src/rpc/operator.rs index b42f3d454..a05ab741e 100644 --- a/core/src/rpc/operator.rs +++ b/core/src/rpc/operator.rs @@ -24,6 +24,7 @@ use alloy::primitives::PrimitiveSignature; use bitcoin::hashes::Hash; use bitcoin::{BlockHash, OutPoint}; use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256}; +use eyre::Context; use futures::TryFutureExt; use std::str::FromStr; use tokio::sync::mpsc; @@ -68,6 +69,7 @@ where let operator = self.operator.clone(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let out_stream: Self::GetParamsStream = ReceiverStream::new(rx); + let monitor_err_sender = tx.clone(); let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?; @@ -93,7 +95,7 @@ where Ok::<(), Status>(()) }); - monitor_standalone_task(handle, "Operator get_params"); + monitor_standalone_task(handle, "Operator get_params", monitor_err_sender); Ok(Response::new(out_stream)) } @@ -119,12 +121,12 @@ where tokio::spawn(async move { let mut sent_sigs = 0; while let Some(sig) = deposit_signatures_rx.recv().await { + let sig = sig?; let operator_burn_sig = SchnorrSig { schnorr_sig: sig.serialize().to_vec(), }; - if tx - .send(Ok(operator_burn_sig)) + tx.send(Ok(operator_burn_sig)) .inspect_ok(|_| { sent_sigs += 1; tracing::debug!( @@ -134,11 +136,10 @@ where ); }) .await - .is_err() - { - break; - } + .wrap_err("Failed to send signature in operator rpc deposit sign") + .map_to_status()?; } + Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) diff --git a/core/src/rpc/parser/verifier.rs b/core/src/rpc/parser/verifier.rs index a44218329..c3246b798 100644 --- a/core/src/rpc/parser/verifier.rs +++ b/core/src/rpc/parser/verifier.rs @@ -204,7 +204,12 @@ pub async fn parse_next_deposit_finalize_param_schnorr_sig( schnorr::Signature::from_slice(&final_sig) .map_err(invalid_argument("FinalSig", "Invalid signature length"))? } - _ => return Err(Status::internal("Expected FinalSig 1")), + _ => { + return Err(Status::internal(format!( + "Expected SchnorrSig, got {:?}", + sig + ))); + } }; Ok(Some(final_sig)) @@ -225,7 +230,10 @@ pub async fn parse_deposit_finalize_param_move_tx_agg_nonce( Ok(AggregatedNonce::from_byte_array(&arr) .map_err(invalid_argument("AggregatedNonce", "failed to parse"))?) } - _ => Err(Status::internal("Expected FinalSig 2")), + _ => Err(Status::internal(format!( + "Expected MoveTxAggNonce, got {:?}", + sig + ))), } } @@ -244,7 +252,10 @@ pub async fn parse_deposit_finalize_param_emergency_stop_agg_nonce( ) .map_err(invalid_argument("AggregatedNonce", "failed to parse"))?) } - _ => Err(Status::internal("Expected FinalSig 2")), + _ => Err(Status::internal(format!( + "Expected EmergencyStopAggNonce, got {:?}", + sig + ))), } } diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index f093e5f0e..f9d4ae1f6 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -11,6 +11,7 @@ use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestD use crate::builder::transaction::ContractContext; use crate::citrea::CitreaClientT; use crate::constants::RESTART_BACKGROUND_TASKS_TIMEOUT; +use crate::errors::ErrorExt; use crate::rpc::clementine::VerifierDepositFinalizeResponse; use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request}; use crate::verifier::VerifierServer; @@ -23,7 +24,7 @@ use alloy::primitives::PrimitiveSignature; use bitcoin::Witness; use clementine::verifier_deposit_finalize_params::Params; use secp256k1::musig::AggregatedNonce; -use tokio::sync::mpsc::{self, error::SendError}; +use tokio::sync::mpsc::{self}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -218,6 +219,7 @@ where let (session_id, pub_nonces) = self.verifier.nonce_gen(num_nonces).await?; let (tx, rx) = mpsc::channel(pub_nonces.len() + 1); + let monitor_sender = tx.clone(); let handle = tokio::spawn(async move { let nonce_gen_first_response = clementine::NonceGenFirstResponse { @@ -225,16 +227,20 @@ where num_nonces, }; let session_id: NonceGenResponse = nonce_gen_first_response.into(); - tx.send(Ok(session_id)).await?; + tx.send(Ok(session_id)).await.map_err(|e| { + Status::aborted(format!("Failed to send nonce gen first response: {e}")) + })?; for pub_nonce in &pub_nonces { let pub_nonce: NonceGenResponse = pub_nonce.into(); - tx.send(Ok(pub_nonce)).await?; + tx.send(Ok(pub_nonce)).await.map_err(|e| { + Status::aborted(format!("Failed to send nonce gen response: {e}")) + })?; } - Ok::<(), SendError<_>>(()) + Ok::<(), Status>(()) }); - monitor_standalone_task(handle, "Verifier nonce_gen"); + monitor_standalone_task(handle, "Verifier nonce_gen", monitor_sender); Ok(Response::new(ReceiverStream::new(rx))) } @@ -291,9 +297,10 @@ where } Ok(()) }); - monitor_standalone_task(handle, "Verifier deposit data receiver"); + monitor_standalone_task(handle, "Verifier deposit data receiver", tx.clone()); // Start partial sig job and return partial sig responses. + let tx_for_monitor = tx.clone(); let handle = tokio::spawn(async move { let (deposit_data, session_id) = param_rx .recv() @@ -306,16 +313,31 @@ where let mut nonce_idx = 0; let num_required_sigs = verifier.config.get_num_required_nofn_sigs(&deposit_data); - while let Some(partial_sig) = partial_sig_receiver.recv().await { - tx.send(Ok(PartialSig { - partial_sig: partial_sig.serialize().to_vec(), - })) - .await - .map_err(|e| { - Status::aborted(format!( - "Error sending partial sig, stream ended prematurely: {e}" - )) - })?; + while let Some(partial_sig_result) = partial_sig_receiver.recv().await { + match partial_sig_result { + Ok(partial_sig) => { + tx.send(Ok(PartialSig { + partial_sig: partial_sig.serialize().to_vec(), + })) + .await + .map_err(|e| { + Status::aborted(format!( + "Error sending partial sig, stream ended prematurely: {e}" + )) + })?; + } + Err(e) => { + tx + .send(Err(e.into_full_internal_status())) + .await + .map_err(|send_err| { + Status::aborted(format!( + "Error forwarding partial sig error, stream ended prematurely: {send_err}" + )) + })?; + break; + } + } nonce_idx += 1; tracing::trace!( @@ -331,7 +353,7 @@ where Ok::<(), Status>(()) }); - monitor_standalone_task(handle, "Verifier deposit signature sender"); + monitor_standalone_task(handle, "Verifier deposit signature sender", tx_for_monitor); Ok(Response::new(out_stream)) } @@ -388,12 +410,17 @@ where let verifier = self.verifier.clone(); let sig_handle = tokio::spawn(async move { let num_required_nofn_sigs = verifier.config.get_num_required_nofn_sigs(&deposit_data); + tracing::debug!( + "Needed nofn sigs for deposit {:?}: {}", + deposit_data, + num_required_nofn_sigs + ); let mut nonce_idx = 0; while let Some(sig) = parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) .await? { - tracing::debug!( + tracing::trace!( "Received full nofn sig {} in deposit_finalize()", nonce_idx + 1 ); @@ -450,7 +477,7 @@ where parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) .await? { - tracing::debug!( + tracing::warn!( "Received full operator sig {} in deposit_finalize()", op_sig_count + 1 ); diff --git a/core/src/utils.rs b/core/src/utils.rs index 51cebdcf2..7e2c00f4d 100644 --- a/core/src/utils.rs +++ b/core/src/utils.rs @@ -382,9 +382,14 @@ pub fn get_vergen_response() -> VergenResponse { /// Monitors a [`tokio::task::JoinHandle`] in the background and logs it's end /// result. -pub fn monitor_standalone_task( +pub fn monitor_standalone_task< + T: Send + 'static, + E: Debug + Send + 'static + From, + C: Send + 'static, +>( task_handle: tokio::task::JoinHandle>, task_name: &str, + monitor_err_sender: tokio::sync::mpsc::Sender>, ) { let task_name = task_name.to_string(); @@ -395,15 +400,38 @@ pub fn monitor_standalone_task( tracing::debug!("Task {} completed successfully", task_name); } Ok(Err(e)) => { - tracing::error!("Task {} throw an error: {:?}", task_name, e); + tracing::error!("Task {} threw an error: {:?}", task_name, e); + let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| { + tracing::error!("Failed to send error to monitoring channel: {:?}", e) + }); } Err(e) => { if e.is_cancelled() { // Task was cancelled, which is expected during cleanup tracing::debug!("Task {} has cancelled", task_name); + let _ = monitor_err_sender + .send(Err(Into::::into(eyre::eyre!( + "Task was cancelled due to: {:?}", + e + )) + .into())) + .await + .inspect_err(|e| { + tracing::error!("Failed to send error to monitoring channel: {:?}", e) + }); return; } tracing::error!("Task {} has panicked: {:?}", task_name, e); + let _ = monitor_err_sender + .send(Err(Into::::into(eyre::eyre!( + "Task has panicked due to: {:?}", + e + )) + .into())) + .await + .inspect_err(|e| { + tracing::error!("Failed to send error to monitoring channel: {:?}", e) + }); } } }); diff --git a/core/src/verifier.rs b/core/src/verifier.rs index c816021cb..83e4e0694 100644 --- a/core/src/verifier.rs +++ b/core/src/verifier.rs @@ -790,7 +790,7 @@ where mut deposit_data: DepositData, session_id: u128, mut agg_nonce_rx: mpsc::Receiver, - ) -> Result, BridgeError> { + ) -> Result>, BridgeError> { self.citrea_client .check_nofn_correctness(deposit_data.get_nofn_xonly_pk()?) .await?; @@ -811,6 +811,7 @@ where let (partial_sig_tx, partial_sig_rx) = mpsc::channel(constants::DEFAULT_CHANNEL_SIZE); let verifier_index = deposit_data.get_verifier_index(&self.signer.public_key)?; let verifiers_public_keys = deposit_data.get_verifiers(); + let monitor_sender = partial_sig_tx.clone(); let deposit_blockhash = self .rpc @@ -865,7 +866,7 @@ where )?; partial_sig_tx - .send(partial_sig) + .send(Ok(partial_sig)) .await .wrap_err("Failed to send partial signature")?; @@ -883,7 +884,7 @@ where if session.nonces.len() != 2 { return Err(eyre::eyre!( - "Expected 2 nonces remaining in session, one for move tx and one for emergency stop, got {}", + "Expected 2 nonces remaining in session, one for move tx and one for emergency stop, got {}, indicating aggregated nonce stream ended prematurely", session.nonces.len() ).into()); } @@ -893,7 +894,7 @@ where Ok::<(), BridgeError>(()) }); - monitor_standalone_task(handle, "Verifier deposit_sign"); + monitor_standalone_task(handle, "Verifier deposit_sign", monitor_sender); Ok(partial_sig_rx) } From 9f2db6d136814742f0cbc261c850b02600227aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Efe=20Ak=C3=A7a?= Date: Fri, 3 Oct 2025 17:48:39 +0300 Subject: [PATCH 03/18] do not return error from optional message fetcher (cherry picked from commit 28707368929e2d9cf95e657c0c451a665e733621) --- core/src/rpc/aggregator.rs | 32 ++++++++++++++++++++++++++------ core/src/rpc/parser/mod.rs | 6 +----- core/src/rpc/verifier.rs | 6 ++++-- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index c06a59ad1..c5cba2add 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -113,7 +113,7 @@ async fn nonce_aggregator( ) -> Result<(AggregatedNonce, AggregatedNonce), BridgeError> { let mut total_sigs = 0; - tracing::info!("Starting nonce aggregation"); + tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)"); // We assume the sighash stream returns the correct number of items. while let Some(msg) = sighash_stream.next().await { @@ -146,6 +146,7 @@ async fn nonce_aggregator( siginfo.signature_id ); } + tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream"); if total_sigs != needed_nofn_sigs { let err_msg = format!( @@ -216,7 +217,7 @@ async fn nonce_distributor( while let Some(queue_item) = agg_nonce_receiver.recv().await { nonce_count += 1; - tracing::warn!( + tracing::trace!( "Received aggregated nonce {} in nonce_distributor", nonce_count ); @@ -252,6 +253,10 @@ async fn nonce_distributor( ); } + tracing::trace!( + tmp_debug = 1, + "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue" + ); Ok::<(), BridgeError>(()) }); @@ -285,7 +290,7 @@ async fn nonce_distributor( sig_count += 1; - tracing::warn!( + tracing::trace!( "Received partial signature {} from verifiers in nonce_distributor", sig_count ); @@ -299,23 +304,29 @@ async fn nonce_distributor( }) })?; - tracing::warn!( + tracing::trace!( "Sent partial signature {} to signature_aggregator in nonce_distributor", sig_count ); } - tracing::warn!("Finished tasks in nonce_distributor handle 2"); + tracing::trace!( + tmp_debug = 1, + "Sent {sig_count} partial sig bundles to partial_sigs stream" + ); + + tracing::trace!("Finished tasks in nonce_distributor handle 2"); Ok::<(), BridgeError>(()) }); let (result_1, result_2) = tokio::join!(handle_1, handle_2); + result_1 .wrap_err("Task crashed while distributing aggnonces")? .wrap_err("Error while distributing aggnonces")?; result_2 .wrap_err("Task crashed while receiving partial sigs")? - .wrap_err("Error while receiving partial sigs")?; + .wrap_err("Error while receiving partial sigs").inspect_err(|e| tracing::error!("Failed to finish partial aggregation {e:?}"))?; tracing::warn!("Finished tasks in nonce_distributor"); @@ -360,6 +371,11 @@ async fn signature_aggregator( ); } + tracing::trace!( + tmp_debug = 1, + "Sent {sig_count} aggregated signatures to final_sig stream" + ); + Ok(()) } @@ -397,6 +413,10 @@ async fn signature_distributor( sig_count ); } + tracing::trace!( + tmp_debug = 1, + "Sent {sig_count} signatures to verifiers in deposit_finalize" + ); let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce .await diff --git a/core/src/rpc/parser/mod.rs b/core/src/rpc/parser/mod.rs index 526c543c5..00ad17d23 100644 --- a/core/src/rpc/parser/mod.rs +++ b/core/src/rpc/parser/mod.rs @@ -105,11 +105,7 @@ macro_rules! fetch_next_message_from_stream { #[macro_export] macro_rules! fetch_next_optional_message_from_stream { ($stream:expr, $field:ident) => { - $stream - .message() - .await? - .ok_or($crate::rpc::error::input_ended_prematurely())? - .$field + $stream.message().await?.and_then(|msg| msg.$field) }; } diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index f9d4ae1f6..131a2dd63 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -11,7 +11,7 @@ use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestD use crate::builder::transaction::ContractContext; use crate::citrea::CitreaClientT; use crate::constants::RESTART_BACKGROUND_TASKS_TIMEOUT; -use crate::errors::ErrorExt; +use crate::errors::{ErrorExt, ResultExt as _}; use crate::rpc::clementine::VerifierDepositFinalizeResponse; use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request}; use crate::verifier::VerifierServer; @@ -23,6 +23,7 @@ use crate::{ use alloy::primitives::PrimitiveSignature; use bitcoin::Witness; use clementine::verifier_deposit_finalize_params::Params; +use eyre::Context as _; use secp256k1::musig::AggregatedNonce; use tokio::sync::mpsc::{self}; use tokio_stream::wrappers::ReceiverStream; @@ -418,7 +419,8 @@ where let mut nonce_idx = 0; while let Some(sig) = parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) - .await? + .await + .wrap_err_with(|| format!("While waiting for {nonce_idx} + 1th sig out of {num_required_nofn_sigs} ")).map_to_status()? { tracing::trace!( "Received full nofn sig {} in deposit_finalize()", From 7b3f67f40afad7ebfb318b3d7e634ba2232aab3e Mon Sep 17 00:00:00 2001 From: Ekrem BAL Date: Fri, 3 Oct 2025 11:03:03 -0400 Subject: [PATCH 04/18] fmt (cherry picked from commit a67ef17691322a3b7a72477f783cac8a09cf581f) --- core/src/aggregator.rs | 5 ++++- core/src/rpc/aggregator.rs | 4 ++-- core/src/rpc/verifier.rs | 12 ++++++++---- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/aggregator.rs b/core/src/aggregator.rs index df15402ef..f0b2b7ab7 100644 --- a/core/src/aggregator.rs +++ b/core/src/aggregator.rs @@ -413,7 +413,10 @@ impl Aggregator { debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)), ) .await - .wrap_err(Status::internal(format!("Operator {} key retrieval failed", operator_xonly_pk)))? + .wrap_err(Status::internal(format!( + "Operator {} key retrieval failed", + operator_xonly_pk + )))? .into_inner(); // A send error means that all receivers are closed, diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index c5cba2add..663730390 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -320,13 +320,13 @@ async fn nonce_distributor( let (result_1, result_2) = tokio::join!(handle_1, handle_2); - result_1 .wrap_err("Task crashed while distributing aggnonces")? .wrap_err("Error while distributing aggnonces")?; result_2 .wrap_err("Task crashed while receiving partial sigs")? - .wrap_err("Error while receiving partial sigs").inspect_err(|e| tracing::error!("Failed to finish partial aggregation {e:?}"))?; + .wrap_err("Error while receiving partial sigs") + .inspect_err(|e| tracing::error!("Failed to finish partial aggregation {e:?}"))?; tracing::warn!("Finished tasks in nonce_distributor"); diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index 131a2dd63..0f66d1e4b 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -417,10 +417,14 @@ where num_required_nofn_sigs ); let mut nonce_idx = 0; - while let Some(sig) = - parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) - .await - .wrap_err_with(|| format!("While waiting for {nonce_idx} + 1th sig out of {num_required_nofn_sigs} ")).map_to_status()? + while let Some(sig) = parser::verifier::parse_next_deposit_finalize_param_schnorr_sig( + &mut in_stream, + ) + .await + .wrap_err_with(|| { + format!("While waiting for {nonce_idx} + 1th sig out of {num_required_nofn_sigs} ") + }) + .map_to_status()? { tracing::trace!( "Received full nofn sig {} in deposit_finalize()", From 220b77f05058dd771e63a40bc0e59fd93cfb779b Mon Sep 17 00:00:00 2001 From: atacann Date: Fri, 3 Oct 2025 17:24:42 +0200 Subject: [PATCH 05/18] change log level (cherry picked from commit 9fd73bd97526ffe36fb54d27d09d00f9bd0448e6) --- core/src/rpc/verifier.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index 0f66d1e4b..996847789 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -483,7 +483,7 @@ where parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) .await? { - tracing::warn!( + tracing::trace!( "Received full operator sig {} in deposit_finalize()", op_sig_count + 1 ); @@ -491,7 +491,7 @@ where .send(operator_sig) .await .map_err(error::output_stream_ended_prematurely)?; - tracing::debug!( + tracing::trace!( "Sent full operator sig {} to src/verifier in deposit_finalize()", op_sig_count + 1 ); From 893629df4814df43073c9daf239c57e76fba4c18 Mon Sep 17 00:00:00 2001 From: atacann Date: Fri, 3 Oct 2025 17:44:44 +0200 Subject: [PATCH 06/18] check number sent messages in aggregator rpc streams, log errors (cherry picked from commit c18b2b49075a6f3e02f33534fce5c06741e288ed) --- core/src/rpc/aggregator.rs | 102 +++++++++++++++++++++++++++++++++---- 1 file changed, 92 insertions(+), 10 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 663730390..d39453e18 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -205,6 +205,7 @@ async fn nonce_distributor( Sender, )>, partial_sig_sender: Sender<(Vec, AggNonceQueueItem)>, + needed_nofn_sigs: usize, ) -> Result<(), BridgeError> { let mut nonce_count = 0; let mut sig_count = 0; @@ -232,11 +233,17 @@ async fn nonce_distributor( try_join_all(partial_sig_tx.iter_mut().enumerate().map(|(idx, tx)| { let agg_nonce_wrapped = agg_nonce_wrapped.clone(); async move { - tx.send(agg_nonce_wrapped).await.wrap_err_with(|| { - AggregatorError::OutputStreamEndedEarly { - stream_name: format!("Partial sig stream {idx}"), - } - }) + tx.send(agg_nonce_wrapped) + .await + .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly { + stream_name: format!("Partial sig {idx}"), + }) + .inspect_err(|e| { + tracing::error!( + "Failed to send aggregated nonce to verifier {idx}: {:?}", + e + ); + }) } })) .await @@ -251,6 +258,14 @@ async fn nonce_distributor( "Sent aggregated nonce {} to verifiers in nonce_distributor", nonce_count ); + if nonce_count == needed_nofn_sigs { + break; + } + } + if nonce_count != needed_nofn_sigs { + let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",); + tracing::error!(err_msg); + return Err(eyre::eyre!(err_msg).into()); } tracing::trace!( @@ -268,10 +283,21 @@ async fn nonce_distributor( .message() .await .wrap_err_with(|| AggregatorError::RequestFailed { - request_name: format!("Partial sig stream {idx}"), + request_name: format!("Partial sig {sig_count} from verifier {idx} error"), + }) + .inspect_err(|e| { + tracing::error!( + "Failed to receive partial signature {sig_count} from verifier {idx}, an error was sent: {:?}", + e + ); })? .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize { - stream_name: format!("Partial sig stream {idx}"), + stream_name: format!("Partial sig {sig_count} from verifier {idx} closed"), + }).inspect_err(|e| { + tracing::error!( + "Failed to receive partial signature {sig_count} from verifier {idx}, the stream was closed: {:?}", + e + ); })?; Ok::<_, BridgeError>( @@ -309,6 +335,14 @@ async fn nonce_distributor( sig_count ); } + + if sig_count != needed_nofn_sigs { + let err_msg = format!( + "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}", + ); + tracing::error!(err_msg); + return Err(eyre::eyre!(err_msg).into()); + } tracing::trace!( tmp_debug = 1, "Sent {sig_count} partial sig bundles to partial_sigs stream" @@ -338,6 +372,7 @@ async fn signature_aggregator( mut partial_sig_receiver: Receiver<(Vec, AggNonceQueueItem)>, verifiers_public_keys: Vec, final_sig_sender: Sender, + needed_nofn_sigs: usize, ) -> Result<(), BridgeError> { let mut sig_count = 0; while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await { @@ -369,6 +404,18 @@ async fn signature_aggregator( "Sent aggregated signature {} to signature_distributor in signature_aggregator", sig_count ); + + if sig_count == needed_nofn_sigs { + break; + } + } + + if sig_count != needed_nofn_sigs { + let err_msg = format!( + "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}", + ); + tracing::error!(err_msg); + return Err(eyre::eyre!(err_msg).into()); } tracing::trace!( @@ -385,6 +432,7 @@ async fn signature_distributor( mut final_sig_receiver: Receiver, deposit_finalize_sender: Vec>, agg_nonce: impl Future>, + needed_nofn_sigs: usize, ) -> Result<(), BridgeError> { use verifier_deposit_finalize_params::Params; let mut sig_count = 0; @@ -412,7 +460,20 @@ async fn signature_distributor( "Sent signature {} to verifiers in signature_distributor", sig_count ); + + if sig_count == needed_nofn_sigs { + break; + } } + + if sig_count != needed_nofn_sigs { + let err_msg = format!( + "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}", + ); + tracing::error!(err_msg); + return Err(eyre::eyre!(err_msg).into()); + } + tracing::trace!( tmp_debug = 1, "Sent {sig_count} signatures to verifiers in deposit_finalize" @@ -1360,7 +1421,7 @@ impl ClementineAggregator for AggregatorServer { nonce_streams, sighash_stream, agg_nonce_sender, - needed_nofn_sigs, + needed_nofn_sigs, // nonce_aggregator handles movetx and emergency stop signatures differently so no +2 needed here )); // Start the nonce distribution pipe. @@ -1368,6 +1429,7 @@ impl ClementineAggregator for AggregatorServer { agg_nonce_receiver, partial_sig_streams, partial_sig_sender, + needed_nofn_sigs + 2, // +2 for the movetx and emergency stop signatures )); // Start the signature aggregation pipe. @@ -1375,6 +1437,7 @@ impl ClementineAggregator for AggregatorServer { partial_sig_receiver, verifiers_public_keys, final_sig_sender, + needed_nofn_sigs + 2, // +2 for the movetx and emergency stop signatures )); tracing::debug!("Getting signatures from operators"); @@ -1413,6 +1476,7 @@ impl ClementineAggregator for AggregatorServer { final_sig_receiver, deposit_finalize_sender.clone(), nonce_agg_handle.clone(), + needed_nofn_sigs, )); tracing::debug!( @@ -1430,13 +1494,31 @@ impl ClementineAggregator for AggregatorServer { tracing::debug!("Waiting for pipeline tasks to complete"); // Wait for all pipeline tasks to complete - timed_request( + // join_all should be enough here as if one fails other tasks should fail too as they are connected through streams + // one should not hang if any other task fails, the others should finish + // not sure if this is needed instead of try_join_all, I am not sure if try_join_all will definitely return the error of the first task that fails + let task_outputs = timed_request( PIPELINE_COMPLETION_TIMEOUT, "MuSig2 signing pipeline", - try_join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).map_err(|join_err| -> BridgeError { eyre::Report::from(join_err).wrap_err("Failed to join on pipelined tasks").into()}), + async move { + Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await) + }, ) .await?; + let mut task_errors = Vec::new(); + + for (task_name, task_output) in ["Nonce distribution", "Signature aggregation", "Signature distribution"].into_iter().zip(task_outputs.into_iter()) { + if let Err(e) = task_output { + tracing::error!("{} failed with error: {:?}", task_name, e); + task_errors.push(e); + } + } + + if !task_errors.is_empty() { + return Err(eyre::eyre!(format!("Pipeline tasks failed with errors: {:?}", task_errors)).into()); + } + tracing::debug!("Pipeline tasks completed"); let verifiers_ids = verifiers.ids(); From bf85836bd40b9112d2f143dd631c47cf0cd43a83 Mon Sep 17 00:00:00 2001 From: atacann Date: Fri, 3 Oct 2025 17:48:06 +0200 Subject: [PATCH 07/18] change comment (cherry picked from commit 5218bf67a778650fc2accacbd6c029ca8a18b0a9) --- core/src/rpc/aggregator.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index d39453e18..5c526fcc6 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -1496,7 +1496,8 @@ impl ClementineAggregator for AggregatorServer { // Wait for all pipeline tasks to complete // join_all should be enough here as if one fails other tasks should fail too as they are connected through streams // one should not hang if any other task fails, the others should finish - // not sure if this is needed instead of try_join_all, I am not sure if try_join_all will definitely return the error of the first task that fails + // this is needed because try_join_all can potentially not return the error of the first task that failed, just the one it polled first + // that returned an error let task_outputs = timed_request( PIPELINE_COMPLETION_TIMEOUT, "MuSig2 signing pipeline", From de2f6015a45bffa3eab67209f21d38bfcde9ba01 Mon Sep 17 00:00:00 2001 From: atacann Date: Mon, 6 Oct 2025 11:06:15 +0200 Subject: [PATCH 08/18] catch more errors (cherry picked from commit af8cb5ff79703d7f803d0c746e0ad641a9de0345) --- core/src/rpc/aggregator.rs | 59 +++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 5c526fcc6..97f4075c3 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -283,7 +283,7 @@ async fn nonce_distributor( .message() .await .wrap_err_with(|| AggregatorError::RequestFailed { - request_name: format!("Partial sig {sig_count} from verifier {idx} error"), + request_name: format!("Partial sig {sig_count} from verifier {idx}"), }) .inspect_err(|e| { tracing::error!( @@ -354,15 +354,39 @@ async fn nonce_distributor( let (result_1, result_2) = tokio::join!(handle_1, handle_2); - result_1 - .wrap_err("Task crashed while distributing aggnonces")? - .wrap_err("Error while distributing aggnonces")?; - result_2 - .wrap_err("Task crashed while receiving partial sigs")? - .wrap_err("Error while receiving partial sigs") - .inspect_err(|e| tracing::error!("Failed to finish partial aggregation {e:?}"))?; + let mut task_errors = Vec::new(); - tracing::warn!("Finished tasks in nonce_distributor"); + match result_1 { + Ok(inner_result) => { + if let Err(e) = inner_result { + task_errors.push(format!("Task crashed while distributing aggnonces: {e:#?}")); + } + } + Err(e) => { + task_errors.push(format!("Failed to distribute aggnonces: {e:#?}")); + } + } + + match result_2 { + Ok(inner_result) => { + if let Err(e) = inner_result { + task_errors.push(format!("Task crashed while receiving partial sigs: {e:#?}")); + } + } + Err(e) => { + task_errors.push(format!("Failed to receive partial sigs: {e:#?}")); + } + } + + if !task_errors.is_empty() { + return Err(eyre::eyre!(format!( + "nonce_distributor failed with errors: {:#?}", + task_errors + )) + .into()); + } + + tracing::debug!("Finished tasks in nonce_distributor"); Ok(()) } @@ -1510,14 +1534,23 @@ impl ClementineAggregator for AggregatorServer { let mut task_errors = Vec::new(); for (task_name, task_output) in ["Nonce distribution", "Signature aggregation", "Signature distribution"].into_iter().zip(task_outputs.into_iter()) { - if let Err(e) = task_output { - tracing::error!("{} failed with error: {:?}", task_name, e); - task_errors.push(e); + match task_output { + Ok(inner_result) => { + if let Err(e) = inner_result { + let err_msg = format!("{} failed with error: {:#?}", task_name, e); + task_errors.push(err_msg); + } + }, + Err(e) => { + let err_msg = format!("{} failed with error: {:#?}", task_name, e); + task_errors.push(err_msg); + } } } if !task_errors.is_empty() { - return Err(eyre::eyre!(format!("Pipeline tasks failed with errors: {:?}", task_errors)).into()); + tracing::error!("Pipeline tasks failed with errors: {:#?}", task_errors); + return Err(eyre::eyre!(format!("Pipeline tasks failed with errors: {:#?}", task_errors)).into()); } tracing::debug!("Pipeline tasks completed"); From ced99a49210df13b5ecb080bbf811a9426edb0ff Mon Sep 17 00:00:00 2001 From: atacann Date: Mon, 6 Oct 2025 11:49:49 +0200 Subject: [PATCH 09/18] fix wrong num needed sigs issue (cherry picked from commit 301dcf62893c664d9f99f0da9ad9465633304796) --- core/src/rpc/aggregator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 97f4075c3..92d459e6c 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -1453,7 +1453,7 @@ impl ClementineAggregator for AggregatorServer { agg_nonce_receiver, partial_sig_streams, partial_sig_sender, - needed_nofn_sigs + 2, // +2 for the movetx and emergency stop signatures + needed_nofn_sigs, )); // Start the signature aggregation pipe. @@ -1461,7 +1461,7 @@ impl ClementineAggregator for AggregatorServer { partial_sig_receiver, verifiers_public_keys, final_sig_sender, - needed_nofn_sigs + 2, // +2 for the movetx and emergency stop signatures + needed_nofn_sigs, )); tracing::debug!("Getting signatures from operators"); From d558bd02680e1cf3f00df3bf220e06cbc63fbe77 Mon Sep 17 00:00:00 2001 From: atacann Date: Mon, 6 Oct 2025 12:00:08 +0200 Subject: [PATCH 10/18] better errors on nonce aggregation (cherry picked from commit bfb3d8e6166d53e291d6fbd6a6a9d43e02f14811) --- core/src/rpc/aggregator.rs | 65 +++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 92d459e6c..06ea3073f 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -86,7 +86,8 @@ async fn get_next_pub_nonces( .map(|(i, s)| async move { s.next() .await - .transpose()? // Return the inner error if it exists + .transpose() + .wrap_err(format!("Failed to get nonce from verifier {i}"))? // Return the inner error if it exists .ok_or_else(|| -> eyre::Report { AggregatorError::InputStreamEndedEarlyUnknownSize { // Return an early end error if the stream is empty @@ -157,18 +158,24 @@ async fn nonce_aggregator( return Err(eyre::eyre!(err_msg).into()); } // aggregate nonces for the movetx signature - let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async { - s.next() - .await - .transpose()? // Return the inner error if it exists - .ok_or_else(|| -> eyre::Report { - AggregatorError::InputStreamEndedEarlyUnknownSize { - // Return an early end error if the stream is empty - stream_name: "Nonce stream".to_string(), - } - .into() - }) - })) + let pub_nonces = try_join_all( + nonce_streams + .iter_mut() + .enumerate() + .map(|(i, s)| async move { + s.next() + .await + .transpose() + .wrap_err(format!("Failed to get movetx nonce from verifier {i}",))? // Return the inner error if it exists + .ok_or_else(|| -> eyre::Report { + AggregatorError::InputStreamEndedEarlyUnknownSize { + // Return an early end error if the stream is empty + stream_name: format!("Movetx nonce stream for verifier {i}"), + } + .into() + }) + }), + ) .await .wrap_err("Failed to aggregate nonces for the move tx")?; @@ -176,18 +183,26 @@ async fn nonce_aggregator( let move_tx_agg_nonce = aggregate_nonces(pub_nonces.iter().collect::>().as_slice())?; - let pub_nonces = try_join_all(nonce_streams.iter_mut().map(|s| async { - s.next() - .await - .transpose()? // Return the inner error if it exists - .ok_or_else(|| -> eyre::Report { - AggregatorError::InputStreamEndedEarlyUnknownSize { - // Return an early end error if the stream is empty - stream_name: "Nonce stream".to_string(), - } - .into() - }) - })) + let pub_nonces = try_join_all( + nonce_streams + .iter_mut() + .enumerate() + .map(|(i, s)| async move { + s.next() + .await + .transpose() + .wrap_err(format!( + "Failed to get emergency stop nonce from verifier {i}" + ))? // Return the inner error if it exists + .ok_or_else(|| -> eyre::Report { + AggregatorError::InputStreamEndedEarlyUnknownSize { + // Return an early end error if the stream is empty + stream_name: format!("Emergency stop nonce stream for verifier {i}"), + } + .into() + }) + }), + ) .await .wrap_err("Failed to aggregate nonces for the emergency stop tx")?; From dfc1c909c471de4fcd55499cf1ea15c780690465 Mon Sep 17 00:00:00 2001 From: atacann Date: Tue, 23 Sep 2025 14:37:01 +0200 Subject: [PATCH 11/18] add partial signature verification (cherry picked from commit eb5843f6c92c09fd6a7207c56387fd3befb4b1bf) --- core/src/aggregator.rs | 36 +------ core/src/musig2.rs | 212 +++++++++++++++++++++++-------------- core/src/rpc/aggregator.rs | 177 ++++++++++++++++++------------- core/src/test/musig2.rs | 117 +++++++++++--------- 4 files changed, 301 insertions(+), 241 deletions(-) diff --git a/core/src/aggregator.rs b/core/src/aggregator.rs index f0b2b7ab7..07a8b2f4b 100644 --- a/core/src/aggregator.rs +++ b/core/src/aggregator.rs @@ -18,11 +18,9 @@ use crate::task::TaskExt; use crate::tx_sender::TxSenderClient; use crate::utils::{timed_request, timed_try_join_all}; use crate::{ - builder::{self}, config::BridgeConfig, database::Database, errors::BridgeError, - musig2::aggregate_partial_signatures, rpc::{ self, clementine::{ @@ -31,12 +29,10 @@ use crate::{ }, }, }; -use bitcoin::hashes::Hash; -use bitcoin::secp256k1::{schnorr, Message, PublicKey}; +use bitcoin::secp256k1::PublicKey; use bitcoin::XOnlyPublicKey; use eyre::Context; use futures::future::join_all; -use secp256k1::musig::{AggregatedNonce, PartialSignature}; use std::future::Future; use tokio::sync::RwLock; use tonic::{Request, Status}; @@ -526,36 +522,6 @@ impl Aggregator { Ok(()) } - #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - async fn _aggregate_move_partial_sigs( - &self, - deposit_data: &mut DepositData, - agg_nonce: &AggregatedNonce, - partial_sigs: Vec, - ) -> Result { - let tx = builder::transaction::create_move_to_vault_txhandler( - deposit_data, - self.config.protocol_paramset(), - )?; - - let message = Message::from_digest( - tx.calculate_script_spend_sighash_indexed(0, 0, bitcoin::TapSighashType::Default)? - .to_byte_array(), - ); - - let verifiers_public_keys = deposit_data.get_verifiers(); - - let final_sig = aggregate_partial_signatures( - verifiers_public_keys, - None, - *agg_nonce, - &partial_sigs, - message, - )?; - - Ok(final_sig) - } - /// Returns a list of verifier clients that are participating in the deposit. pub async fn get_participating_verifiers( &self, diff --git a/core/src/musig2.rs b/core/src/musig2.rs index 694b9cb5f..df16e9d96 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -170,15 +170,34 @@ pub fn aggregate_partial_signatures( pks: Vec, tweak: Option, agg_nonce: AggregatedNonce, - partial_sigs: &[PartialSignature], + partial_sigs: &[(PartialSignature, PublicNonce)], message: Message, ) -> Result { - let musig_key_agg_cache = create_key_agg_cache(pks, tweak)?; + let musig_key_agg_cache = create_key_agg_cache(pks.clone(), tweak)?; let secp_message = to_secp_msg(&message); let session = Session::new(SECP256K1, &musig_key_agg_cache, agg_nonce, secp_message); - let partial_sigs: Vec<&PartialSignature> = partial_sigs.iter().collect(); + let partial_sigs_and_nonces: Vec<&(PartialSignature, PublicNonce)> = + partial_sigs.iter().collect(); + let partial_sigs: Vec<&PartialSignature> = + partial_sigs_and_nonces.iter().map(|(sig, _)| sig).collect(); + for ((partial_sig, pub_nonce), pub_key) in + partial_sigs_and_nonces.into_iter().zip(pks.into_iter()) + { + if !session.partial_verify( + SECP256K1, + &musig_key_agg_cache, + *partial_sig, + *pub_nonce, + to_secp_pk(pub_key), + ) { + return Err(BridgeError::from(eyre::eyre!( + "MuSig2 Error: partial signature verification failed for pub key: {}", + pub_key + ))); + } + } let final_sig = session.partial_sig_agg(&partial_sigs); SECP256K1 @@ -262,7 +281,7 @@ mod tests { secp256k1::{schnorr, Message, PublicKey}, Amount, OutPoint, TapNodeHash, TapSighashType, TxOut, Txid, XOnlyPublicKey, }; - use secp256k1::{musig::PartialSignature, rand::Rng}; + use secp256k1::rand::Rng; use std::sync::Arc; use std::vec; @@ -306,15 +325,18 @@ mod tests { .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - super::partial_sign( - public_keys.clone(), - None, - nonce_pair.0, - aggregated_nonce, - kp, - message, + ( + super::partial_sign( + public_keys.clone(), + None, + nonce_pair.0, + aggregated_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) .collect::>(); @@ -348,27 +370,36 @@ mod tests { let agg_nonce = super::aggregate_nonces(&[&pub_nonce_0, &pub_nonce_1, &pub_nonce_2]).unwrap(); - let partial_sig_0 = - super::partial_sign(pks.clone(), None, sec_nonce_0, agg_nonce, kp_0, message).unwrap(); - let partial_sig_1 = - super::partial_sign(pks.clone(), None, sec_nonce_1, agg_nonce, kp_1, message).unwrap(); + let partial_sig_0 = ( + super::partial_sign(pks.clone(), None, sec_nonce_0, agg_nonce, kp_0, message).unwrap(), + pub_nonce_0, + ); + let partial_sig_1 = ( + super::partial_sign(pks.clone(), None, sec_nonce_1, agg_nonce, kp_1, message).unwrap(), + pub_nonce_1, + ); // Oops, a verifier accidentally added some tweak! - let partial_sig_2 = super::partial_sign( - pks.clone(), - Some(Musig2Mode::KeySpendWithScript( - TapNodeHash::from_slice(&[1u8; 32]).unwrap(), - )), - sec_nonce_2, - agg_nonce, - kp_2, - message, - ) - .unwrap(); + let partial_sig_2 = ( + super::partial_sign( + pks.clone(), + Some(Musig2Mode::KeySpendWithScript( + TapNodeHash::from_slice(&[1u8; 32]).unwrap(), + )), + sec_nonce_2, + agg_nonce, + kp_2, + message, + ) + .unwrap(), + pub_nonce_2, + ); let partial_sigs = vec![partial_sig_0, partial_sig_1, partial_sig_2]; let final_signature: Result = super::aggregate_partial_signatures(pks, None, agg_nonce, &partial_sigs, message); + println!("final_signature: {:?}", final_signature); + assert!(final_signature.is_err()); } @@ -403,17 +434,20 @@ mod tests { .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - super::partial_sign( - public_keys.clone(), - Some(Musig2Mode::KeySpendWithScript( - TapNodeHash::from_slice(&tweak).unwrap(), - )), - nonce_pair.0, - aggregated_nonce, - kp, - message, + ( + super::partial_sign( + public_keys.clone(), + Some(Musig2Mode::KeySpendWithScript( + TapNodeHash::from_slice(&tweak).unwrap(), + )), + nonce_pair.0, + aggregated_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) .collect::>(); @@ -451,31 +485,39 @@ mod tests { let agg_nonce = super::aggregate_nonces(&[&pub_nonce_0, &pub_nonce_1, &pub_nonce_2]).unwrap(); - let partial_sig_0 = super::partial_sign( - pks.clone(), - Some(Musig2Mode::KeySpendWithScript( - TapNodeHash::from_slice(&tweak).unwrap(), - )), - sec_nonce_0, - agg_nonce, - kp_0, - message, - ) - .unwrap(); - let partial_sig_1 = super::partial_sign( - pks.clone(), - Some(Musig2Mode::KeySpendWithScript( - TapNodeHash::from_slice(&tweak).unwrap(), - )), - sec_nonce_1, - agg_nonce, - kp_1, - message, - ) - .unwrap(); + let partial_sig_0 = ( + super::partial_sign( + pks.clone(), + Some(Musig2Mode::KeySpendWithScript( + TapNodeHash::from_slice(&tweak).unwrap(), + )), + sec_nonce_0, + agg_nonce, + kp_0, + message, + ) + .unwrap(), + pub_nonce_0, + ); + let partial_sig_1 = ( + super::partial_sign( + pks.clone(), + Some(Musig2Mode::KeySpendWithScript( + TapNodeHash::from_slice(&tweak).unwrap(), + )), + sec_nonce_1, + agg_nonce, + kp_1, + message, + ) + .unwrap(), + pub_nonce_1, + ); // Oops, a verifier accidentally forgot to put the tweak! - let partial_sig_2 = - super::partial_sign(pks.clone(), None, sec_nonce_2, agg_nonce, kp_2, message).unwrap(); + let partial_sig_2 = ( + super::partial_sign(pks.clone(), None, sec_nonce_2, agg_nonce, kp_2, message).unwrap(), + pub_nonce_2, + ); let partial_sigs = vec![partial_sig_0, partial_sig_1, partial_sig_2]; let final_signature = super::aggregate_partial_signatures( @@ -566,21 +608,24 @@ mod tests { ); let merkle_root = sending_address_spend_info.merkle_root().unwrap(); - let partial_sigs: Vec = key_pairs + let partial_sigs = key_pairs .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - super::partial_sign( - public_keys.clone(), - Some(Musig2Mode::KeySpendWithScript(merkle_root)), - nonce_pair.0, - agg_nonce, - kp, - message, + ( + super::partial_sign( + public_keys.clone(), + Some(Musig2Mode::KeySpendWithScript(merkle_root)), + nonce_pair.0, + agg_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); let final_signature = super::aggregate_partial_signatures( public_keys.clone(), @@ -673,21 +718,24 @@ mod tests { .to_byte_array(), ); - let partial_sigs: Vec = key_pairs + let partial_sigs = key_pairs .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - super::partial_sign( - public_keys.clone(), - None, - nonce_pair.0, - agg_nonce, - kp, - message, + ( + super::partial_sign( + public_keys.clone(), + None, + nonce_pair.0, + agg_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); let final_signature = super::aggregate_partial_signatures( public_keys, @@ -778,7 +826,7 @@ mod tests { public_keys.clone(), Some(key_spend_with_script_tweak), agg_nonce, - &[partial_sig1, partial_sig2], + &[(partial_sig1, pub_nonce1), (partial_sig2, pub_nonce2)], message, ) .unwrap(); diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 06ea3073f..6f732c7e1 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -109,9 +109,15 @@ async fn nonce_aggregator( + Unpin + Send + 'static, - agg_nonce_sender: Sender, + agg_nonce_sender: Sender<(AggNonceQueueItem, Vec)>, needed_nofn_sigs: usize, -) -> Result<(AggregatedNonce, AggregatedNonce), BridgeError> { +) -> Result< + ( + (AggregatedNonce, Vec), + (AggregatedNonce, Vec), + ), + BridgeError, +> { let mut total_sigs = 0; tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)"); @@ -136,7 +142,7 @@ async fn nonce_aggregator( let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::>().as_slice())?; agg_nonce_sender - .send(AggNonceQueueItem { agg_nonce, sighash }) + .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces)) .await .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly { stream_name: "nonce_aggregator".to_string(), @@ -158,68 +164,70 @@ async fn nonce_aggregator( return Err(eyre::eyre!(err_msg).into()); } // aggregate nonces for the movetx signature - let pub_nonces = try_join_all( - nonce_streams - .iter_mut() - .enumerate() - .map(|(i, s)| async move { - s.next() - .await - .transpose() - .wrap_err(format!("Failed to get movetx nonce from verifier {i}",))? // Return the inner error if it exists - .ok_or_else(|| -> eyre::Report { - AggregatorError::InputStreamEndedEarlyUnknownSize { - // Return an early end error if the stream is empty - stream_name: format!("Movetx nonce stream for verifier {i}"), - } - .into() - }) - }), - ) + let movetx_pub_nonces = try_join_all(nonce_streams.iter_mut().enumerate().map( + |(i, s)| async move { + s.next() + .await + .transpose() + .wrap_err(format!("Failed to get movetx nonce from verifier {i}",))? // Return the inner error if it exists + .ok_or_else(|| -> eyre::Report { + AggregatorError::InputStreamEndedEarlyUnknownSize { + // Return an early end error if the stream is empty + stream_name: format!("Movetx nonce stream for verifier {i}"), + } + .into() + }) + }, + )) .await .wrap_err("Failed to aggregate nonces for the move tx")?; tracing::trace!("Received nonces for movetx in nonce_aggregator"); - let move_tx_agg_nonce = aggregate_nonces(pub_nonces.iter().collect::>().as_slice())?; + let move_tx_agg_nonce = + aggregate_nonces(movetx_pub_nonces.iter().collect::>().as_slice())?; - let pub_nonces = try_join_all( - nonce_streams - .iter_mut() - .enumerate() - .map(|(i, s)| async move { - s.next() - .await - .transpose() - .wrap_err(format!( - "Failed to get emergency stop nonce from verifier {i}" - ))? // Return the inner error if it exists - .ok_or_else(|| -> eyre::Report { - AggregatorError::InputStreamEndedEarlyUnknownSize { - // Return an early end error if the stream is empty - stream_name: format!("Emergency stop nonce stream for verifier {i}"), - } - .into() - }) - }), - ) + let emergency_stop_pub_nonces = try_join_all(nonce_streams.iter_mut().enumerate().map( + |(i, s)| async move { + s.next() + .await + .transpose() + .wrap_err(format!( + "Failed to get emergency stop nonce from verifier {i}" + ))? // Return the inner error if it exists + .ok_or_else(|| -> eyre::Report { + AggregatorError::InputStreamEndedEarlyUnknownSize { + // Return an early end error if the stream is empty + stream_name: format!("Emergency stop nonce stream for verifier {i}"), + } + .into() + }) + }, + )) .await .wrap_err("Failed to aggregate nonces for the emergency stop tx")?; - let emergency_stop_agg_nonce = - aggregate_nonces(pub_nonces.iter().collect::>().as_slice())?; - - Ok((move_tx_agg_nonce, emergency_stop_agg_nonce)) + let emergency_stop_agg_nonce = aggregate_nonces( + emergency_stop_pub_nonces + .iter() + .collect::>() + .as_slice(), + )?; + + Ok(( + (move_tx_agg_nonce, movetx_pub_nonces), + (emergency_stop_agg_nonce, emergency_stop_pub_nonces), + )) } /// Reroutes aggregated nonces to the signature aggregator. async fn nonce_distributor( - mut agg_nonce_receiver: Receiver, + mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec)>, partial_sig_streams: Vec<( Streaming, Sender, )>, - partial_sig_sender: Sender<(Vec, AggNonceQueueItem)>, + partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>, needed_nofn_sigs: usize, ) -> Result<(), BridgeError> { let mut nonce_count = 0; @@ -230,7 +238,7 @@ async fn nonce_distributor( let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE); let handle_1 = tokio::spawn(async move { - while let Some(queue_item) = agg_nonce_receiver.recv().await { + while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await { nonce_count += 1; tracing::trace!( @@ -265,7 +273,7 @@ async fn nonce_distributor( .wrap_err("Failed to send aggregated nonces to verifiers")?; queue_tx - .send(queue_item) + .send((queue_item, pub_nonces)) .await .wrap_err("Other end of channel closed")?; @@ -291,7 +299,8 @@ async fn nonce_distributor( }); let handle_2 = tokio::spawn(async move { - while let Some(queue_item) = queue_rx.recv().await { + while let Some((queue_item, pub_nonces)) = queue_rx.recv().await { + let pub_nonces_ref = pub_nonces.as_slice(); let partial_sigs = try_join_all(partial_sig_rx.iter_mut().enumerate().map( |(idx, stream)| async move { let partial_sig = stream @@ -314,17 +323,16 @@ async fn nonce_distributor( e ); })?; - - Ok::<_, BridgeError>( - PartialSignature::from_byte_array( - &partial_sig - .partial_sig - .as_slice() - .try_into() - .wrap_err("PartialSignature must be 32 bytes")?, - ) - .wrap_err("Failed to parse partial signature")?, + let partial_sig = PartialSignature::from_byte_array( + &partial_sig + .partial_sig + .as_slice() + .try_into() + .wrap_err("PartialSignature must be 32 bytes")?, ) + .wrap_err("Failed to parse partial signature")?; + + Ok::<_, BridgeError>((partial_sig, pub_nonces_ref[idx])) }, )) .await?; @@ -408,7 +416,7 @@ async fn nonce_distributor( /// Collects partial signatures from given stream and aggregates them. async fn signature_aggregator( - mut partial_sig_receiver: Receiver<(Vec, AggNonceQueueItem)>, + mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>, verifiers_public_keys: Vec, final_sig_sender: Sender, needed_nofn_sigs: usize, @@ -470,7 +478,15 @@ async fn signature_aggregator( async fn signature_distributor( mut final_sig_receiver: Receiver, deposit_finalize_sender: Vec>, - agg_nonce: impl Future>, + agg_nonce: impl Future< + Output = Result< + ( + (AggregatedNonce, Vec), + (AggregatedNonce, Vec), + ), + Status, + >, + >, needed_nofn_sigs: usize, ) -> Result<(), BridgeError> { use verifier_deposit_finalize_params::Params; @@ -528,7 +544,7 @@ async fn signature_distributor( for tx in &deposit_finalize_sender { tx.send(VerifierDepositFinalizeParams { params: Some(Params::MoveTxAggNonce( - movetx_agg_nonce.serialize().to_vec(), + movetx_agg_nonce.0.serialize().to_vec(), )), }) .await @@ -542,7 +558,7 @@ async fn signature_distributor( for tx in &deposit_finalize_sender { tx.send(VerifierDepositFinalizeParams { params: Some(Params::EmergencyStopAggNonce( - emergency_stop_agg_nonce.serialize().to_vec(), + emergency_stop_agg_nonce.0.serialize().to_vec(), )), }) .await @@ -772,7 +788,7 @@ impl Aggregator { async fn create_movetx( &self, partial_sigs: Vec>, - movetx_agg_nonce: AggregatedNonce, + movetx_agg_nonce: (AggregatedNonce, Vec), deposit_params: DepositParams, ) -> Result, Status> { let mut deposit_data: DepositData = deposit_params.try_into()?; @@ -788,13 +804,18 @@ impl Aggregator { bitcoin::TapSighashType::Default, )?; + let musig_sigs_and_nonces = musig_partial_sigs + .into_iter() + .zip(movetx_agg_nonce.1) + .collect::>(); + // aggregate partial signatures let verifiers_public_keys = deposit_data.get_verifiers(); let final_sig = crate::musig2::aggregate_partial_signatures( verifiers_public_keys, None, - movetx_agg_nonce, - &musig_partial_sigs, + movetx_agg_nonce.0, + &musig_sigs_and_nonces, Message::from_digest(sighash.to_byte_array()), )?; @@ -807,7 +828,7 @@ impl Aggregator { async fn verify_and_save_emergency_stop_sigs( &self, emergency_stop_sigs: Vec>, - emergency_stop_agg_nonce: AggregatedNonce, + emergency_stop_agg_nonce: (AggregatedNonce, Vec), deposit_params: DepositParams, ) -> Result<(), BridgeError> { let mut deposit_data: DepositData = deposit_params @@ -834,11 +855,16 @@ impl Aggregator { let verifiers_public_keys = deposit_data.get_verifiers(); + let musig_sigs_and_nonces = musig_partial_sigs + .into_iter() + .zip(emergency_stop_agg_nonce.1) + .collect::>(); + let final_sig = crate::musig2::aggregate_partial_signatures( verifiers_public_keys, None, - emergency_stop_agg_nonce, - &musig_partial_sigs, + emergency_stop_agg_nonce.0, + &musig_sigs_and_nonces, Message::from_digest(sighash.to_byte_array()), ) .wrap_err("Failed to aggregate emergency stop signatures")?; @@ -1104,12 +1130,17 @@ impl ClementineAggregator for AggregatorServer { .collect::, _>>() .map_err(|e| Status::internal(format!("Failed to parse partial sig: {:?}", e)))?; + let musig_sigs_and_nonces = musig_partial_sigs + .into_iter() + .zip(pub_nonces.into_iter()) + .collect::>(); + let final_sig = bitcoin::taproot::Signature { signature: crate::musig2::aggregate_partial_signatures( deposit_data.get_verifiers(), None, agg_nonce, - &musig_partial_sigs, + &musig_sigs_and_nonces, Message::from_digest(sighash.to_byte_array()), )?, sighash_type: bitcoin::TapSighashType::Default, @@ -1504,7 +1535,7 @@ impl ClementineAggregator for AggregatorServer { let nonce_agg_handle = nonce_agg_handle .map_err(|_| Status::internal("panic when aggregating nonces")) .map( - |res| -> Result<(AggregatedNonce, AggregatedNonce), Status> { + |res| -> Result<((AggregatedNonce, Vec), (AggregatedNonce, Vec)), Status> { res.and_then(|r| r.map_err(Into::into)) }, ) diff --git a/core/src/test/musig2.rs b/core/src/test/musig2.rs index ec71a9031..4db4acf58 100644 --- a/core/src/test/musig2.rs +++ b/core/src/test/musig2.rs @@ -20,7 +20,7 @@ use bitcoin::secp256k1::{Message, PublicKey}; use bitcoin::{hashes::Hash, script, Amount, TapSighashType}; use bitcoin::{taproot, Sequence, TxOut, XOnlyPublicKey}; use bitcoincore_rpc::RpcApi; -use secp256k1::musig::{AggregatedNonce, PartialSignature}; +use secp256k1::musig::AggregatedNonce; use std::sync::Arc; #[cfg(test)] @@ -117,21 +117,24 @@ async fn key_spend() { let merkle_root = from_address_spend_info.merkle_root(); assert!(merkle_root.is_none()); - let partial_sigs: Vec = verifiers_secret_public_keys + let partial_sigs = verifiers_secret_public_keys .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - partial_sign( - verifier_public_keys.clone(), - Some(Musig2Mode::OnlyKeySpend), - nonce_pair.0, - agg_nonce, - kp, - message, + ( + partial_sign( + verifier_public_keys.clone(), + Some(Musig2Mode::OnlyKeySpend), + nonce_pair.0, + agg_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); let final_signature = aggregate_partial_signatures( verifier_public_keys.clone(), @@ -218,21 +221,24 @@ async fn key_spend_with_script() { ); let merkle_root = from_address_spend_info.merkle_root().unwrap(); - let partial_sigs: Vec = verifiers_secret_public_keys + let partial_sigs = verifiers_secret_public_keys .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - partial_sign( - verifier_public_keys.clone(), - Some(Musig2Mode::KeySpendWithScript(merkle_root)), - nonce_pair.0, - agg_nonce, - kp, - message, + ( + partial_sign( + verifier_public_keys.clone(), + Some(Musig2Mode::KeySpendWithScript(merkle_root)), + nonce_pair.0, + agg_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); let final_signature = aggregate_partial_signatures( verifier_public_keys.clone(), @@ -325,21 +331,24 @@ async fn script_spend() { .to_byte_array(), ); - let partial_sigs: Vec = verifiers_secret_public_keys + let partial_sigs = verifiers_secret_public_keys .into_iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - partial_sign( - verifier_public_keys.clone(), - None, - nonce_pair.0, - agg_nonce, - kp, - message, + ( + partial_sign( + verifier_public_keys.clone(), + None, + nonce_pair.0, + agg_nonce, + kp, + message, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); let final_signature = aggregate_partial_signatures( verifier_public_keys, None, @@ -495,21 +504,24 @@ async fn key_and_script_spend() { // Musig2 Partial Signatures // Script Spend let final_signature_1 = { - let partial_sigs: Vec = verifiers_secret_public_keys + let partial_sigs = verifiers_secret_public_keys .iter() .zip(nonce_pairs) .map(|(kp, nonce_pair)| { - partial_sign( - verifier_public_keys.clone(), - None, - nonce_pair.0, - agg_nonce, - *kp, - sighash_1, + ( + partial_sign( + verifier_public_keys.clone(), + None, + nonce_pair.0, + agg_nonce, + *kp, + sighash_1, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); // Musig2 Aggregate aggregate_partial_signatures( @@ -524,21 +536,24 @@ async fn key_and_script_spend() { // Key spend let final_signature_2 = { - let partial_sigs: Vec = verifiers_secret_public_keys + let partial_sigs = verifiers_secret_public_keys .iter() .zip(nonce_pairs_2) .map(|(kp, nonce_pair)| { - partial_sign( - verifier_public_keys.clone(), - Some(Musig2Mode::KeySpendWithScript(merkle_root)), - nonce_pair.0, - agg_nonce_2, - *kp, - sighash_2, + ( + partial_sign( + verifier_public_keys.clone(), + Some(Musig2Mode::KeySpendWithScript(merkle_root)), + nonce_pair.0, + agg_nonce_2, + *kp, + sighash_2, + ) + .unwrap(), + nonce_pair.1, ) - .unwrap() }) - .collect(); + .collect::>(); aggregate_partial_signatures( verifier_public_keys, From 38dacb62ffb2fe898731ff9d5edf3ffb1204d53a Mon Sep 17 00:00:00 2001 From: atacann Date: Tue, 23 Sep 2025 15:44:08 +0200 Subject: [PATCH 12/18] verifty partial sigs only when env var is true (cherry picked from commit 1a4edb9a70480f036ddd30b46a0838364b43d947) --- core/src/musig2.rs | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/core/src/musig2.rs b/core/src/musig2.rs index df16e9d96..3c328fbd2 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -182,20 +182,34 @@ pub fn aggregate_partial_signatures( partial_sigs.iter().collect(); let partial_sigs: Vec<&PartialSignature> = partial_sigs_and_nonces.iter().map(|(sig, _)| sig).collect(); - for ((partial_sig, pub_nonce), pub_key) in - partial_sigs_and_nonces.into_iter().zip(pks.into_iter()) - { - if !session.partial_verify( - SECP256K1, - &musig_key_agg_cache, - *partial_sig, - *pub_nonce, - to_secp_pk(pub_key), - ) { - return Err(BridgeError::from(eyre::eyre!( - "MuSig2 Error: partial signature verification failed for pub key: {}", - pub_key - ))); + // enable partial signature verification with an environment variable to see which verifier is giving bad partial signatures + // this is an env var so that we can turn it off for better performance + let enable_partial_sig_verification = std::env::var("PARTIAL_SIG_VERIFICATION") + .ok() + .map(|value| { + matches!( + value.to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "y" | "on" + ) + }) + .unwrap_or(false); + + if enable_partial_sig_verification { + for ((partial_sig, pub_nonce), pub_key) in + partial_sigs_and_nonces.into_iter().zip(pks.into_iter()) + { + if !session.partial_verify( + SECP256K1, + &musig_key_agg_cache, + *partial_sig, + *pub_nonce, + to_secp_pk(pub_key), + ) { + return Err(BridgeError::from(eyre::eyre!( + "MuSig2 Error: partial signature verification failed for pub key: {}", + pub_key + ))); + } } } let final_sig = session.partial_sig_agg(&partial_sigs); From 872fab748e5c0ad2ad7002e957a3f2e9db26b20d Mon Sep 17 00:00:00 2001 From: atacann Date: Tue, 23 Sep 2025 16:50:12 +0200 Subject: [PATCH 13/18] change var naming (cherry picked from commit 0f14130899df8df271ec42afe96be11e82df40fb) --- core/src/rpc/aggregator.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 6f732c7e1..e8c9c851d 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -788,7 +788,7 @@ impl Aggregator { async fn create_movetx( &self, partial_sigs: Vec>, - movetx_agg_nonce: (AggregatedNonce, Vec), + movetx_agg_and_pub_nonces: (AggregatedNonce, Vec), deposit_params: DepositParams, ) -> Result, Status> { let mut deposit_data: DepositData = deposit_params.try_into()?; @@ -806,7 +806,7 @@ impl Aggregator { let musig_sigs_and_nonces = musig_partial_sigs .into_iter() - .zip(movetx_agg_nonce.1) + .zip(movetx_agg_and_pub_nonces.1) .collect::>(); // aggregate partial signatures @@ -814,7 +814,7 @@ impl Aggregator { let final_sig = crate::musig2::aggregate_partial_signatures( verifiers_public_keys, None, - movetx_agg_nonce.0, + movetx_agg_and_pub_nonces.0, &musig_sigs_and_nonces, Message::from_digest(sighash.to_byte_array()), )?; @@ -828,7 +828,7 @@ impl Aggregator { async fn verify_and_save_emergency_stop_sigs( &self, emergency_stop_sigs: Vec>, - emergency_stop_agg_nonce: (AggregatedNonce, Vec), + emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec), deposit_params: DepositParams, ) -> Result<(), BridgeError> { let mut deposit_data: DepositData = deposit_params @@ -857,13 +857,13 @@ impl Aggregator { let musig_sigs_and_nonces = musig_partial_sigs .into_iter() - .zip(emergency_stop_agg_nonce.1) + .zip(emergency_stop_agg_and_pub_nonces.1) .collect::>(); let final_sig = crate::musig2::aggregate_partial_signatures( verifiers_public_keys, None, - emergency_stop_agg_nonce.0, + emergency_stop_agg_and_pub_nonces.0, &musig_sigs_and_nonces, Message::from_digest(sighash.to_byte_array()), ) @@ -2072,11 +2072,14 @@ mod tests { }; // Generate and broadcast the move-to-vault transaction + let start_time = std::time::Instant::now(); let raw_move_tx = aggregator .new_deposit(clementine::Deposit::from(deposit_info)) .await .unwrap() .into_inner(); + let end_time = std::time::Instant::now(); + println!("New deposit time: {:?}", end_time - start_time); let movetx_txid = aggregator .send_move_to_vault_tx(SendMoveTxRequest { From aa2a46213273482fcef7909c7df9c9b26d12ee16 Mon Sep 17 00:00:00 2001 From: atacann Date: Wed, 24 Sep 2025 14:51:02 +0200 Subject: [PATCH 14/18] update docs (cherry picked from commit a77117da77ec081b0de458d93668dcd0b99933cb) --- core/src/musig2.rs | 2 +- core/src/rpc/aggregator.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/musig2.rs b/core/src/musig2.rs index 3c328fbd2..999fdfb3f 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -165,7 +165,7 @@ pub fn aggregate_nonces(pub_nonces: &[&PublicNonce]) -> Result, tweak: Option, diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index e8c9c851d..b059fea57 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -100,7 +100,7 @@ async fn get_next_pub_nonces( .await?) } -/// For each expected sighash, we collect a batch of public nonces from all verifiers. We aggregate and send to the agg_nonce_sender. Then repeat for the next sighash. +/// For each expected sighash, we collect a batch of public nonces from all verifiers. We aggregate and send aggregatod nonce and all public nonces (needed for partial signature verification) to the agg_nonce_sender. Then repeat for the next sighash. async fn nonce_aggregator( mut nonce_streams: Vec< impl Stream> + Unpin + Send + 'static, @@ -220,7 +220,7 @@ async fn nonce_aggregator( )) } -/// Reroutes aggregated nonces to the signature aggregator. +/// Reroutes aggregated nonces and public nonces for each aggregatedd nonce to the signature aggregator. async fn nonce_distributor( mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec)>, partial_sig_streams: Vec<( @@ -414,7 +414,7 @@ async fn nonce_distributor( Ok(()) } -/// Collects partial signatures from given stream and aggregates them. +/// Collects partial signatures and the corresponding public nonce from given stream and aggregates them, while aggregating each partial signature will also be verified if PARTIAL_SIG_VERIFICATION is set to true. async fn signature_aggregator( mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>, verifiers_public_keys: Vec, From 99a668f173a9077eb3d7a8af770d9b90097393d2 Mon Sep 17 00:00:00 2001 From: atacann Date: Thu, 25 Sep 2025 09:20:22 +0200 Subject: [PATCH 15/18] add logging (cherry picked from commit f360533bcedccb7eeecdbcf24ec67d406d3db7da) --- core/src/musig2.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/musig2.rs b/core/src/musig2.rs index 999fdfb3f..64ad9aee4 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -205,6 +205,10 @@ pub fn aggregate_partial_signatures( *pub_nonce, to_secp_pk(pub_key), ) { + tracing::error!( + "MuSig2 Error: partial signature verification failed for pub key: {}", + pub_key + ); return Err(BridgeError::from(eyre::eyre!( "MuSig2 Error: partial signature verification failed for pub key: {}", pub_key From 199ae71cebb47502816d0133e4e66ab59db8b203 Mon Sep 17 00:00:00 2001 From: atacann Date: Mon, 6 Oct 2025 12:34:10 +0200 Subject: [PATCH 16/18] show all partial verify errors, not only the first (cherry picked from commit 5984323ac12a016a9ffe1aa54cd6b063fbe2d48e) --- core/src/musig2.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/core/src/musig2.rs b/core/src/musig2.rs index 64ad9aee4..493c69d6f 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -195,8 +195,11 @@ pub fn aggregate_partial_signatures( .unwrap_or(false); if enable_partial_sig_verification { - for ((partial_sig, pub_nonce), pub_key) in - partial_sigs_and_nonces.into_iter().zip(pks.into_iter()) + let mut partial_sig_verification_errors = Vec::new(); + for (idx, ((partial_sig, pub_nonce), pub_key)) in partial_sigs_and_nonces + .into_iter() + .zip(pks.into_iter()) + .enumerate() { if !session.partial_verify( SECP256K1, @@ -205,16 +208,18 @@ pub fn aggregate_partial_signatures( *pub_nonce, to_secp_pk(pub_key), ) { - tracing::error!( - "MuSig2 Error: partial signature verification failed for pub key: {}", - pub_key - ); - return Err(BridgeError::from(eyre::eyre!( - "MuSig2 Error: partial signature verification failed for pub key: {}", - pub_key - ))); + let error_msg = format!("(index: {idx}, pub key: {pub_key})"); + partial_sig_verification_errors.push(error_msg); } } + if !partial_sig_verification_errors.is_empty() { + let error_msg = format!( + "MuSig2 Error: partial signature verification failed for verifiers: {}", + partial_sig_verification_errors.join(", ") + ); + tracing::error!(error_msg); + return Err(BridgeError::from(eyre::eyre!(error_msg))); + } } let final_sig = session.partial_sig_agg(&partial_sigs); From 4bda5fe51522a954633c77da30e9d304f7eedbc8 Mon Sep 17 00:00:00 2001 From: atacann Date: Tue, 7 Oct 2025 12:28:13 +0200 Subject: [PATCH 17/18] fix tracing errors (cherry picked from commit fcdc72bd38a3bea66605808354e8b85e8b41b3b1) --- core/src/musig2.rs | 2 +- core/src/rpc/aggregator.rs | 10 +++++----- core/src/rpc/verifier.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/musig2.rs b/core/src/musig2.rs index 493c69d6f..60e38ceea 100644 --- a/core/src/musig2.rs +++ b/core/src/musig2.rs @@ -217,7 +217,7 @@ pub fn aggregate_partial_signatures( "MuSig2 Error: partial signature verification failed for verifiers: {}", partial_sig_verification_errors.join(", ") ); - tracing::error!(error_msg); + tracing::error!("{error_msg}"); return Err(BridgeError::from(eyre::eyre!(error_msg))); } } diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index b059fea57..141815139 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -160,7 +160,7 @@ async fn nonce_aggregator( "Expected {} nofn signatures, got {} from sighash stream", needed_nofn_sigs, total_sigs ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(eyre::eyre!(err_msg).into()); } // aggregate nonces for the movetx signature @@ -287,7 +287,7 @@ async fn nonce_distributor( } if nonce_count != needed_nofn_sigs { let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(eyre::eyre!(err_msg).into()); } @@ -363,7 +363,7 @@ async fn nonce_distributor( let err_msg = format!( "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}", ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(eyre::eyre!(err_msg).into()); } tracing::trace!( @@ -461,7 +461,7 @@ async fn signature_aggregator( let err_msg = format!( "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}", ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(eyre::eyre!(err_msg).into()); } @@ -525,7 +525,7 @@ async fn signature_distributor( let err_msg = format!( "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}", ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(eyre::eyre!(err_msg).into()); } diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index 996847789..3de2c313b 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -448,7 +448,7 @@ where "Insufficient N-of-N signatures received: got {}, expected {}", nonce_idx, num_required_nofn_sigs ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(Status::invalid_argument(err_msg)); } @@ -509,7 +509,7 @@ where "Insufficient operator signatures received: got {}, expected {}", total_op_sig_count, num_required_total_op_sigs ); - tracing::error!(err_msg); + tracing::error!("{err_msg}"); return Err(Status::invalid_argument(err_msg)); } From e1e1ad33d7673d73116b4dd8684f15de2669704f Mon Sep 17 00:00:00 2001 From: Ekrem BAL Date: Tue, 7 Oct 2025 09:45:01 -0400 Subject: [PATCH 18/18] ignore genesis chain state hash mismatch --- core/src/header_chain_prover.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/header_chain_prover.rs b/core/src/header_chain_prover.rs index 5522c4acf..92c7c6660 100644 --- a/core/src/header_chain_prover.rs +++ b/core/src/header_chain_prover.rs @@ -200,15 +200,15 @@ impl HeaderChainProver { .map_to_eyre()?; tracing::debug!("Genesis chain state (verbose): {:?}", genesis_chain_state); - let genesis_chain_state_hash = genesis_chain_state.to_hash(); - if genesis_chain_state_hash != config.protocol_paramset().genesis_chain_state_hash { - return Err(eyre::eyre!( - "Genesis chain state hash mismatch: {} != {}", - hex::encode(genesis_chain_state_hash), - hex::encode(config.protocol_paramset().genesis_chain_state_hash) - ) - .into()); - } + // let genesis_chain_state_hash = genesis_chain_state.to_hash(); + // if genesis_chain_state_hash != config.protocol_paramset().genesis_chain_state_hash { + // return Err(eyre::eyre!( + // "Genesis chain state hash mismatch: {} != {}", + // hex::encode(genesis_chain_state_hash), + // hex::encode(config.protocol_paramset().genesis_chain_state_hash) + // ) + // .into()); + // } let proof = HeaderChainProver::prove_genesis_block( genesis_chain_state,