From 7aea9664aa293b2d9d7c0554f96684dcb470cb39 Mon Sep 17 00:00:00 2001 From: algonautshant Date: Thu, 27 Apr 2023 20:24:27 -0400 Subject: [PATCH 1/6] test to demonstrate the agreement service shutdown may panic --- agreement/asyncVoteVerifier_test.go | 58 +++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index e344d10b70..5113b7bf76 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -18,6 +18,7 @@ package agreement import ( "context" + "sync" "testing" "github.com/stretchr/testify/require" @@ -51,3 +52,60 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) { verifyEqVoteErr := voteVerifier.verifyEqVote(context.Background(), nil, unauthenticatedEquivocationVote{}, 0, message{}, make(chan<- asyncVerifyVoteResponse, 1)) require.Equal(t, context.Canceled, verifyEqVoteErr) } + +// bypassAsyncVoteVerifierCtxCheck is used to call the quivalent of AsyncVoteVerifier.verifyVote and simulate the case +// where the ctx is checked in verifyVote before it is cancled. +// This behavior is possible, since the ctx is cancled from a different go-routine. +// bypassAsyncVoteVerifierCtxCheck is important to test what happens when the service shuts down, and a vote sneaks +// through the ctx check. +func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, + uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) { + req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out} + avv.wg.Add(1) + if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil { + // we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks. + // if we got a non-nil, it means that our context has expired, which means that we won't see this task + // getting to the verification function. + avv.wg.Done() + } +} + +// TestServiceStop tests what happens when the agreement service shuts down, and +// calls the AsyncVoteVerifier Quit +// Specifically, tests the case when a vote gets submitted to the pool for verification +// concurrently when the verifier is quitting +func TestServiceStop(t *testing.T) { + ledger, addresses, vrfSecrets, otSecrets := readOnlyFixture100() + proposal := proposalValue{BlockDigest: randomBlockHash()} + proposal.OriginalProposer = addresses[0] + + rv := rawVote{Sender: addresses[0], Round: 1, Period: 1, Step: step(0), Proposal: proposal} + uv, err := makeVote(rv, otSecrets[0], vrfSecrets[0], ledger) + require.NoError(t, err) + outChan := make(chan asyncVerifyVoteResponse, 4) + + // flush the output chan + go func() { + for range outChan { + } + return + }() + for x := 0; x < 1000; x++ { + voteVerifier := MakeAsyncVoteVerifier(execpool.MakeBacklog(nil, 0, execpool.HighPriority, nil)) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + bypassAsyncVoteVerifierCtxCheck(voteVerifier, context.Background(), ledger, uv, 1, message{}, outChan) + select { + case <-voteVerifier.workerWaitCh: + return + default: + } + } + }() + voteVerifier.Quit() + wg.Wait() + } +} From 49a539da4597017a2149a8cf2e8eab9a005dbca0 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Fri, 28 Apr 2023 09:53:08 -0400 Subject: [PATCH 2/6] Update agreement/asyncVoteVerifier_test.go --- agreement/asyncVoteVerifier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index 5113b7bf76..ec5a30ea8b 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -58,7 +58,7 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) { // This behavior is possible, since the ctx is cancled from a different go-routine. // bypassAsyncVoteVerifierCtxCheck is important to test what happens when the service shuts down, and a vote sneaks // through the ctx check. -func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, +func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, //nolint:revive // verctx is OK as second argument uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) { req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out} avv.wg.Add(1) From 0717d0f1dd5d6cac7b1f41710be156f3246327ea Mon Sep 17 00:00:00 2001 From: algonautshant Date: Tue, 2 May 2023 17:49:09 -0400 Subject: [PATCH 3/6] fix with a mutex and enhanced test --- agreement/asyncVoteVerifier.go | 6 ++++++ agreement/asyncVoteVerifier_test.go | 33 +++++++++++++++++++---------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/agreement/asyncVoteVerifier.go b/agreement/asyncVoteVerifier.go index e30c9c5b1a..d5cd95e2b7 100644 --- a/agreement/asyncVoteVerifier.go +++ b/agreement/asyncVoteVerifier.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/algorand/go-algorand/util/execpool" + "github.com/algorand/go-deadlock" ) type asyncVerifyVoteRequest struct { @@ -57,6 +58,7 @@ type AsyncVoteVerifier struct { execpoolOut chan interface{} ctx context.Context ctxCancel context.CancelFunc + mu deadlock.RWMutex } // MakeAsyncVoteVerifier creates an AsyncVoteVerifier with workers as the number of CPUs @@ -132,6 +134,8 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf } func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error { + avv.mu.RLock() + defer avv.mu.RUnlock() select { case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! @@ -174,7 +178,9 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade // Quit tells the AsyncVoteVerifier to shutdown and waits until all workers terminate. func (avv *AsyncVoteVerifier) Quit() { // indicate we're done and wait for all workers to finish + avv.mu.Lock() avv.ctxCancel() + avv.mu.Unlock() // wait until all the tasks we've given the pool are done. avv.wg.Wait() diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index ec5a30ea8b..d7fb9cd62e 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -20,6 +20,7 @@ import ( "context" "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -53,20 +54,29 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) { require.Equal(t, context.Canceled, verifyEqVoteErr) } -// bypassAsyncVoteVerifierCtxCheck is used to call the quivalent of AsyncVoteVerifier.verifyVote and simulate the case -// where the ctx is checked in verifyVote before it is cancled. -// This behavior is possible, since the ctx is cancled from a different go-routine. +// bypassAsyncVoteVerifierCtxCheck is used to call the quivalent of AsyncVoteVerifier.verifyVote and to simulate the case +// where the ctx is checked in verifyVote before it is cancled. This likelihood is enhanced by the sleep of 10 ms. +// This behavior is possible, since the ctx is canceled from a different go-routine. // bypassAsyncVoteVerifierCtxCheck is important to test what happens when the service shuts down, and a vote sneaks // through the ctx check. -func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, //nolint:revive // verctx is OK as second argument +func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) { - req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out} - avv.wg.Add(1) - if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil { - // we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks. - // if we got a non-nil, it means that our context has expired, which means that we won't see this task - // getting to the verification function. - avv.wg.Done() + avv.mu.RLock() + defer avv.mu.RUnlock() + select { + case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request + // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! + // instead, enqueue so the worker will set the error value and return the cancelled vote properly. + default: + time.Sleep(10 * time.Millisecond) + req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out} + avv.wg.Add(1) + if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil { + // we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks. + // if we got a non-nil, it means that our context has expired, which means that we won't see this task + // getting to the verification function. + avv.wg.Done() + } } } @@ -75,6 +85,7 @@ func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Cont // Specifically, tests the case when a vote gets submitted to the pool for verification // concurrently when the verifier is quitting func TestServiceStop(t *testing.T) { + partitiontest.PartitionTest(t) ledger, addresses, vrfSecrets, otSecrets := readOnlyFixture100() proposal := proposalValue{BlockDigest: randomBlockHash()} proposal.OriginalProposer = addresses[0] From e526401d4e64805335b765a4d2982c0972c3e757 Mon Sep 17 00:00:00 2001 From: algonautshant Date: Tue, 2 May 2023 18:26:16 -0400 Subject: [PATCH 4/6] mutex for eqVotes --- agreement/asyncVoteVerifier.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agreement/asyncVoteVerifier.go b/agreement/asyncVoteVerifier.go index d5cd95e2b7..69b0f4fa88 100644 --- a/agreement/asyncVoteVerifier.go +++ b/agreement/asyncVoteVerifier.go @@ -156,6 +156,8 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, } func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error { + avv.mu.RLock() + defer avv.mu.RUnlock() select { case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! From 54a53fc6712b574871745dc7928533c1965bf500 Mon Sep 17 00:00:00 2001 From: algonautshant Date: Wed, 3 May 2023 16:34:26 -0400 Subject: [PATCH 5/6] rename and comment --- agreement/asyncVoteVerifier.go | 14 +++++++------- agreement/asyncVoteVerifier_test.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/agreement/asyncVoteVerifier.go b/agreement/asyncVoteVerifier.go index 69b0f4fa88..03c2b15629 100644 --- a/agreement/asyncVoteVerifier.go +++ b/agreement/asyncVoteVerifier.go @@ -58,7 +58,7 @@ type AsyncVoteVerifier struct { execpoolOut chan interface{} ctx context.Context ctxCancel context.CancelFunc - mu deadlock.RWMutex + enqueueMu deadlock.RWMutex // guard against avv.ctxCancel between avv.ctx.Done and avv.wg.Add } // MakeAsyncVoteVerifier creates an AsyncVoteVerifier with workers as the number of CPUs @@ -134,8 +134,8 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf } func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error { - avv.mu.RLock() - defer avv.mu.RUnlock() + avv.enqueueMu.RLock() + defer avv.enqueueMu.RUnlock() select { case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! @@ -156,8 +156,8 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, } func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error { - avv.mu.RLock() - defer avv.mu.RUnlock() + avv.enqueueMu.RLock() + defer avv.enqueueMu.RUnlock() select { case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! @@ -180,9 +180,9 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade // Quit tells the AsyncVoteVerifier to shutdown and waits until all workers terminate. func (avv *AsyncVoteVerifier) Quit() { // indicate we're done and wait for all workers to finish - avv.mu.Lock() + avv.enqueueMu.Lock() avv.ctxCancel() - avv.mu.Unlock() + avv.enqueueMu.Unlock() // wait until all the tasks we've given the pool are done. avv.wg.Wait() diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index d7fb9cd62e..92f31cd76b 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -61,8 +61,8 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) { // through the ctx check. func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) { - avv.mu.RLock() - defer avv.mu.RUnlock() + avv.enqueueMu.RLock() + defer avv.enqueueMu.RUnlock() select { case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request // case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)! From 09c316399cdb1e5eabe9bb70b92c8903e5c820b9 Mon Sep 17 00:00:00 2001 From: algonautshant Date: Wed, 3 May 2023 18:06:39 -0400 Subject: [PATCH 6/6] fix lint --- agreement/asyncVoteVerifier_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index 92f31cd76b..304303200b 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -59,7 +59,7 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) { // This behavior is possible, since the ctx is canceled from a different go-routine. // bypassAsyncVoteVerifierCtxCheck is important to test what happens when the service shuts down, and a vote sneaks // through the ctx check. -func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader, +func bypassAsyncVoteVerifierCtxCheck(verctx context.Context, avv *AsyncVoteVerifier, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) { avv.enqueueMu.RLock() defer avv.enqueueMu.RUnlock() @@ -108,7 +108,7 @@ func TestServiceStop(t *testing.T) { go func() { defer wg.Done() for { - bypassAsyncVoteVerifierCtxCheck(voteVerifier, context.Background(), ledger, uv, 1, message{}, outChan) + bypassAsyncVoteVerifierCtxCheck(context.Background(), voteVerifier, ledger, uv, 1, message{}, outChan) select { case <-voteVerifier.workerWaitCh: return