Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker
err := errors.New("config is already joint")
return c.err(err)
}
if len(incoming(cfg.Voters)) == 0 {
if len(cfg.Voters.Incoming) == 0 {
// We allow adding nodes to an empty config for convenience (testing and
// bootstrap), but you can't enter a joint state.
err := errors.New("can't make a zero-voter config joint")
return c.err(err)
}
// Clear the outgoing config.
*outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
cfg.Voters.Outgoing = quorum.MajorityConfig{}
// Copy incoming to outgoing.
for id := range incoming(cfg.Voters) {
outgoing(cfg.Voters)[id] = struct{}{}
for id := range cfg.Voters.Incoming {
cfg.Voters.Outgoing[id] = struct{}{}
}

if err := c.apply(&cfg, trk, ccs...); err != nil {
Expand All @@ -79,7 +79,7 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker

// LeaveJoint transitions out of a joint configuration. It is an error to call
// this method if the configuration is not joint, i.e. if the outgoing majority
// config Voters[1] is empty.
// config Voters.Outgoing is empty.
//
// The outgoing majority config of the joint configuration will be removed,
// that is, the incoming config is promoted as the sole decision maker. In the
Expand All @@ -106,22 +106,22 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
cfg.LearnersNext = nil

for id := range outgoing(cfg.Voters) {
_, isVoter := incoming(cfg.Voters)[id]
for id := range cfg.Voters.Outgoing {
_, isVoter := cfg.Voters.Incoming[id]
_, isLearner := cfg.Learners[id]

if !isVoter && !isLearner {
delete(trk, id)
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.Voters.Outgoing = nil
cfg.AutoLeave = false

return checkAndReturn(cfg, trk)
}

// Simple carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// mutates the incoming majority config Voters.Incoming by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
Expand All @@ -137,15 +137,15 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
if err := c.apply(&cfg, trk, ccs...); err != nil {
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
if n := symdiff(c.Tracker.Voters.Incoming, cfg.Voters.Incoming); n > 1 {
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}

return checkAndReturn(cfg, trk)
}

// apply a change to the configuration. By convention, changes to voters are
// always made to the incoming majority config Voters[0]. Voters[1] is either
// always made to the incoming majority config Voters.Incoming. Voters.Outgoing is either
// empty or preserves the outgoing majority configuration while in a joint state.
func (c Changer) apply(cfg *tracker.Config, trk tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
for _, cc := range ccs {
Expand All @@ -167,7 +167,7 @@ func (c Changer) apply(cfg *tracker.Config, trk tracker.ProgressMap, ccs ...pb.C
return fmt.Errorf("unexpected conf type %d", cc.Type)
}
}
if len(incoming(cfg.Voters)) == 0 {
if len(cfg.Voters.Incoming) == 0 {
return errors.New("removed all voters")
}
return nil
Expand All @@ -185,7 +185,7 @@ func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id uint
pr.IsLearner = false
nilAwareDelete(&cfg.Learners, id)
nilAwareDelete(&cfg.LearnersNext, id)
incoming(cfg.Voters)[id] = struct{}{}
cfg.Voters.Incoming[id] = struct{}{}
}

// makeLearner makes the given ID a learner or stages it to be a learner once
Expand Down Expand Up @@ -219,7 +219,7 @@ func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id ui
// be turned into a learner in LeaveJoint().
//
// Otherwise, add a regular learner right away.
if _, onRight := outgoing(cfg.Voters)[id]; onRight {
if _, onRight := cfg.Voters.Outgoing[id]; onRight {
nilAwareAdd(&cfg.LearnersNext, id)
} else {
pr.IsLearner = true
Expand All @@ -233,20 +233,20 @@ func (c Changer) remove(cfg *tracker.Config, trk tracker.ProgressMap, id uint64)
return
}

delete(incoming(cfg.Voters), id)
delete(cfg.Voters.Incoming, id)
nilAwareDelete(&cfg.Learners, id)
nilAwareDelete(&cfg.LearnersNext, id)

// If the peer is still a voter in the outgoing config, keep the Progress.
if _, onRight := outgoing(cfg.Voters)[id]; !onRight {
if _, onRight := cfg.Voters.Outgoing[id]; !onRight {
delete(trk, id)
}
}

// initProgress initializes a new progress for the given node or learner.
func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id uint64, isLearner bool) {
if !isLearner {
incoming(cfg.Voters)[id] = struct{}{}
cfg.Voters.Incoming[id] = struct{}{}
} else {
nilAwareAdd(&cfg.Learners, id)
}
Expand Down Expand Up @@ -295,20 +295,20 @@ func checkInvariants(cfg tracker.Config, trk tracker.ProgressMap) error {
// Any staged learner was staged because it could not be directly added due
// to a conflicting voter in the outgoing config.
for id := range cfg.LearnersNext {
if _, ok := outgoing(cfg.Voters)[id]; !ok {
return fmt.Errorf("%d is in LearnersNext, but not Voters[1]", id)
if _, ok := cfg.Voters.Outgoing[id]; !ok {
return fmt.Errorf("%d is in LearnersNext, but not Voters.Outgoing", id)
}
if trk[id].IsLearner {
return fmt.Errorf("%d is in LearnersNext, but is already marked as learner", id)
}
}
// Conversely Learners and Voters doesn't intersect at all.
for id := range cfg.Learners {
if _, ok := outgoing(cfg.Voters)[id]; ok {
return fmt.Errorf("%d is in Learners and Voters[1]", id)
if _, ok := cfg.Voters.Outgoing[id]; ok {
return fmt.Errorf("%d is in Learners and Voters.Outgoing", id)
}
if _, ok := incoming(cfg.Voters)[id]; ok {
return fmt.Errorf("%d is in Learners and Voters[0]", id)
if _, ok := cfg.Voters.Incoming[id]; ok {
return fmt.Errorf("%d is in Learners and Voters.Incoming", id)
}
if !trk[id].IsLearner {
return fmt.Errorf("%d is in Learners, but is not marked as learner", id)
Expand All @@ -317,8 +317,8 @@ func checkInvariants(cfg tracker.Config, trk tracker.ProgressMap) error {

if !joint(cfg) {
// We enforce that empty maps are nil instead of zero.
if outgoing(cfg.Voters) != nil {
return fmt.Errorf("cfg.Voters[1] must be nil when not joint")
if cfg.Voters.Outgoing != nil {
return fmt.Errorf("cfg.Voters.Outgoing must be nil when not joint")
}
if cfg.LearnersNext != nil {
return fmt.Errorf("cfg.LearnersNext must be nil when not joint")
Expand Down Expand Up @@ -398,13 +398,9 @@ func symdiff(l, r map[uint64]struct{}) int {
}

func joint(cfg tracker.Config) bool {
return len(outgoing(cfg.Voters)) > 0
return len(cfg.Voters.Outgoing) > 0
}

func incoming(voters quorum.JointConfig) quorum.MajorityConfig { return voters[0] }
func outgoing(voters quorum.JointConfig) quorum.MajorityConfig { return voters[1] }
func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &voters[1] }

// Describe prints the type and NodeID of the configuration changes as a
// space-delimited string.
func Describe(ccs ...pb.ConfChangeSingle) string {
Expand Down
15 changes: 8 additions & 7 deletions quorum/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func TestDataDriven(t *testing.T) {
if d.Cmd == "vote" {
input = votes
}
if voters := JointConfig([2]MajorityConfig{c, cj}).IDs(); len(voters) != len(input) {

if voters := (JointConfig{c, cj}).IDs(); len(voters) != len(input) {
return fmt.Sprintf("error: mismatched input (explicit or _) for voters %v: %v",
voters, input)
}
Expand All @@ -173,11 +174,11 @@ func TestDataDriven(t *testing.T) {
fmt.Fprintf(&buf, "%s <-- via alternative computation\n", aIdx)
}
// Joining a majority with the empty majority should give same result.
if aIdx := JointConfig([2]MajorityConfig{c, {}}).CommittedIndex(l); aIdx != idx {
if aIdx := (JointConfig{c, nil}.CommittedIndex(l)); aIdx != idx {
fmt.Fprintf(&buf, "%s <-- via zero-joint quorum\n", aIdx)
}
// Joining a majority with itself should give same result.
if aIdx := JointConfig([2]MajorityConfig{c, c}).CommittedIndex(l); aIdx != idx {
if aIdx := (JointConfig{c, c}.CommittedIndex(l)); aIdx != idx {
fmt.Fprintf(&buf, "%s <-- via self-joint quorum\n", aIdx)
}
overlay := func(c MajorityConfig, l AckedIndexer, id uint64, idx Index) AckedIndexer {
Expand Down Expand Up @@ -209,11 +210,11 @@ func TestDataDriven(t *testing.T) {
}
fmt.Fprintf(&buf, "%s\n", idx)
} else {
cc := JointConfig([2]MajorityConfig{c, cj})
cc := JointConfig{c, cj}
fmt.Fprint(&buf, cc.Describe(l))
idx := cc.CommittedIndex(l)
// Interchanging the majorities shouldn't make a difference. If it does, print.
if aIdx := JointConfig([2]MajorityConfig{cj, c}).CommittedIndex(l); aIdx != idx {
if aIdx := (JointConfig{cj, c}).CommittedIndex(l); aIdx != idx {
fmt.Fprintf(&buf, "%s <-- via symmetry\n", aIdx)
}
fmt.Fprintf(&buf, "%s\n", idx)
Expand All @@ -231,9 +232,9 @@ func TestDataDriven(t *testing.T) {
fmt.Fprintf(&buf, "%v\n", r)
} else {
// Run a joint quorum test case.
r := JointConfig([2]MajorityConfig{c, cj}).VoteResult(l)
r := (JointConfig{c, cj}).VoteResult(l)
// Interchanging the majorities shouldn't make a difference. If it does, print.
if ar := JointConfig([2]MajorityConfig{cj, c}).VoteResult(l); ar != r {
if ar := (JointConfig{cj, c}).VoteResult(l); ar != r {
fmt.Fprintf(&buf, "%v <-- via symmetry\n", ar)
}
fmt.Fprintf(&buf, "%v\n", r)
Expand Down
31 changes: 19 additions & 12 deletions quorum/joint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,31 @@ package quorum

// JointConfig is a configuration of two groups of (possibly overlapping)
// majority configurations. Decisions require the support of both majorities.
type JointConfig [2]MajorityConfig
type JointConfig struct {
Incoming MajorityConfig
Outgoing MajorityConfig
}

func (c JointConfig) String() string {
if len(c[1]) > 0 {
return c[0].String() + "&&" + c[1].String()
if len(c.Outgoing) > 0 {
return c.Incoming.String() + "&&" + c.Outgoing.String()
}
return c[0].String()
return c.Incoming.String()
}

// IDs returns a newly initialized map representing the set of voters present
// in the joint configuration.
func (c JointConfig) IDs() map[uint64]struct{} {
m := map[uint64]struct{}{}
for _, cc := range c {
for id := range cc {
m[id] = struct{}{}
}

for id := range c.Incoming {
m[id] = struct{}{}
}

for id := range c.Outgoing {
m[id] = struct{}{}
}

return m
}

Expand All @@ -47,8 +54,8 @@ func (c JointConfig) Describe(l AckedIndexer) string {
// quorum. An index is jointly committed if it is committed in both constituent
// majorities.
func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
idx0 := c[0].CommittedIndex(l)
idx1 := c[1].CommittedIndex(l)
idx0 := c.Incoming.CommittedIndex(l)
idx1 := c.Outgoing.CommittedIndex(l)
if idx0 < idx1 {
return idx0
}
Expand All @@ -59,8 +66,8 @@ func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
// a result indicating whether the vote is pending, lost, or won. A joint quorum
// requires both majority quorums to vote in favor.
func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
r1 := c[0].VoteResult(votes)
r2 := c[1].VoteResult(votes)
r1 := c.Incoming.VoteResult(votes)
r2 := c.Outgoing.VoteResult(votes)

if r1 == r2 {
// If they agree, return the agreed state.
Expand Down
2 changes: 1 addition & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
if cc != nil {
alreadyPending := r.pendingConfIndex > r.raftLog.applied
alreadyJoint := len(r.trk.Config.Voters[1]) > 0
alreadyJoint := len(r.trk.Config.Voters.Outgoing) > 0
wantsLeaveJoint := len(cc.AsV2().Changes) == 0

var failedCheck string
Expand Down
2 changes: 1 addition & 1 deletion raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3975,7 +3975,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
pr.IsLearner = true
v.trk.Learners[peerAddrs[i]] = struct{}{}
} else {
v.trk.Voters[0][peerAddrs[i]] = struct{}{}
v.trk.Voters.Incoming[peerAddrs[i]] = struct{}{}
}
v.trk.Progress[peerAddrs[i]] = pr
}
Expand Down
4 changes: 2 additions & 2 deletions state_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func traceEvent(evt stateMachineEventType, r *raft, m *raftpb.Message, prop map[
NodeID: strconv.FormatUint(r.id, 10),
State: makeTracingState(r),
LogSize: r.raftLog.lastIndex(),
Conf: [2][]string{formatConf(r.trk.Voters[0].Slice()), formatConf(r.trk.Voters[1].Slice())},
Conf: [2][]string{formatConf(r.trk.Voters.Incoming.Slice()), formatConf(r.trk.Voters.Outgoing.Slice())},
Role: r.state.String(),
Message: makeTracingMessage(m),
Properties: prop,
Expand Down Expand Up @@ -276,7 +276,7 @@ func traceConfChangeEvent(cfg tracker.Config, r *raft) {

cc := &TracingConfChange{
Changes: []SingleConfChange{},
NewConf: formatConf(cfg.Voters[0].Slice()),
NewConf: formatConf(cfg.Voters.Incoming.Slice()),
}

p := map[string]any{}
Expand Down
8 changes: 4 additions & 4 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (c *Config) Clone() Config {
return mm
}
return Config{
Voters: quorum.JointConfig{clone(c.Voters[0]), clone(c.Voters[1])},
Voters: quorum.JointConfig{clone(c.Voters.Incoming), clone(c.Voters.Outgoing)},
Learners: clone(c.Learners),
LearnersNext: clone(c.LearnersNext),
}
Expand Down Expand Up @@ -147,8 +147,8 @@ func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker {
// ConfState returns a ConfState representing the active configuration.
func (p *ProgressTracker) ConfState() pb.ConfState {
return pb.ConfState{
Voters: p.Voters[0].Slice(),
VotersOutgoing: p.Voters[1].Slice(),
Voters: p.Voters.Incoming.Slice(),
VotersOutgoing: p.Voters.Outgoing.Slice(),
Learners: quorum.MajorityConfig(p.Learners).Slice(),
LearnersNext: quorum.MajorityConfig(p.LearnersNext).Slice(),
AutoLeave: p.AutoLeave,
Expand All @@ -158,7 +158,7 @@ func (p *ProgressTracker) ConfState() pb.ConfState {
// IsSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *ProgressTracker) IsSingleton() bool {
return len(p.Voters[0]) == 1 && len(p.Voters[1]) == 0
return len(p.Voters.Incoming) == 1 && len(p.Voters.Outgoing) == 0
}

type matchAckIndexer map[uint64]*Progress
Expand Down