diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java index e23bff61..c27a952a 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java @@ -142,12 +142,9 @@ public CompletableFuture handleHeartBeat(HeartBeatRequest req return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode())); } } else { - //To make it simple, for larger term, do not change to follower immediately - //first change to candidate, and notify the state-maintainer thread - changeRoleToCandidate(request.getTerm()); - needIncreaseTermImmediately = true; - //TOOD notify - return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode())); + //stepped down by larger term - convert to follower immediately (per Raft protocol) + changeRoleToFollower(request.getTerm(), request.getLeaderId()); + return CompletableFuture.completedFuture(new HeartBeatResponse()); } } } @@ -157,6 +154,7 @@ public void changeRoleToLeader(long term) { if (memberState.currTerm() == term) { memberState.changeToLeader(term); lastSendHeartBeatTime = -1; + lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; // Reset parse result state handleRoleChange(term, MemberState.Role.LEADER); LOGGER.info("[{}] [ChangeRoleToLeader] from term: {} and currTerm: {}", memberState.getSelfId(), term, memberState.currTerm()); } else { @@ -169,6 +167,8 @@ public void changeRoleToCandidate(long term) { synchronized (memberState) { if (term >= memberState.currTerm()) { memberState.changeToCandidate(term); + lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; // Reset parse result for new election + nextTimeToRequestVote = -1; // Reset vote timing to allow immediate first vote 
handleRoleChange(term, MemberState.Role.CANDIDATE); LOGGER.info("[{}] [ChangeRoleToCandidate] from term: {} and currTerm: {}", memberState.getSelfId(), term, memberState.currTerm()); } else { @@ -184,12 +184,24 @@ public void testRevote(long term) { nextTimeToRequestVote = -1; } + //just for test + public void setLastLeaderHeartBeatTime(long time) { + this.lastLeaderHeartBeatTime = time; + } + public long getLastLeaderHeartBeatTime() { + return lastLeaderHeartBeatTime; + } + public void changeRoleToFollower(long term, String leaderId) { - LOGGER.info("[{}][ChangeRoleToFollower] from term: {} leaderId: {} and currTerm: {}", memberState.getSelfId(), term, leaderId, memberState.currTerm()); - lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; - memberState.changeToFollower(term, leaderId); - lastLeaderHeartBeatTime = System.currentTimeMillis(); - handleRoleChange(term, MemberState.Role.FOLLOWER); + synchronized (memberState) { + LOGGER.info("[{}][ChangeRoleToFollower] from term: {} leaderId: {} and currTerm: {}", + memberState.getSelfId(), term, leaderId, memberState.currTerm()); + lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; + nextTimeToRequestVote = -1; // Reset vote timing so no stale vote delay carries over from a previous candidacy + memberState.changeToFollower(term, leaderId); + lastLeaderHeartBeatTime = System.currentTimeMillis(); + handleRoleChange(term, MemberState.Role.FOLLOWER); + } } public CompletableFuture handleVote(VoteRequest request, boolean self) { @@ -204,32 +216,36 @@ public CompletableFuture handleVote(VoteRequest request, boolean s return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER)); } + // Step 1: Check term first - if request has higher term, update our term and convert to follower + if (request.getTerm() > memberState.currTerm()) { + //stepped down by larger term - convert to follower first + changeRoleToFollower(request.getTerm(), null); + // After 
term update, we can now process the vote request in the new term + // Continue to ledger and other checks below + } else if (request.getTerm() < memberState.currTerm()) { + return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM)); + } + + // Step 2: Now we're in the same term, check ledger conditions if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM)); } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) { return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX)); } - if (request.getTerm() < memberState.currTerm()) { - return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM)); - } else if (request.getTerm() == memberState.currTerm()) { - if (memberState.currVoteFor() == null) { - //let it go - } else if (memberState.currVoteFor().equals(request.getLeaderId())) { - //repeat just let it go + // Step 3: Check vote conditions in the same term + if (memberState.currVoteFor() == null) { + // let it go + } else if (memberState.currVoteFor().equals(request.getLeaderId())) { + // repeat just let it go + } else { + if (memberState.getLeaderId() != null) { + return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()) + .voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER)); } else { - if (memberState.getLeaderId() != null) { - return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER)); - } else { - 
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED)); - } + return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()) + .voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED)); } - } else { - //stepped down by larger term - changeRoleToCandidate(request.getTerm()); - needIncreaseTermImmediately = true; - //only can handleVote when the term is consistent - return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY)); } if (request.getTerm() < memberState.getLedgerEndTerm()) { @@ -419,7 +435,7 @@ private void maintainAsCandidate() throws Exception { if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) { long prevTerm = memberState.currTerm(); term = memberState.nextTerm(); - LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term); + LOGGER.info("{}_[INCREASE_TERM] from {} to {}, lastParseResult: {}, needIncreaseTermImmediately: {}", memberState.getSelfId(), prevTerm, term, lastParseResult, needIncreaseTermImmediately); lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE; } else { term = memberState.currTerm(); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/MemberState.java b/dledger/src/main/java/io/openmessaging/storage/dledger/MemberState.java index 975c0133..c4232962 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/MemberState.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/MemberState.java @@ -151,7 +151,15 @@ public synchronized void changeToLeader(long term) { } public synchronized void changeToFollower(long term, String leaderId) { - PreConditions.check(currTerm == term, DLedgerResponseCode.ILLEGAL_MEMBER_STATE, "%d != %d", currTerm, term); + PreConditions.check(term >= currTerm, 
DLedgerResponseCode.ILLEGAL_MEMBER_STATE, "should %d >= %d", term, currTerm); + if (term != currTerm) { + currTerm = term; + currVoteFor = null; + persistState(); + } + if (term > knownMaxTermInGroup) { + knownMaxTermInGroup = term; + } this.role = FOLLOWER; this.leaderId = leaderId; transferee = null; diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/LeaderElectorTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/LeaderElectorTest.java index c4c32489..8a3c54ec 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/LeaderElectorTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/LeaderElectorTest.java @@ -16,6 +16,7 @@ package io.openmessaging.storage.dledger; +import io.openmessaging.storage.dledger.client.DLedgerClient; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; @@ -294,5 +295,84 @@ public void testThreeServerAndPreferredLeader() throws Exception { Assertions.assertNotNull(leaderServer); Assertions.assertEquals(preferredLeaderId, leaderServer.getDLedgerConfig().getSelfId()); } + + @Test + public void testFollowerElectionWithHigherTermAndLessIndex() throws Exception { + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort()); + List servers = new ArrayList<>(); + servers.add(launchServer(group, peers, "n0")); + servers.add(launchServer(group, peers, "n1")); + servers.add(launchServer(group, peers, "n2")); + + Thread.sleep(5000); + AtomicInteger leaderNum = new AtomicInteger(0); + AtomicInteger followerNum = new AtomicInteger(0); + DLedgerServer leaderServer = parseServers(servers, leaderNum, followerNum); + Assertions.assertEquals(1, leaderNum.get()); + Assertions.assertEquals(2, followerNum.get()); + 
Assertions.assertNotNull(leaderServer); + String originalLeaderId = leaderServer.getMemberState().getSelfId(); + String originalLeaderAddr = leaderServer.getMemberState().getSelfAddr(); + DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]); + for (int i = 0; i < 10; i++) { + AppendEntryResponse appendEntryResponse = dLedgerClient.append(("HelloThreeServerInMemory" + i).getBytes()); + Assertions.assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); + Assertions.assertEquals(i, appendEntryResponse.getIndex()); + } + Thread.sleep(1000); + Assertions.assertEquals(9, leaderServer.getDLedgerStore().getLedgerEndIndex()); + DLedgerServer followerServer = null; + for (DLedgerServer server : servers) { + if (!server.getMemberState().isLeader()) { + followerServer = server; + break; + } + } + Assertions.assertNotNull(followerServer); + String followerId = followerServer.getMemberState().getSelfId(); + String followerAddr = followerServer.getMemberState().getSelfAddr(); + + DLedgerLeaderElector leaderElector = followerServer.getDLedgerLeaderElector(); + simulatePartition(followerServer, leaderServer); + + int heartBeatTimeIntervalMs = leaderServer.getDLedgerConfig().getHeartBeatTimeIntervalMs(); + int heartbeatLeak = leaderServer.getDLedgerConfig().getMaxHeartBeatLeak(); + long lastBeat = leaderElector.getLastLeaderHeartBeatTime(); + if (DLedgerUtils.elapsed(lastBeat) > heartBeatTimeIntervalMs - 200) { + // wait for the next heartbeat to be sent, so the candidate phase of this test is not interrupted by a leader heartbeat + Thread.sleep(heartBeatTimeIntervalMs / 2); + } + // make sure vote result is REJECT_SMALL_LEDGER_END_INDEX + followerServer.getMemberState().updateLedgerIndexAndTerm(followerServer.getMemberState().getLedgerEndIndex() - 1, leaderServer.getMemberState().currTerm()); + leaderElector.setLastLeaderHeartBeatTime(System.currentTimeMillis() - 10000); + // should wait no more than one heartbeat and wait no more than maxVoteIntervalMs + Thread.sleep(100); 
+ + Assertions.assertTrue(followerServer.getMemberState().isCandidate(), "Follower did not become candidate after heartbeat timeout"); + long followerTerm = followerServer.getMemberState().currTerm(); + Assertions.assertTrue(followerTerm > 0, "Follower term should be greater than 0 after becoming candidate"); + leaderElector.testRevote(followerTerm); + followerServer.getMemberState().getPeerMap().put(originalLeaderId, originalLeaderAddr); + leaderServer.getMemberState().getPeerMap().put(followerId, followerAddr); + Thread.sleep(heartBeatTimeIntervalMs * heartbeatLeak + 1000); + + // test all term should increase + long newTerm = followerServer.getMemberState().currTerm(); + Assertions.assertTrue(newTerm > followerTerm, "Follower term should increase after revote : " + newTerm + " vs " + followerTerm); + leaderNum.set(0); + followerNum.set(0); + leaderServer = parseServers(servers, leaderNum, followerNum); + Assertions.assertEquals(1, leaderNum.get(), "Should have exactly one leader"); + Assertions.assertEquals(2, followerNum.get(), "Should have exactly two followers"); + Assertions.assertNotNull(leaderServer, "Leader should not be null"); + Assertions.assertNotEquals(followerId, leaderServer.getMemberState().getSelfId(), "Original leader should remain leader after network recovery"); + + long finalTerm = leaderServer.getMemberState().currTerm(); + for (DLedgerServer server : servers) { + Assertions.assertEquals(finalTerm, server.getMemberState().currTerm(), "All servers should have the same term"); + } + Assertions.assertTrue(finalTerm > followerTerm, "final term should increase after revote : " + finalTerm + " vs " + followerTerm); + } } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/protocol/HeartbeatRequestTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/protocol/HeartbeatRequestTest.java index 722dd247..d36952cf 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/protocol/HeartbeatRequestTest.java +++ 
b/dledger/src/test/java/io/openmessaging/storage/dledger/protocol/HeartbeatRequestTest.java @@ -100,7 +100,6 @@ public void testHeartbeat() throws Exception { request.setGroup(group); request.setTerm(leader.getMemberState().currTerm() + 1); request.setIds(leader.getMemberState().getSelfId(), follower.getMemberState().getSelfId(), leader.getMemberState().getSelfId()); - Assertions.assertEquals(DLedgerResponseCode.TERM_NOT_READY.getCode(), follower.handleHeartBeat(request).get().getCode()); Thread.sleep(100); Assertions.assertEquals(DLedgerResponseCode.SUCCESS.getCode(), follower.handleHeartBeat(request).get().getCode()); }