Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agreement/asyncVoteVerifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-deadlock"
)

type asyncVerifyVoteRequest struct {
Expand Down Expand Up @@ -57,6 +58,7 @@ type AsyncVoteVerifier struct {
execpoolOut chan interface{}
ctx context.Context
ctxCancel context.CancelFunc
enqueueMu deadlock.RWMutex // guard against avv.ctxCancel between avv.ctx.Done and avv.wg.Add
}

// MakeAsyncVoteVerifier creates an AsyncVoteVerifier with workers as the number of CPUs
Expand Down Expand Up @@ -132,6 +134,8 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
}

func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
avv.enqueueMu.RLock()
defer avv.enqueueMu.RUnlock()
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -152,6 +156,8 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader,
}

func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
avv.enqueueMu.RLock()
defer avv.enqueueMu.RUnlock()
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -174,7 +180,9 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade
// Quit tells the AsyncVoteVerifier to shut down and waits until all workers terminate.
func (avv *AsyncVoteVerifier) Quit() {
// indicate we're done and wait for all workers to finish
avv.enqueueMu.Lock()
avv.ctxCancel()
avv.enqueueMu.Unlock()

// wait until all the tasks we've given the pool are done.
avv.wg.Wait()
Expand Down
69 changes: 69 additions & 0 deletions agreement/asyncVoteVerifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package agreement

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -51,3 +53,70 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) {
verifyEqVoteErr := voteVerifier.verifyEqVote(context.Background(), nil, unauthenticatedEquivocationVote{}, 0, message{}, make(chan<- asyncVerifyVoteResponse, 1))
require.Equal(t, context.Canceled, verifyEqVoteErr)
}

// bypassAsyncVoteVerifierCtxCheck is a test-only equivalent of AsyncVoteVerifier.verifyVote.
// It checks avv.ctx and then sleeps for 10 ms before enqueueing, which makes it far more
// likely that another goroutine attempts to cancel avv.ctx after the check has already
// passed. This exercises the shutdown path in which a vote sneaks through the ctx check
// while the service is stopping.
func bypassAsyncVoteVerifierCtxCheck(verctx context.Context, avv *AsyncVoteVerifier, l LedgerReader,
	uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) {
	// hold the read lock for the whole check-sleep-enqueue sequence; it is released on return.
	avv.enqueueMu.RLock()
	defer avv.enqueueMu.RUnlock()

	select {
	case <-avv.ctx.Done():
		// the verifier is quitting, so the request is not enqueued.
		// Do NOT also bail out on verctx.Done(): that would lose the vote (and skip the clean up).
		// A cancelled verctx is enqueued anyway so the worker sets the error value and returns
		// the cancelled vote properly.
		return
	default:
	}

	// widen the window between the ctx check above and the wg.Add below.
	time.Sleep(10 * time.Millisecond)

	request := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out}
	avv.wg.Add(1)
	err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, request, avv.execpoolOut)
	if err != nil {
		// a non-nil error means the context has expired and this task will never reach the
		// verification function, so undo the wg.Add to keep the count of pending tasks accurate.
		avv.wg.Done()
	}
}

// TestServiceStop tests what happens when the agreement service shuts down and
// calls AsyncVoteVerifier.Quit. Specifically, it covers the case where a vote is
// submitted to the pool for verification concurrently with the verifier quitting.
func TestServiceStop(t *testing.T) {
	partitiontest.PartitionTest(t)
	ledger, addresses, vrfSecrets, otSecrets := readOnlyFixture100()
	proposal := proposalValue{BlockDigest: randomBlockHash()}
	proposal.OriginalProposer = addresses[0]

	rv := rawVote{Sender: addresses[0], Round: 1, Period: 1, Step: step(0), Proposal: proposal}
	uv, err := makeVote(rv, otSecrets[0], vrfSecrets[0], ledger)
	require.NoError(t, err)
	outChan := make(chan asyncVerifyVoteResponse, 4)

	// flush the output chan for the duration of the test. The drainer is stopped through
	// drainDone rather than by closing outChan, so it does not leak past the end of the
	// test and a late sender can never hit a closed channel.
	drainDone := make(chan struct{})
	defer close(drainDone)
	go func() {
		for {
			select {
			case <-outChan:
			case <-drainDone:
				return
			}
		}
	}()

	for x := 0; x < 1000; x++ {
		voteVerifier := MakeAsyncVoteVerifier(execpool.MakeBacklog(nil, 0, execpool.HighPriority, nil))
		wg := sync.WaitGroup{}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// keep submitting votes until workerWaitCh becomes readable.
			for {
				bypassAsyncVoteVerifierCtxCheck(context.Background(), voteVerifier, ledger, uv, 1, message{}, outChan)
				select {
				case <-voteVerifier.workerWaitCh:
					return
				default:
				}
			}
		}()
		// quit concurrently with the submitting goroutine, then wait for it to finish.
		voteVerifier.Quit()
		wg.Wait()
	}
}