Skip to content

Commit 16b7cf9

Browse files
committed
sweepbatcher: add option WithWaitForAddSweep
WithWaitForAddSweep instructs sweepbatcher to wait for all existing sweeps to be AddSweep'ed in Run before actual starting. This is needed in setups where AddSweep is called after setting up dependencies of FetchSweep, so FetchSweep would fail if called before AddSweep.
1 parent 6db2a39 commit 16b7cf9

File tree

2 files changed

+398
-21
lines changed

2 files changed

+398
-21
lines changed

sweepbatcher/sweep_batcher.go

Lines changed: 191 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,21 @@ type Batcher struct {
244244
// initialized.
245245
initDone chan struct{}
246246

247+
// waitForAddSweepDone is a channel that is closed when waitForAddSweep
248+
// completes. If waitForAddSweepOnStart is false, the channel is always
249+
// closed.
250+
waitForAddSweepDone chan struct{}
251+
252+
// sweepsAdded is a set of swap hashes added by AddSweep calls, filled
253+
// and used only until waitForAddSweep finishes (= until channel
254+
// waitForAddSweepDone is closed).
255+
sweepsAdded map[lntypes.Hash]struct{}
256+
257+
// sweepsAddedCond is a condition variable (and a mutex, its L field)
258+
// used to protect sweepsAdded and to wait on it being filled with
259+
// hashes of all the existing sweeps.
260+
sweepsAddedCond *sync.Cond
261+
247262
// wallet is the wallet kit client that is used by batches.
248263
wallet lndclient.WalletKitClient
249264

@@ -323,6 +338,13 @@ type Batcher struct {
323338
// error. By default, it logs all errors as warnings, but "insufficient
324339
// fee" as Info.
325340
publishErrorHandler PublishErrorHandler
341+
342+
// waitForAddSweepOnStart instructs sweepbatcher to wait for all
343+
// existing sweeps to be AddSweep'ed in Run before actual starting.
344+
// This is needed in setups where AddSweep is called after setting up
345+
// dependencies of FetchSweep, so FetchSweep would fail if called before
346+
// AddSweep.
347+
waitForAddSweepOnStart bool
326348
}
327349

328350
// BatcherConfig holds batcher configuration.
@@ -373,6 +395,13 @@ type BatcherConfig struct {
373395
// error. By default, it logs all errors as warnings, but "insufficient
374396
// fee" as Info.
375397
publishErrorHandler PublishErrorHandler
398+
399+
// waitForAddSweepOnStart instructs sweepbatcher to wait for all
400+
// existing sweeps to be AddSweep'ed in Run before actual starting.
401+
// This is needed in setups where AddSweep is called after setting up
402+
// dependencies of FetchSweep, so FetchSweep would fail if called before
403+
// AddSweep.
404+
waitForAddSweepOnStart bool
376405
}
377406

378407
// BatcherOption configures batcher behaviour.
@@ -460,6 +489,16 @@ func WithPublishErrorHandler(handler PublishErrorHandler) BatcherOption {
460489
}
461490
}
462491

492+
// WithWaitForAddSweep instructs sweepbatcher to wait for all existing sweeps
493+
// to be AddSweep'ed in Run before actual starting. This is needed in setups
494+
// where AddSweep is called after setting up dependencies of FetchSweep, so
495+
// FetchSweep would fail if called before AddSweep.
496+
func WithWaitForAddSweep() BatcherOption {
497+
return func(cfg *BatcherConfig) {
498+
cfg.waitForAddSweepOnStart = true
499+
}
500+
}
501+
463502
// NewBatcher creates a new Batcher instance.
464503
func NewBatcher(wallet lndclient.WalletKitClient,
465504
chainNotifier lndclient.ChainNotifierClient,
@@ -492,33 +531,151 @@ func NewBatcher(wallet lndclient.WalletKitClient,
492531
"musig2ServerSigner")
493532
}
494533

534+
waitForAddSweepDone := make(chan struct{})
535+
if !cfg.waitForAddSweepOnStart {
536+
close(waitForAddSweepDone)
537+
}
538+
495539
return &Batcher{
496-
batches: make(map[int32]*batch),
497-
sweepReqs: make(chan SweepRequest),
498-
errChan: make(chan error, 1),
499-
quit: make(chan struct{}),
500-
initDone: make(chan struct{}),
501-
wallet: wallet,
502-
chainNotifier: chainNotifier,
503-
signerClient: signerClient,
504-
musig2ServerSign: musig2ServerSigner,
505-
VerifySchnorrSig: verifySchnorrSig,
506-
chainParams: chainparams,
507-
store: store,
508-
sweepStore: sweepStore,
509-
clock: cfg.clock,
510-
initialDelay: cfg.initialDelay,
511-
publishDelay: cfg.publishDelay,
512-
customFeeRate: cfg.customFeeRate,
513-
txLabeler: cfg.txLabeler,
514-
customMuSig2Signer: cfg.customMuSig2Signer,
515-
mixedBatch: cfg.mixedBatch,
516-
publishErrorHandler: cfg.publishErrorHandler,
540+
batches: make(map[int32]*batch),
541+
sweepReqs: make(chan SweepRequest),
542+
errChan: make(chan error, 1),
543+
quit: make(chan struct{}),
544+
initDone: make(chan struct{}),
545+
waitForAddSweepDone: waitForAddSweepDone,
546+
sweepsAdded: make(map[lntypes.Hash]struct{}),
547+
sweepsAddedCond: sync.NewCond(&sync.Mutex{}),
548+
wallet: wallet,
549+
chainNotifier: chainNotifier,
550+
signerClient: signerClient,
551+
musig2ServerSign: musig2ServerSigner,
552+
VerifySchnorrSig: verifySchnorrSig,
553+
chainParams: chainparams,
554+
store: store,
555+
sweepStore: sweepStore,
556+
clock: cfg.clock,
557+
initialDelay: cfg.initialDelay,
558+
publishDelay: cfg.publishDelay,
559+
customFeeRate: cfg.customFeeRate,
560+
txLabeler: cfg.txLabeler,
561+
customMuSig2Signer: cfg.customMuSig2Signer,
562+
mixedBatch: cfg.mixedBatch,
563+
publishErrorHandler: cfg.publishErrorHandler,
564+
waitForAddSweepOnStart: cfg.waitForAddSweepOnStart,
565+
}
566+
}
567+
568+
// missingSweeps returns the list of sweeps present in existing, but not present
569+
// in sweepsAdded.
570+
func missingSweeps(existing []lntypes.Hash,
571+
sweepsAdded map[lntypes.Hash]struct{}) []lntypes.Hash {
572+
573+
missing := []lntypes.Hash{}
574+
for _, h := range existing {
575+
if _, has := sweepsAdded[h]; !has {
576+
missing = append(missing, h)
577+
}
578+
}
579+
580+
return missing
581+
}
582+
583+
// waitForAddSweep waits until all the existing sweeps from the DB are added to
584+
// the sweeper by AddSweep calls, or until ctx is cancelled.
585+
func (b *Batcher) waitForAddSweep(ctx context.Context) error {
586+
// Close channel waitForAddSweepDone in the end of this function, so
587+
// subsequent AddSweep calls do not add hashes to sweepsAdded.
588+
defer close(b.waitForAddSweepDone)
589+
590+
// Collect existing sweeps from DB.
591+
batches, err := b.FetchUnconfirmedBatches(ctx)
592+
if err != nil {
593+
return fmt.Errorf("b.FetchUnconfirmedBatches failed: %w", err)
594+
}
595+
var existing []lntypes.Hash
596+
for _, batch := range batches {
597+
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
598+
if err != nil {
599+
return fmt.Errorf("store.FetchBatchSweeps failed: %w",
600+
err)
601+
}
602+
for _, dbSweep := range dbSweeps {
603+
existing = append(existing, dbSweep.SwapHash)
604+
}
605+
}
606+
607+
// If it doesn't complete in 1 minute, log an error.
608+
logTimer := time.AfterFunc(time.Minute, func() {
609+
select {
610+
case <-b.waitForAddSweepDone:
611+
// waitForAddSweepDone is closed. Do nothing.
612+
return
613+
614+
default:
615+
// waitForAddSweepDone is open. Still waiting.
616+
}
617+
618+
// Collect missing sweeps.
619+
b.sweepsAddedCond.L.Lock()
620+
missing := missingSweeps(existing, b.sweepsAdded)
621+
b.sweepsAddedCond.L.Unlock()
622+
623+
log.Errorf("SweepBatcher hasn't started yet, because some "+
624+
"sweeps (%v) haven't been AddSweep'ed. Maybe they "+
625+
"confirmed on-chain, in this case they should be "+
626+
"removed manually from the database, table 'sweeps'.",
627+
missing)
628+
})
629+
630+
go func() {
631+
select {
632+
case <-ctx.Done():
633+
// Wake cond var up to finish the loop below.
634+
b.sweepsAddedCond.Signal()
635+
636+
case <-b.waitForAddSweepDone:
637+
// waitForAddSweep was closed, stop the goroutine.
638+
}
639+
}()
640+
641+
// Wait on cond var for all existing sweeps to be added in AddSweep or
642+
// for the context to expire.
643+
b.sweepsAddedCond.L.Lock()
644+
645+
for {
646+
// If all the existing sweeps were added, stop waiting.
647+
if len(missingSweeps(existing, b.sweepsAdded)) == 0 {
648+
break
649+
}
650+
651+
// It the context expired, stop waiting.
652+
if ctx.Err() != nil {
653+
break
654+
}
655+
656+
// Wait for next AddSweep or ctx cancellation.
657+
b.sweepsAddedCond.Wait()
517658
}
659+
660+
// Clear sweepsAdded, it is not needed anymore.
661+
b.sweepsAdded = make(map[lntypes.Hash]struct{})
662+
663+
b.sweepsAddedCond.L.Unlock()
664+
665+
// Cancel the delayed check.
666+
logTimer.Stop()
667+
668+
return ctx.Err()
518669
}
519670

520671
// Run starts the batcher and processes incoming sweep requests.
521672
func (b *Batcher) Run(ctx context.Context) error {
673+
if b.waitForAddSweepOnStart {
674+
if err := b.waitForAddSweep(ctx); err != nil {
675+
return fmt.Errorf("waitForAddSweep failed: %w", err)
676+
}
677+
}
678+
522679
runCtx, cancel := context.WithCancel(ctx)
523680
defer func() {
524681
cancel()
@@ -577,6 +734,19 @@ func (b *Batcher) Run(ctx context.Context) error {
577734
// AddSweep adds a sweep request to the batcher for handling. This will either
578735
// place the sweep in an existing batch or create a new one.
579736
func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
737+
// If waitForAddSweep is running (indicated by waitForAddSweepDone being
738+
// open), add the swap hash to sweepsAdded.
739+
select {
740+
case <-b.waitForAddSweepDone:
741+
// waitForAddSweepDone is closed. Do not add to sweepsAdded.
742+
default:
743+
// waitForAddSweepDone is open.
744+
b.sweepsAddedCond.L.Lock()
745+
b.sweepsAdded[sweepReq.SwapHash] = struct{}{}
746+
b.sweepsAddedCond.L.Unlock()
747+
b.sweepsAddedCond.Signal()
748+
}
749+
580750
select {
581751
case b.sweepReqs <- *sweepReq:
582752
return nil

0 commit comments

Comments
 (0)