@@ -46,12 +46,13 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646use crate::ln::types::ChannelId;
4747use crate::prelude::*;
4848use crate::sign::ecdsa::EcdsaChannelSigner;
49- use crate::sign::{EntropySource, PeerStorageKey};
49+ use crate::sign::{EntropySource, PeerStorageKey, SignerProvider };
5050use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
5151use crate::types::features::{InitFeatures, NodeFeatures};
52+ use crate::util::async_poll::{MaybeSend, MaybeSync};
5253use crate::util::errors::APIError;
5354use crate::util::logger::{Logger, WithContext};
54- use crate::util::persist::MonitorName;
55+ use crate::util::persist::{FutureSpawner, MonitorName, MonitorUpdatingPersisterAsync, KVStore} ;
5556#[cfg(peer_storage)]
5657use crate::util::ser::{VecWriter, Writeable};
5758use crate::util::wakers::{Future, Notifier};
@@ -192,6 +193,15 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192193 /// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193194 /// the monitor already exists in the archive.
194195 fn archive_persisted_channel(&self, monitor_name: MonitorName);
196+
197+ /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
198+ /// [`Self::update_persisted_channel`], which have completed.
199+ ///
200+ /// Returning an update here is equivalent to calling
201+ /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
202+ /// hidden in the docs.
203+ #[doc(hidden)]
204+ fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { Vec::new() }
195205}
196206
197207struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -235,6 +245,73 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235245 }
236246}
237247
248+
249+ /// An unconstructable [`Persist`]er which is used under the hood when you call
250+ /// [`ChainMonitor::new_async_beta`].
251+ pub struct AsyncPersister<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
252+ where
253+ K::Target: KVStore + MaybeSync,
254+ L::Target: Logger,
255+ ES::Target: EntropySource + Sized,
256+ SP::Target: SignerProvider + Sized,
257+ BI::Target: BroadcasterInterface,
258+ FE::Target: FeeEstimator
259+ {
260+ persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
261+ }
262+
263+ impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
264+ Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
265+ where
266+ K::Target: KVStore + MaybeSync,
267+ L::Target: Logger,
268+ ES::Target: EntropySource + Sized,
269+ SP::Target: SignerProvider + Sized,
270+ BI::Target: BroadcasterInterface,
271+ FE::Target: FeeEstimator
272+ {
273+ type Target = Self;
274+ fn deref(&self) -> &Self {
275+ self
276+ }
277+ }
278+
279+ impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
280+ Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
281+ where
282+ K::Target: KVStore + MaybeSync,
283+ L::Target: Logger,
284+ ES::Target: EntropySource + Sized,
285+ SP::Target: SignerProvider + Sized,
286+ BI::Target: BroadcasterInterface,
287+ FE::Target: FeeEstimator,
288+ <SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
289+ {
290+ fn persist_new_channel(
291+ &self, monitor_name: MonitorName, monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
292+ ) -> ChannelMonitorUpdateStatus {
293+ self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
294+ ChannelMonitorUpdateStatus::InProgress
295+ }
296+
297+ fn update_persisted_channel(
298+ &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
299+ monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
300+ ) -> ChannelMonitorUpdateStatus {
301+ self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
302+ ChannelMonitorUpdateStatus::InProgress
303+ }
304+
305+ fn archive_persisted_channel(&self, monitor_name: MonitorName) {
306+ self.persister.spawn_async_archive_persisted_channel(monitor_name);
307+ }
308+
309+ fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
310+ self.persister.get_and_clear_completed_updates()
311+ }
312+ }
313+
314+
238315/// An implementation of [`chain::Watch`] for monitoring channels.
239316///
240317/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +368,55 @@ pub struct ChainMonitor<
291368 our_peerstorage_encryption_key: PeerStorageKey,
292369}
293370
371+ impl<
372+ K: Deref + MaybeSend + MaybeSync + 'static,
373+ S: FutureSpawner,
374+ SP: Deref + MaybeSend + MaybeSync + 'static,
375+ C: Deref,
376+ T: Deref + MaybeSend + MaybeSync + 'static,
377+ F: Deref + MaybeSend + MaybeSync + 'static,
378+ L: Deref + MaybeSend + MaybeSync + 'static,
379+ ES: Deref + MaybeSend + MaybeSync + 'static,
380+ > ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, C, T, F, L, AsyncPersister<K, S, L, ES, SP, T, F>, ES>
381+ where
382+ K::Target: KVStore + MaybeSync,
383+ SP::Target: SignerProvider + Sized,
384+ C::Target: chain::Filter,
385+ T::Target: BroadcasterInterface,
386+ F::Target: FeeEstimator,
387+ L::Target: Logger,
388+ ES::Target: EntropySource + Sized,
389+ <SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
390+ {
391+ /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
392+ ///
393+ /// This behaves the same as [`ChainMonitor::new`] except that it relies on
394+ /// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
395+ ///
396+ /// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
397+ pub fn new_async_beta(
398+ chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
399+ persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
400+ _our_peerstorage_encryption_key: PeerStorageKey,
401+ ) -> Self {
402+ Self {
403+ monitors: RwLock::new(new_hash_map()),
404+ chain_source,
405+ broadcaster,
406+ logger,
407+ fee_estimator: feeest,
408+ persister: AsyncPersister { persister },
409+ _entropy_source,
410+ pending_monitor_events: Mutex::new(Vec::new()),
411+ highest_chain_height: AtomicUsize::new(0),
412+ event_notifier: Notifier::new(),
413+ pending_send_only_events: Mutex::new(Vec::new()),
414+ #[cfg(peer_storage)]
415+ our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
416+ }
417+ }
418+ }
419+
294420impl<
295421 ChannelSigner: EcdsaChannelSigner,
296422 C: Deref,
@@ -1357,6 +1483,9 @@ where
13571483 fn release_pending_monitor_events(
13581484 &self,
13591485 ) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1486+ for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1487+ self.channel_monitor_updated(channel_id, update_id);
1488+ }
13601489 let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
13611490 for monitor_state in self.monitors.read().unwrap().values() {
13621491 let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
0 commit comments