Skip to content

Commit 447b0d2

Browse files
committed
1. solve the issue that commit index may go back in some extreme situation during config change
2. code cleanup
1 parent 12dc686 commit 447b0d2

23 files changed

+245
-239
lines changed

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/BasicStateStoreTest.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.testng.Assert;
3535
import org.testng.annotations.Test;
3636

37+
/**
38+
* Base class for state store test cases.
39+
*/
3740
public abstract class BasicStateStoreTest {
3841

3942
protected abstract IRaftStateStore createStorage(String id, Snapshot initSnapshot);
@@ -75,12 +78,6 @@ public void testSaveAndLoadCurrentTerm() {
7578
assertEquals(stateStorage.currentTerm(), 0);
7679
stateStorage.saveTerm(1);
7780
assertEquals(stateStorage.currentTerm(), 1);
78-
try {
79-
stateStorage.saveTerm(0);
80-
fail();
81-
} catch (Throwable t) {
82-
83-
}
8481
}
8582

8683
@Test
@@ -127,7 +124,7 @@ public void testAppendFirstEntryFailedWithLowerBoundCheck() {
127124
.build();
128125
IRaftStateStore stateStorage = createStorage(localId(), initSnapshot);
129126
try {
130-
stateStorage.append(asList(LogEntry.newBuilder()
127+
stateStorage.append(List.of(LogEntry.newBuilder()
131128
.setTerm(1)
132129
.setIndex(5)
133130
.setData(ByteString.EMPTY)
@@ -137,7 +134,7 @@ public void testAppendFirstEntryFailedWithLowerBoundCheck() {
137134
assertTrue(e instanceof IndexOutOfBoundsException);
138135
}
139136
try {
140-
stateStorage.append(asList(LogEntry.newBuilder()
137+
stateStorage.append(List.of(LogEntry.newBuilder()
141138
.setTerm(1)
142139
.setIndex(7)
143140
.setData(ByteString.EMPTY)
@@ -159,7 +156,7 @@ public void testAppendFirstEntryFailedWithLowerBoundCheck() {
159156
.build()), true);
160157

161158
try {
162-
stateStorage.append(asList(LogEntry.newBuilder()
159+
stateStorage.append(List.of(LogEntry.newBuilder()
163160
.setTerm(1)
164161
.setIndex(5)
165162
.setData(ByteString.EMPTY)
@@ -169,7 +166,7 @@ public void testAppendFirstEntryFailedWithLowerBoundCheck() {
169166
assertTrue(e instanceof IndexOutOfBoundsException);
170167
}
171168
try {
172-
stateStorage.append(asList(LogEntry.newBuilder()
169+
stateStorage.append(List.of(LogEntry.newBuilder()
173170
.setTerm(1)
174171
.setIndex(9)
175172
.setData(ByteString.EMPTY)
@@ -200,7 +197,7 @@ public void testAsyncAppend() throws InterruptedException {
200197
assertEquals(stateStorage.entryAt(stateStorage.lastIndex()).get().getIndex(), stateStorage.lastIndex());
201198
}
202199
countDownLatch.await();
203-
assertTrue(1 <= stabledIndexes.size());
200+
assertFalse(stabledIndexes.isEmpty());
204201
assertTrue(stabledIndexes.get(0) >= 1);
205202
}
206203

@@ -270,7 +267,7 @@ public void testTruncateWholeLog() {
270267
assertEquals(stateStorage.firstIndex(), 6);
271268
assertEquals(stateStorage.lastIndex(), 5);
272269

273-
List<LogEntry> newEntries = asList(LogEntry.newBuilder()
270+
List<LogEntry> newEntries = List.of(LogEntry.newBuilder()
274271
.setTerm(1)
275272
.setIndex(6)
276273
.setData(ByteString.EMPTY)
@@ -293,7 +290,7 @@ public void testTruncateWholeLog() {
293290
assertEquals(stateStorage.firstIndex(), 6);
294291
assertEquals(stateStorage.lastIndex(), 7);
295292

296-
newEntries = asList(LogEntry.newBuilder()
293+
newEntries = List.of(LogEntry.newBuilder()
297294
.setTerm(1)
298295
.setIndex(6)
299296
.setData(ByteString.EMPTY)

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/ClusterConfigHelper.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/IPeerLogReplicator.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,62 +13,61 @@
1313

1414
package com.baidu.bifromq.basekv.raft;
1515

16-
1716
import com.baidu.bifromq.basekv.raft.proto.RaftNodeSyncState;
1817

1918
interface IPeerLogReplicator {
2019

2120
/**
22-
* Current matching index
21+
* Current matching index.
2322
*
24-
* @return
23+
* @return the index of the last log entry that has been replicated to the given peer
2524
*/
2625
long matchIndex();
2726

2827
/**
29-
* Next index to send
28+
* Next index to send.
3029
*
31-
* @return
30+
* @return the index of the next log entry to send to the given peer
3231
*/
3332
long nextIndex();
3433

3534
/**
36-
* Current status
35+
* Current status of the replicator.
3736
*
38-
* @return
37+
* @return the current status of the replicator
3938
*/
4039
RaftNodeSyncState status();
4140

4241
/**
43-
* an external clock signal to drive the state machine forward in case no other stimuli available
42+
* an external clock signal to drive the state machine forward in case no other stimuli happens.
4443
*
4544
* @return true if the replicator has changed its state after tick
4645
*/
4746
boolean tick();
4847

4948
/**
50-
* the amount of matchIndex advanced per tick always non-negative
49+
* the amount of matchIndex advanced per tick always non-negative.
5150
*
52-
* @return
51+
* @return the amount of matchIndex advanced per tick
5352
*/
5453
long catchupRate();
5554

5655
/**
57-
* a flag indicating whether the append entries for given peer should be paused
56+
* a flag indicating whether the append entries for given peer should be paused.
5857
*
59-
* @return
58+
* @return true if the replicator has changed its state after calling this method
6059
*/
6160
boolean pauseReplicating();
6261

6362
/**
64-
* a flag indicating whether the given peer need a heartbeat due to heartbeatTimeoutTick exceed
63+
* a flag indicating whether the given peer need a heartbeat due to heartbeatTimeoutTick exceed.
6564
*
6665
* @return true if peer need a heartbeat
6766
*/
6867
boolean needHeartbeat();
6968

7069
/**
71-
* backoff the next index when peer follower rejected the append entries request
70+
* backoff the next index when peer follower rejected the append entries request.
7271
*
7372
* @param peerRejectedIndex the index of mismatched log which is literally the prevLogIndex in appendEntries rpc
7473
* @param peerLastIndex the index of last log entry in peer's raft log
@@ -77,17 +76,17 @@ interface IPeerLogReplicator {
7776
boolean backoff(long peerRejectedIndex, long peerLastIndex);
7877

7978
/**
80-
* update the match index when peer follower accepted the append entries request
79+
* update the match index when peer follower accepted the append entries request.
8180
*
8281
* @param peerLastIndex the index of last log entry in peer's raft log
8382
* @return true if the replicator has changed its state after calling this method
8483
*/
8584
boolean confirmMatch(long peerLastIndex);
8685

8786
/**
88-
* advance the next index after sending log entries up to endIndex(inclusively) to follower
87+
* advance the next index after sending log entries up to endIndex(inclusively) to follower.
8988
*
90-
* @param endIndex
89+
* @param endIndex the index of the last log entry to send to the given peer
9190
* @return true if the replicator has changed its state after calling this method
9291
*/
9392
boolean replicateBy(long endIndex);

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/IRaftNodeState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import com.baidu.bifromq.basekv.raft.proto.ClusterConfig;
1717
import com.baidu.bifromq.basekv.raft.proto.RaftNodeStatus;
1818

19-
public interface IRaftNodeState {
19+
interface IRaftNodeState {
2020
String id();
2121

2222
RaftNodeStatus getState();

0 commit comments

Comments
 (0)