Skip to content

Commit 5187b00

Browse files
committed
base-kv improvements:
1. quit when FSM and WAL mismatch after restored from snapshot 2. using dedicated executor for dumping snapshot data 3. check range version before entering WAL & query queue
1 parent 856b916 commit 5187b00

File tree

15 files changed

+265
-122
lines changed

15 files changed

+265
-122
lines changed

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/RaftNodeState.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,6 @@
4444
import org.slf4j.Logger;
4545

4646
abstract class RaftNodeState implements IRaftNodeState {
47-
public interface OnSnapshotInstalled {
48-
CompletableFuture<Void> done(ByteString requested, ByteString installed, Throwable ex);
49-
}
50-
51-
/**
52-
* The future of Uncommitted propose request.
53-
*/
54-
protected static class ProposeTask {
55-
final long term;
56-
final CompletableFuture<Long> future;
57-
58-
ProposeTask(long term, CompletableFuture<Long> future) {
59-
this.term = term;
60-
this.future = future;
61-
}
62-
}
6347

6448
protected final String id;
6549
protected final RaftConfig config;
@@ -73,7 +57,6 @@ protected static class ProposeTask {
7357
protected final int maxUncommittedProposals;
7458
protected final String[] tags;
7559
protected volatile long commitIndex;
76-
7760
public RaftNodeState(
7861
long currentTerm,
7962
long commitIndex,
@@ -226,7 +209,6 @@ protected int randomizeElectionTimeoutTick() {
226209
ThreadLocalRandom.current().nextInt(1, config.getElectionTimeoutTick() + 1);
227210
}
228211

229-
230212
protected void submitRaftMessages(Map<String, List<RaftMessage>> messages) {
231213
Map<String, List<RaftMessage>> sendMessages = messages.entrySet().stream()
232214
.filter(entry -> !entry.getValue().isEmpty())
@@ -385,4 +367,21 @@ protected void handleLowTermMessage(String fromPeer, RaftMessage message) {
385367
// ignore other messages other than the leader issues
386368
}
387369
}
370+
371+
interface OnSnapshotInstalled {
372+
CompletableFuture<Void> done(ByteString requested, ByteString installed, Throwable ex);
373+
}
374+
375+
/**
376+
* The future of Uncommitted propose request.
377+
*/
378+
protected static class ProposeTask {
379+
final long term;
380+
final CompletableFuture<Long> future;
381+
382+
ProposeTask(long term, CompletableFuture<Long> future) {
383+
this.term = term;
384+
this.future = future;
385+
}
386+
}
388387
}

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/RaftNodeStateFollower.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.baidu.bifromq.basekv.raft.exception.LeaderTransferException;
2121
import com.baidu.bifromq.basekv.raft.exception.ReadIndexException;
2222
import com.baidu.bifromq.basekv.raft.exception.RecoveryException;
23+
import com.baidu.bifromq.basekv.raft.exception.SnapshotException;
2324
import com.baidu.bifromq.basekv.raft.proto.AppendEntries;
2425
import com.baidu.bifromq.basekv.raft.proto.AppendEntriesReply;
2526
import com.baidu.bifromq.basekv.raft.proto.InstallSnapshot;
@@ -46,12 +47,6 @@
4647
import java.util.concurrent.CompletableFuture;
4748

4849
class RaftNodeStateFollower extends RaftNodeState {
49-
private static class StabilizingTask {
50-
int pendingReplyCount = 0;
51-
long readIndex = -1;
52-
boolean committed = false;
53-
}
54-
5550
private final TreeMap<Long, StabilizingTask> stabilizingIndexes = new TreeMap<>(Long::compareTo);
5651
private final LinkedHashMap<Long, Set<Integer>> tickToReadRequestsMap;
5752
private final Map<Integer, CompletableFuture<Long>> idToReadRequestMap;
@@ -384,12 +379,20 @@ void changeClusterConfig(String correlateId,
384379
@Override
385380
void onSnapshotRestored(ByteString requested, ByteString installed, Throwable ex, CompletableFuture<Void> onDone) {
386381
if (currentISSRequest == null) {
382+
log.debug("Snapshot installation request not found");
383+
onDone.completeExceptionally(new SnapshotException("No snapshot installation request"));
387384
return;
388385
}
389386
InstallSnapshot iss = currentISSRequest;
390387
Snapshot snapshot = iss.getSnapshot();
391388
if (snapshot.getData() != requested) {
392-
log.debug("Skip reply for obsolete snapshot installation");
389+
if (ex != null) {
390+
log.debug("Obsolete snapshot install failed", ex);
391+
onDone.completeExceptionally(ex);
392+
} else {
393+
log.debug("Obsolete snapshot installation");
394+
onDone.completeExceptionally(new SnapshotException("Obsolete snapshot installed by FSM"));
395+
}
393396
return;
394397
}
395398
currentISSRequest = null;
@@ -409,7 +412,7 @@ void onSnapshotRestored(ByteString requested, ByteString installed, Throwable ex
409412
submitRaftMessages(iss.getLeaderId(), reply);
410413
onDone.completeExceptionally(ex);
411414
} else {
412-
log.debug("Snapshot[index:{},term:{}] accepted by FSM", snapshot.getIndex(), snapshot.getTerm());
415+
log.info("Snapshot[index:{},term:{}] accepted by FSM", snapshot.getIndex(), snapshot.getTerm());
413416
try {
414417
// replace fsm snapshot data with the installed one
415418
snapshot = snapshot.toBuilder().setData(installed).build();
@@ -711,4 +714,10 @@ private boolean entryMatch(long index, long term) {
711714
return snapshot.getIndex() == index && snapshot.getTerm() == term;
712715
}
713716
}
717+
718+
private static class StabilizingTask {
719+
int pendingReplyCount = 0;
720+
long readIndex = -1;
721+
boolean committed = false;
722+
}
714723
}

base-kv/base-kv-raft/src/main/java/com/baidu/bifromq/basekv/raft/exception/InternalError.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
package com.baidu.bifromq.basekv.raft.exception;
1515

1616
public class InternalError extends Error {
17+
public InternalError(String message) {
18+
super(message);
19+
}
20+
1721
public InternalError(Throwable e) {
1822
super(e);
1923
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.baidu.bifromq.basekv.raft.exception;
15+
16+
public class SnapshotException extends RuntimeException {
17+
public SnapshotException(String message) {
18+
super(message);
19+
}
20+
21+
public SnapshotException(Throwable e) {
22+
super(e);
23+
}
24+
}

base-kv/base-kv-store-balance-controller/src/main/java/com/baidu/bifromq/basekv/balance/impl/RangeSplitBalancer.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313

1414
package com.baidu.bifromq.basekv.balance.impl;
1515

16+
import static com.baidu.bifromq.basekv.utils.BoundaryUtil.compareEndKeys;
17+
import static com.baidu.bifromq.basekv.utils.BoundaryUtil.compareStartKey;
18+
import static com.baidu.bifromq.basekv.utils.BoundaryUtil.endKey;
19+
import static com.baidu.bifromq.basekv.utils.BoundaryUtil.startKey;
20+
1621
import com.baidu.bifromq.basekv.proto.Boundary;
1722
import com.baidu.bifromq.basekv.proto.KVRangeDescriptor;
1823
import com.baidu.bifromq.basekv.proto.KVRangeStoreDescriptor;
@@ -136,14 +141,20 @@ protected Map<Boundary, ClusterConfig> doGenerate(Struct loadRules,
136141
&& ioDensity > maxIODensityPerRange
137142
&& storeDescriptor.getRangesList().size() < maxRangesPerStore
138143
&& splitHint.hasSplitKey()) {
139-
expectedRangeLayout.put(boundary
140-
.toBuilder()
141-
.setEndKey(splitHint.getSplitKey())
142-
.build(), clusterConfig);
143-
expectedRangeLayout.put(boundary
144-
.toBuilder()
145-
.setStartKey(splitHint.getSplitKey())
146-
.build(), clusterConfig);
144+
if (compareStartKey(startKey(boundary), splitHint.getSplitKey()) < 0
145+
&& compareEndKeys(splitHint.getSplitKey(), endKey(boundary)) < 0) {
146+
expectedRangeLayout.put(boundary
147+
.toBuilder()
148+
.setEndKey(splitHint.getSplitKey())
149+
.build(), clusterConfig);
150+
expectedRangeLayout.put(boundary
151+
.toBuilder()
152+
.setStartKey(splitHint.getSplitKey())
153+
.build(), clusterConfig);
154+
} else {
155+
log.warn("Invalid split key in hint: {}, range: {}", splitHint.getSplitKey(), boundary);
156+
expectedRangeLayout.put(boundary, rangeDescriptor.getConfig());
157+
}
147158
} else {
148159
expectedRangeLayout.put(boundary, rangeDescriptor.getConfig());
149160
}

base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/KVRangeStore.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -555,24 +555,35 @@ private CompletableFuture<Void> ensureRange(KVRangeId rangeId, Snapshot walSnaps
555555
}
556556

557557
private void quitKVRange(IKVRangeFSM range) {
558-
mgmtTaskRunner.add(() -> {
558+
if (status.get() != Status.STARTED) {
559+
return;
560+
}
561+
CompletableFuture<Optional<RangeFSMHolder.PinnedRange>> afterDestroyed = mgmtTaskRunner.add(() -> {
559562
if (status.get() != Status.STARTED) {
560-
return CompletableFuture.completedFuture(null);
563+
return CompletableFuture.failedFuture(new KVRangeStoreException("Not started"));
561564
}
562565
RangeFSMHolder holder = kvRangeMap.remove(range.id());
563566
assert holder.fsm == range;
564567
long ver = range.ver();
568+
log.debug("Destroy kvrange: rangeId={}", KVRangeIdUtil.toString(range.id()));
565569
return range.destroy()
566-
.thenCompose(v -> {
570+
.thenApply(v -> {
567571
if (holder.pinned != null && holder.pinned.ver > ver) {
568-
RangeFSMHolder.PinnedRange pinnedRange = holder.pinned;
572+
return Optional.of(holder.pinned);
573+
}
574+
return Optional.empty();
575+
});
576+
});
577+
afterDestroyed.thenCompose(pinned ->
578+
mgmtTaskRunner.add(() -> {
579+
if (pinned.isPresent()) {
580+
RangeFSMHolder.PinnedRange pinnedRange = pinned.get();
569581
log.debug("Recreate range after destroy: rangeId={}", KVRangeIdUtil.toString(range.id()));
570582
return ensureRange(range.id(), pinnedRange.walSnapshot, pinnedRange.fsmSnapshot);
571583
}
572584
return CompletableFuture.completedFuture(null);
573-
})
574-
.thenAccept(v -> updateDescriptorList());
575-
});
585+
}))
586+
.thenAccept(v -> updateDescriptorList());
576587
}
577588

578589
private IKVRangeFSM loadKVRangeFSM(KVRangeId rangeId, IKVRange range, IKVRangeWALStore walStore) {

base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/option/KVRangeOptions.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,21 @@ public class KVRangeOptions {
3636
@Builder.Default
3737
private int compactWALThreshold = 10000; // the max number of logs before compaction
3838
@Builder.Default
39+
private int shrinkWALCheckIntervalSec = 60;
40+
@Builder.Default
3941
private long tickUnitInMS = 100;
4042
@Builder.Default
4143
private int maxWALFatchBatchSize = 5 * 1024 * 1024; // 5MB
4244
@Builder.Default
43-
private int snapshotSyncIdleTimeoutSec = 30;
45+
private int snapshotSyncIdleTimeoutSec = 600; // 10min
4446
@Builder.Default
4547
private int statsCollectIntervalSec = 5;
4648
@Builder.Default
4749
private int zombieTimeoutSec = 300; // 5min
4850
@Builder.Default
4951
private RaftConfig walRaftConfig = new RaftConfig()
5052
.setPreVote(true)
51-
.setInstallSnapshotTimeoutTick(300)
52-
.setElectionTimeoutTick(30)
53-
.setMaxSizePerAppend(10 * 1024 * 1024); // 10MB;
53+
.setInstallSnapshotTimeoutTick(6000) // 10min
54+
.setElectionTimeoutTick(30) // 3 sec
55+
.setMaxSizePerAppend(100 * 1024 * 1024); // 100MB;
5456
}

base-kv/base-kv-store-server/src/main/java/com/baidu/bifromq/basekv/store/range/KVRangeDumpSession.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package com.baidu.bifromq.basekv.store.range;
1515

16+
import com.baidu.bifromq.baseenv.EnvProvider;
1617
import com.baidu.bifromq.basekv.proto.KVPair;
1718
import com.baidu.bifromq.basekv.proto.KVRangeMessage;
1819
import com.baidu.bifromq.basekv.proto.SaveSnapshotDataReply;
@@ -21,29 +22,27 @@
2122
import com.baidu.bifromq.basekv.store.util.AsyncRunner;
2223
import com.baidu.bifromq.logger.SiftLogger;
2324
import com.google.common.util.concurrent.RateLimiter;
25+
import io.micrometer.core.instrument.Metrics;
26+
import io.micrometer.core.instrument.Tags;
27+
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
2428
import io.reactivex.rxjava3.disposables.Disposable;
2529
import java.time.Duration;
2630
import java.util.Optional;
2731
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.Executor;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.LinkedTransferQueue;
34+
import java.util.concurrent.ThreadPoolExecutor;
35+
import java.util.concurrent.TimeUnit;
2936
import java.util.concurrent.atomic.AtomicBoolean;
3037
import java.util.concurrent.atomic.AtomicInteger;
3138
import org.slf4j.Logger;
3239

3340
class KVRangeDumpSession {
34-
enum Result {
35-
OK, NoCheckpoint, Canceled, Abort, Error
36-
}
37-
3841
private final Logger log;
39-
40-
interface DumpBytesRecorder {
41-
void record(int bytes);
42-
}
43-
4442
private final String follower;
4543
private final SnapshotSyncRequest request;
4644
private final IKVRangeMessenger messenger;
45+
private final ExecutorService executor;
4746
private final AsyncRunner runner;
4847
private final AtomicInteger reqId = new AtomicInteger();
4948
private final AtomicBoolean canceled = new AtomicBoolean();
@@ -59,14 +58,18 @@ interface DumpBytesRecorder {
5958
SnapshotSyncRequest request,
6059
IKVRange accessor,
6160
IKVRangeMessenger messenger,
62-
Executor executor,
6361
Duration maxIdleDuration,
6462
long bandwidth,
6563
DumpBytesRecorder recorder,
6664
String... tags) {
6765
this.follower = follower;
6866
this.request = request;
6967
this.messenger = messenger;
68+
this.executor = ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
69+
new ThreadPoolExecutor(1, 1,
70+
0L, TimeUnit.MILLISECONDS, new LinkedTransferQueue<>(),
71+
EnvProvider.INSTANCE.newThreadFactory("basekv-snapshot-dumper")),
72+
"mutator", "basekv.range", Tags.of(tags));
7073
this.runner = new AsyncRunner("basekv.runner.sessiondump", executor);
7174
this.maxIdleDuration = maxIdleDuration;
7275
this.recorder = recorder;
@@ -124,7 +127,7 @@ String checkpointId() {
124127
}
125128

126129
void tick() {
127-
if (lastReplyTS == 0) {
130+
if (lastReplyTS == 0 || canceled.get()) {
128131
return;
129132
}
130133
long elapseNanos = Duration.ofNanos(System.nanoTime() - lastReplyTS).toNanos();
@@ -147,7 +150,7 @@ void cancel() {
147150
}
148151

149152
CompletableFuture<Result> awaitDone() {
150-
return doneSignal;
153+
return doneSignal.whenComplete((v, e) -> executor.shutdown());
151154
}
152155

153156
private void handleReply(SaveSnapshotDataReply reply) {
@@ -230,4 +233,12 @@ private void nextSaveRequest() {
230233
}
231234
});
232235
}
236+
237+
enum Result {
238+
OK, NoCheckpoint, Canceled, Abort, Error
239+
}
240+
241+
interface DumpBytesRecorder {
242+
void record(int bytes);
243+
}
233244
}

0 commit comments

Comments
 (0)