Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,9 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
}
} else {
//To make it simple, for larger term, do not change to follower immediately
//first change to candidate, and notify the state-maintainer thread
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//TOOD notify
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
//stepped down by larger term - convert to follower immediately (per Raft protocol)
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
}
}
Expand All @@ -157,6 +154,7 @@ public void changeRoleToLeader(long term) {
if (memberState.currTerm() == term) {
memberState.changeToLeader(term);
lastSendHeartBeatTime = -1;
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; // Reset parse result state
handleRoleChange(term, MemberState.Role.LEADER);
LOGGER.info("[{}] [ChangeRoleToLeader] from term: {} and currTerm: {}", memberState.getSelfId(), term, memberState.currTerm());
} else {
Expand All @@ -169,6 +167,8 @@ public void changeRoleToCandidate(long term) {
synchronized (memberState) {
if (term >= memberState.currTerm()) {
memberState.changeToCandidate(term);
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; // Reset parse result for new election
nextTimeToRequestVote = -1; // Reset vote timing to allow immediate first vote
handleRoleChange(term, MemberState.Role.CANDIDATE);
LOGGER.info("[{}] [ChangeRoleToCandidate] from term: {} and currTerm: {}", memberState.getSelfId(), term, memberState.currTerm());
} else {
Expand All @@ -184,12 +184,24 @@ public void testRevote(long term) {
nextTimeToRequestVote = -1;
}

//just for test
public void setLastLeaderHeartBeatTime(long time) {
this.lastLeaderHeartBeatTime = time;
}
public long getLastLeaderHeartBeatTime() {
return lastLeaderHeartBeatTime;
}

public void changeRoleToFollower(long term, String leaderId) {
LOGGER.info("[{}][ChangeRoleToFollower] from term: {} leaderId: {} and currTerm: {}", memberState.getSelfId(), term, leaderId, memberState.currTerm());
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
memberState.changeToFollower(term, leaderId);
lastLeaderHeartBeatTime = System.currentTimeMillis();
handleRoleChange(term, MemberState.Role.FOLLOWER);
synchronized (memberState) {
LOGGER.info("[{}][ChangeRoleToFollower] from term: {} leaderId: {} and currTerm: {}",
memberState.getSelfId(), term, leaderId, memberState.currTerm());
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = -1; // Reset vote timing to prevent immediate voting
memberState.changeToFollower(term, leaderId);
lastLeaderHeartBeatTime = System.currentTimeMillis();
handleRoleChange(term, MemberState.Role.FOLLOWER);
}
}

public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean self) {
Expand All @@ -204,32 +216,36 @@ public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean s
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
}

// Step 1: Check term first - if request has higher term, update our term and convert to follower
if (request.getTerm() > memberState.currTerm()) {
//stepped down by larger term - convert to follower first
changeRoleToFollower(request.getTerm(), null);
// After term update, we can now process the vote request in the new term
// Continue to ledger and other checks below
} else if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
}

// Step 2: Now we're in the same term, check ledger conditions
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}

if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) {
if (memberState.currVoteFor() == null) {
//let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
//repeat just let it go
// Step 3: Check vote conditions in the same term
if (memberState.currVoteFor() == null) {
// let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
// repeat just let it go
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm())
.voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER));
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm())
.voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
} else {
//stepped down by larger term
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//only can handleVote when the term is consistent
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}

if (request.getTerm() < memberState.getLedgerEndTerm()) {
Expand Down Expand Up @@ -419,7 +435,7 @@ private void maintainAsCandidate() throws Exception {
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
LOGGER.info("{}_[INCREASE_TERM] from {} to {}, lastParseResult: {}, needIncreaseTermImmediately: {}", memberState.getSelfId(), prevTerm, term, lastParseResult, needIncreaseTermImmediately);
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,15 @@ public synchronized void changeToLeader(long term) {
}

public synchronized void changeToFollower(long term, String leaderId) {
PreConditions.check(currTerm == term, DLedgerResponseCode.ILLEGAL_MEMBER_STATE, "%d != %d", currTerm, term);
PreConditions.check(term >= currTerm, DLedgerResponseCode.ILLEGAL_MEMBER_STATE, "should %d >= %d", term, currTerm);
if (term != currTerm) {
currTerm = term;
currVoteFor = null;
persistState();
}
if (term > knownMaxTermInGroup) {
knownMaxTermInGroup = term;
}
this.role = FOLLOWER;
this.leaderId = leaderId;
transferee = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
Expand Down Expand Up @@ -294,5 +295,84 @@ public void testThreeServerAndPreferredLeader() throws Exception {
Assertions.assertNotNull(leaderServer);
Assertions.assertEquals(preferredLeaderId, leaderServer.getDLedgerConfig().getSelfId());
}

@Test
public void testFollowerElectionWithHigherTermAndLessIndex() throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
List<DLedgerServer> servers = new ArrayList<>();
servers.add(launchServer(group, peers, "n0"));
servers.add(launchServer(group, peers, "n1"));
servers.add(launchServer(group, peers, "n2"));

Thread.sleep(5000);
AtomicInteger leaderNum = new AtomicInteger(0);
AtomicInteger followerNum = new AtomicInteger(0);
DLedgerServer leaderServer = parseServers(servers, leaderNum, followerNum);
Assertions.assertEquals(1, leaderNum.get());
Assertions.assertEquals(2, followerNum.get());
Assertions.assertNotNull(leaderServer);
String originalLeaderId = leaderServer.getMemberState().getSelfId();
String originalLeaderAddr = leaderServer.getMemberState().getSelfAddr();
DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]);
for (int i = 0; i < 10; i++) {
AppendEntryResponse appendEntryResponse = dLedgerClient.append(("HelloThreeServerInMemory" + i).getBytes());
Assertions.assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode());
Assertions.assertEquals(i, appendEntryResponse.getIndex());
}
Thread.sleep(1000);
Assertions.assertEquals(9, leaderServer.getDLedgerStore().getLedgerEndIndex());
DLedgerServer followerServer = null;
for (DLedgerServer server : servers) {
if (!server.getMemberState().isLeader()) {
followerServer = server;
break;
}
}
Assertions.assertNotNull(followerServer);
String followerId = followerServer.getMemberState().getSelfId();
String followerAddr = followerServer.getMemberState().getSelfAddr();

DLedgerLeaderElector leaderElector = followerServer.getDLedgerLeaderElector();
simulatePartition(followerServer, leaderServer);

int heartBeatTimeIntervalMs = leaderServer.getDLedgerConfig().getHeartBeatTimeIntervalMs();
int heartbeatLeak = leaderServer.getDLedgerConfig().getMaxHeartBeatLeak();
long lastBeat = leaderElector.getLastLeaderHeartBeatTime();
if (DLedgerUtils.elapsed(lastBeat) > heartBeatTimeIntervalMs - 200) {
// wait heartbeat send, to avoid the candidate test interrupt by leader heartbeat
Thread.sleep(heartBeatTimeIntervalMs / 2);
}
// make sure vote result is REJECT_SMALL_LEDGER_END_INDEX
followerServer.getMemberState().updateLedgerIndexAndTerm(followerServer.getMemberState().getLedgerEndIndex() - 1, leaderServer.getMemberState().currTerm());
leaderElector.setLastLeaderHeartBeatTime(System.currentTimeMillis() - 10000);
// should wait no more than one heartbeat and wait no more than maxVoteIntervalMs
Thread.sleep(100);

Assertions.assertTrue(followerServer.getMemberState().isCandidate(), "Follower did not become candidate after heartbeat timeout");
long followerTerm = followerServer.getMemberState().currTerm();
Assertions.assertTrue(followerTerm > 0, "Follower term should be greater than 0 after becoming candidate");
leaderElector.testRevote(followerTerm);
followerServer.getMemberState().getPeerMap().put(originalLeaderId, originalLeaderAddr);
leaderServer.getMemberState().getPeerMap().put(followerId, followerAddr);
Thread.sleep(heartBeatTimeIntervalMs * heartbeatLeak + 1000);

// test all term should increase
long newTerm = followerServer.getMemberState().currTerm();
Assertions.assertTrue(newTerm > followerTerm, "Follower term should increase after revote : " + newTerm + " vs " + followerTerm);
leaderNum.set(0);
followerNum.set(0);
leaderServer = parseServers(servers, leaderNum, followerNum);
Assertions.assertEquals(1, leaderNum.get(), "Should have exactly one leader");
Assertions.assertEquals(2, followerNum.get(), "Should have exactly two followers");
Assertions.assertNotNull(leaderServer, "Leader should not be null");
Assertions.assertNotEquals(followerId, leaderServer.getMemberState().getSelfId(), "Original leader should remain leader after network recovery");

long finalTerm = leaderServer.getMemberState().currTerm();
for (DLedgerServer server : servers) {
Assertions.assertEquals(finalTerm, server.getMemberState().currTerm(), "All servers should have the same term");
}
Assertions.assertTrue(finalTerm > followerTerm, "final term should increase after revote : " + finalTerm + " vs " + followerTerm);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public void testHeartbeat() throws Exception {
request.setGroup(group);
request.setTerm(leader.getMemberState().currTerm() + 1);
request.setIds(leader.getMemberState().getSelfId(), follower.getMemberState().getSelfId(), leader.getMemberState().getSelfId());
Assertions.assertEquals(DLedgerResponseCode.TERM_NOT_READY.getCode(), follower.handleHeartBeat(request).get().getCode());
Thread.sleep(100);
Assertions.assertEquals(DLedgerResponseCode.SUCCESS.getCode(), follower.handleHeartBeat(request).get().getCode());
}
Expand Down