Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agreement/asyncVoteVerifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-deadlock"
)

type asyncVerifyVoteRequest struct {
Expand Down Expand Up @@ -57,6 +58,7 @@ type AsyncVoteVerifier struct {
execpoolOut chan interface{}
ctx context.Context
ctxCancel context.CancelFunc
mu deadlock.RWMutex
}

// MakeAsyncVoteVerifier creates an AsyncVoteVerifier with workers as the number of CPUs
Expand Down Expand Up @@ -132,6 +134,8 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
}

func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
avv.mu.RLock()
defer avv.mu.RUnlock()
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -152,6 +156,8 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader,
}

func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
avv.mu.RLock()
defer avv.mu.RUnlock()
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -174,7 +180,9 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade
// Quit tells the AsyncVoteVerifier to shutdown and waits until all workers terminate.
func (avv *AsyncVoteVerifier) Quit() {
// indicate we're done and wait for all workers to finish
avv.mu.Lock()
avv.ctxCancel()
avv.mu.Unlock()

// wait until all the tasks we've given the pool are done.
avv.wg.Wait()
Expand Down
69 changes: 69 additions & 0 deletions agreement/asyncVoteVerifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package agreement

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -51,3 +53,70 @@ func TestVerificationAgainstFullExecutionPool(t *testing.T) {
verifyEqVoteErr := voteVerifier.verifyEqVote(context.Background(), nil, unauthenticatedEquivocationVote{}, 0, message{}, make(chan<- asyncVerifyVoteResponse, 1))
require.Equal(t, context.Canceled, verifyEqVoteErr)
}

// bypassAsyncVoteVerifierCtxCheck is used to call the quivalent of AsyncVoteVerifier.verifyVote and to simulate the case
// where the ctx is checked in verifyVote before it is cancled. This likelihood is enhanced by the sleep of 10 ms.
// This behavior is possible, since the ctx is canceled from a different go-routine.
// bypassAsyncVoteVerifierCtxCheck is important to test what happens when the service shuts down, and a vote sneaks
// through the ctx check.
func bypassAsyncVoteVerifierCtxCheck(avv *AsyncVoteVerifier, verctx context.Context, l LedgerReader,
uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) {
avv.mu.RLock()
defer avv.mu.RUnlock()
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
// instead, enqueue so the worker will set the error value and return the cancelled vote properly.
default:
time.Sleep(10 * time.Millisecond)
req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out}
avv.wg.Add(1)
if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil {
// we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks.
// if we got a non-nil, it means that our context has expired, which means that we won't see this task
// getting to the verification function.
avv.wg.Done()
}
}
}

// TestServiceStop tests what happens when the agreement service shuts down, and
// calls the AsyncVoteVerifier Quit
// Specifically, tests the case when a vote gets submitted to the pool for verification
// concurrently when the verifier is quitting
func TestServiceStop(t *testing.T) {
partitiontest.PartitionTest(t)
ledger, addresses, vrfSecrets, otSecrets := readOnlyFixture100()
proposal := proposalValue{BlockDigest: randomBlockHash()}
proposal.OriginalProposer = addresses[0]

rv := rawVote{Sender: addresses[0], Round: 1, Period: 1, Step: step(0), Proposal: proposal}
uv, err := makeVote(rv, otSecrets[0], vrfSecrets[0], ledger)
require.NoError(t, err)
outChan := make(chan asyncVerifyVoteResponse, 4)

// flush the output chan
go func() {
for range outChan {
}
return
}()
for x := 0; x < 1000; x++ {
voteVerifier := MakeAsyncVoteVerifier(execpool.MakeBacklog(nil, 0, execpool.HighPriority, nil))
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
bypassAsyncVoteVerifierCtxCheck(voteVerifier, context.Background(), ledger, uv, 1, message{}, outChan)
select {
case <-voteVerifier.workerWaitCh:
return
default:
}
}
}()
voteVerifier.Quit()
wg.Wait()
}
}