Skip to content

Commit 652621e

Browse files
authored
Performance/memory optimizations, stability/bug fixes, and small enhancements (#183)
* Reduce query stuttering effect during range topology change * Simplify client range router patching mechanism * Refactor BatchPubCall and related classes to improve fan-out handling and reduce memory overhead * Don't report redundant ServerBusy event * Reduce loading time by making DistWorkerCoProc/InboxStoreCoProc reset process async * Reduce memory overhead by doing contract to root after remove * Refactor base-kv to support report leader change to CoProc * Only report tenant metrics from leader co-proc * Optimize retry logic for pushing qos1/2 messages and adjust related events * New event for matching retained messages successfully and related tests * Add createdAt field to InboxMetadata and implement drop event reporting on delete * Add an api to retrieve inbox state * Reduce memory overhead during resetting * fixed the bug causing DistWorkerCleaner stop running * Adjust default values of gc params * Conditional clear batch call state during reset to avoid task leaking * Prevent concurrent update non-thread safe result proto builder * Improved child removal logic and add benchmarking for performance evaluation * Add set session type to ClientInfo before making attach request * Enhance DistWorkerCleaner with heuristic interval and step * Implemented dynamic sending window for confirmable messages * Improve gossiping: * Do not early confirm when single node * Reduce message complexity * Optimize child branch detachment and compression logic in TopicLevelTrie * Enhance TopicIndex to support custom value equality strategy * Fix potential consistency issue in TenantRouteCache and TopicIndex * Update tenant stats on inbox clearance and add integration test for session deletion * Refactor InboxMetaCache to reduce refresh/seek operation * Correct range lookup key for InboxCheckSubScheduler * Ensure strict fifo order for check permission call * Try drain staging buffer immediately after buffering message * Schedule an on-demand hint/confirm timeout to ensure fetch loop non-stop * Add @JsonMerge annotation to service configuration classes to reduce config file complexity further * Reduce IO overhead for InboxStoreCoProc
1 parent fc40ee6 commit 652621e

File tree

240 files changed

+12472
-3446
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

240 files changed

+12472
-3446
lines changed

base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/Gossiper.java

Lines changed: 58 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basecluster.messenger;
2121

22-
import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;
2322
import com.github.benmanes.caffeine.cache.Cache;
2423
import com.github.benmanes.caffeine.cache.Caffeine;
2524
import com.google.common.annotations.VisibleForTesting;
@@ -41,59 +40,20 @@
4140
import lombok.Builder;
4241
import lombok.Getter;
4342
import lombok.extern.slf4j.Slf4j;
43+
import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;
4444

4545
@Slf4j
4646
final class Gossiper {
47-
48-
@Builder
49-
private static class GossipState {
50-
public final GossipMessage message;
51-
52-
public final long infectionPeriod;
53-
54-
private final CompletableFuture<Duration> spreadSuccessSignal = new CompletableFuture<>();
55-
56-
private final long start = System.nanoTime();
57-
58-
private final Set<InetSocketAddress> infected = new HashSet<>();
59-
60-
@Getter
61-
public boolean confirmed;
62-
63-
void addInfectedAddress(InetSocketAddress address) {
64-
infected.add(address);
65-
}
66-
67-
boolean isInfected(InetSocketAddress address) {
68-
return infected.contains(address);
69-
}
70-
71-
CompletableFuture<Duration> spreadSuccessSignal() {
72-
return spreadSuccessSignal;
73-
}
74-
75-
void confirmSpreadSuccess() {
76-
spreadSuccessSignal.complete(Duration.ofNanos(System.nanoTime() - start));
77-
confirmed = true;
78-
}
79-
}
80-
81-
private long currentPeriod = 0;
82-
private long gossipCounter = 0;
83-
// nanos
84-
private long prevPeriodTime = -1;
85-
8647
private final Cache<String, GossipState> currentGossips;
87-
8848
private final String id;
89-
9049
private final int retransmitMultiplier;
91-
9250
private final Duration spreadPeriod;
93-
9451
private final Subject<GossipMessage> gossipPublisher;
95-
9652
private final Observable<Timed<GossipMessage>> gossipSink;
53+
private long currentPeriod = 0;
54+
private long gossipCounter = 0;
55+
// nanos
56+
private long prevPeriodTime = -1;
9757

9858
Gossiper(String id, int retransmitMultiplier, Duration spreadPeriod, Scheduler scheduler) {
9959
// global unique id of the gossiper
@@ -140,28 +100,33 @@ public long nextPeriod(int totalGossipers) {
140100
prevPeriodTime = currentPeriodTime;
141101
// If time interval between current and previous period is too long, do confirm or sweep in advance
142102
if (elapsedPeriod > periodsToSweep) {
143-
log.warn("Too many elapsed periods, sweep gossips in advance, currentPeriod={}, elapsedPeriod={}",
144-
currentPeriod, elapsedPeriod);
145-
sweepSpreadGossips(gossipState -> true);
103+
// Avoid sweeping when isolated to prevent information loss
104+
// in case the node is temporarily partitioned.
105+
if (totalGossipers > 1) {
106+
log.warn("Too many elapsed periods, sweep gossips in advance, currentPeriod={}, elapsedPeriod={}",
107+
currentPeriod, elapsedPeriod);
108+
sweepSpreadGossips(gossipState -> true);
109+
}
146110
return currentPeriod;
147111
}
148112
if (elapsedPeriod > periodsToSpread) {
149-
log.warn("Some gossips are too old to spread, confirm gossips in advance, "
150-
+ "currentPeriod={}, elapsedPeriod={}",
151-
currentPeriod, elapsedPeriod);
152-
double confirmRatio = elapsedPeriod / (double) periodsToSweep;
153-
confirmGossipsSpread(gossipState ->
154-
!gossipState.confirmed && (
155-
ThreadLocalRandom.current().nextDouble() < confirmRatio ||
156-
currentPeriod > gossipState.infectionPeriod + periodsToSpread)
157-
);
113+
// Do not early-confirm when isolated (no other gossipers).
114+
if (totalGossipers > 1) {
115+
log.warn("Some gossips are too old to spread, confirm gossips in advance, "
116+
+ "currentPeriod={}, elapsedPeriod={}", currentPeriod, elapsedPeriod);
117+
double confirmRatio = elapsedPeriod / (double) periodsToSweep;
118+
confirmGossipsSpread(gossipState ->
119+
!gossipState.confirmed && (ThreadLocalRandom.current().nextDouble() < confirmRatio
120+
|| currentPeriod > gossipState.infectionPeriod + periodsToSpread)
121+
);
122+
}
158123
return currentPeriod;
159124
}
160125
}
161126
// do normal period action
162127
sweepSpreadGossips(gossipState -> currentPeriod > gossipState.infectionPeriod + periodsToSweep);
163-
confirmGossipsSpread(gossipState -> !gossipState.confirmed &&
164-
currentPeriod > gossipState.infectionPeriod + periodsToSpread);
128+
confirmGossipsSpread(gossipState -> !gossipState.confirmed
129+
&& currentPeriod > gossipState.infectionPeriod + periodsToSpread);
165130
prevPeriodTime = currentPeriodTime;
166131
return currentPeriod;
167132
}
@@ -217,4 +182,37 @@ int gossipPeriodsToSweep(int retransmitMultiplier, int totalGossipers) {
217182
int ceilLog2(int num) {
218183
return num <= 1 ? 1 : 32 - Integer.numberOfLeadingZeros(num - 1);
219184
}
185+
186+
@Builder
187+
private static class GossipState {
188+
public final GossipMessage message;
189+
190+
public final long infectionPeriod;
191+
192+
private final CompletableFuture<Duration> spreadSuccessSignal = new CompletableFuture<>();
193+
194+
private final long start = System.nanoTime();
195+
196+
private final Set<InetSocketAddress> infected = new HashSet<>();
197+
198+
@Getter
199+
public boolean confirmed;
200+
201+
void addInfectedAddress(InetSocketAddress address) {
202+
infected.add(address);
203+
}
204+
205+
boolean isInfected(InetSocketAddress address) {
206+
return infected.contains(address);
207+
}
208+
209+
CompletableFuture<Duration> spreadSuccessSignal() {
210+
return spreadSuccessSignal;
211+
}
212+
213+
void confirmSpreadSuccess() {
214+
spreadSuccessSignal.complete(Duration.ofNanos(System.nanoTime() - start));
215+
confirmed = true;
216+
}
217+
}
220218
}

base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/MessengerMessageEnvelope.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basecluster.messenger;
2121

22-
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
2322
import java.net.InetSocketAddress;
2423
import lombok.Builder;
24+
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
2525

2626
@Builder
2727
class MessengerMessageEnvelope {
2828
public final MessengerMessage message;
2929
public final InetSocketAddress recipient;
30-
public final InetSocketAddress sender; // null if gossip
30+
public final InetSocketAddress sender;
3131
}

base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/MessengerTransport.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,11 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basecluster.messenger;
2121

22-
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
23-
import org.apache.bifromq.basecluster.transport.ITransport;
24-
import org.apache.bifromq.basecluster.transport.PacketEnvelope;
2522
import com.google.protobuf.AbstractMessageLite;
2623
import com.google.protobuf.InvalidProtocolBufferException;
2724
import io.reactivex.rxjava3.core.Observable;
@@ -33,6 +30,9 @@
3330
import java.util.concurrent.TimeUnit;
3431
import java.util.stream.Collectors;
3532
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
34+
import org.apache.bifromq.basecluster.transport.ITransport;
35+
import org.apache.bifromq.basecluster.transport.PacketEnvelope;
3636

3737
@Slf4j
3838
final class MessengerTransport {
@@ -61,25 +61,13 @@ Observable<Timed<MessengerMessageEnvelope>> receive() {
6161

6262
private Observable<Timed<MessengerMessageEnvelope>> convert(PacketEnvelope packetEnvelope) {
6363
return Observable.fromIterable(packetEnvelope.data.stream().map(b -> {
64-
MessengerMessageEnvelope.MessengerMessageEnvelopeBuilder messageEnvelopeBuilder =
65-
MessengerMessageEnvelope.builder()
66-
.recipient(packetEnvelope.recipient);
6764
try {
68-
MessengerMessage mm = MessengerMessage.parseFrom(b);
69-
messageEnvelopeBuilder.message(mm);
70-
switch (mm.getMessengerMessageTypeCase()) {
71-
case DIRECT:
72-
return new Timed<MessengerMessageEnvelope>(
73-
messageEnvelopeBuilder.sender(packetEnvelope.sender).build(),
74-
System.currentTimeMillis(),
75-
TimeUnit.MILLISECONDS);
76-
case GOSSIP:
77-
default:
78-
return new Timed<MessengerMessageEnvelope>(
79-
messageEnvelopeBuilder.build(),
80-
System.currentTimeMillis(),
81-
TimeUnit.MILLISECONDS);
82-
}
65+
MessengerMessageEnvelope mmEnvelop = MessengerMessageEnvelope.builder()
66+
.recipient(packetEnvelope.recipient)
67+
.message(MessengerMessage.parseFrom(b))
68+
.sender(packetEnvelope.sender)
69+
.build();
70+
return new Timed<>(mmEnvelop, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
8371
} catch (InvalidProtocolBufferException e) {
8472
return null;
8573
}

base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.collect.Sets;
2626
import com.google.protobuf.ByteString;
2727
import io.reactivex.rxjava3.observers.TestObserver;
28+
import java.time.Duration;
2829
import java.util.HashMap;
2930
import java.util.Map;
3031
import java.util.Set;
@@ -362,10 +363,10 @@ public void testAgentClusterPartitionAndHealing() {
362363
IAgentMember agentMember1OnS1 = agentOnS1.register("agentNode1OnS1");
363364
agentMember1OnS1.metadata(copyFromUtf8("agentNode1OnS1"));
364365
IAgentMember agentMember2OnS1 = agentOnS1.register("agentNode2OnS1");
365-
agentMember1OnS1.metadata(copyFromUtf8("agentNode2OnS1"));
366+
agentMember2OnS1.metadata(copyFromUtf8("agentNode2OnS1"));
366367

367368
IAgentMember agentMemberOnS2 = agentOnS2.register("agentNodeOnS2");
368-
agentMember2OnS1.metadata(copyFromUtf8("agentNodeOnS2"));
369+
agentMemberOnS2.metadata(copyFromUtf8("agentNodeOnS2"));
369370

370371
IAgentMember agentMemberOnS3 = agentOnS3.register("agentNodeOnS3");
371372
agentMemberOnS3.metadata(copyFromUtf8("agentNodeOnS3"));
@@ -374,17 +375,17 @@ public void testAgentClusterPartitionAndHealing() {
374375
await().until(() -> agentOnS2.membership().blockingFirst().size() == 4);
375376
await().until(() -> agentOnS3.membership().blockingFirst().size() == 4);
376377

377-
// isolate s2 from others
378+
// isolate s1 from others
378379
storeMgr.isolate("s1");
379-
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 2);
380-
await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 2);
381-
await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 2);
380+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS1.membership().blockingFirst().size() == 2);
381+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS2.membership().blockingFirst().size() == 2);
382+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS3.membership().blockingFirst().size() == 2);
382383

383384
// integrate s1 into the cluster
384385
storeMgr.integrate("s1");
385-
await().forever().until(() -> agentOnS1.membership().blockingFirst().size() == 4);
386-
await().forever().until(() -> agentOnS2.membership().blockingFirst().size() == 4);
387-
await().forever().until(() -> agentOnS3.membership().blockingFirst().size() == 4);
386+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS1.membership().blockingFirst().size() == 4);
387+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS2.membership().blockingFirst().size() == 4);
388+
await().atMost(Duration.ofSeconds(60)).until(() -> agentOnS3.membership().blockingFirst().size() == 4);
388389
}
389390

390391
@StoreCfgs(stores = {

base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeState.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,24 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basekv.raft;
2121

22+
import com.google.protobuf.ByteString;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Iterator;
27+
import java.util.LinkedHashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
import java.util.stream.Collectors;
2235
import org.apache.bifromq.basekv.raft.event.CommitEvent;
2336
import org.apache.bifromq.basekv.raft.event.ElectionEvent;
2437
import org.apache.bifromq.basekv.raft.event.SnapshotRestoredEvent;
@@ -34,19 +47,6 @@
3447
import org.apache.bifromq.basekv.raft.proto.RequestVoteReply;
3548
import org.apache.bifromq.basekv.raft.proto.Snapshot;
3649
import org.apache.bifromq.basekv.raft.proto.Voting;
37-
import com.google.protobuf.ByteString;
38-
import java.util.Collections;
39-
import java.util.HashMap;
40-
import java.util.HashSet;
41-
import java.util.Iterator;
42-
import java.util.LinkedHashMap;
43-
import java.util.List;
44-
import java.util.Map;
45-
import java.util.Optional;
46-
import java.util.Set;
47-
import java.util.concurrent.CompletableFuture;
48-
import java.util.concurrent.ThreadLocalRandom;
49-
import java.util.stream.Collectors;
5050
import org.slf4j.Logger;
5151

5252
abstract class RaftNodeState implements IRaftNodeState {
@@ -248,7 +248,7 @@ protected void submitSnapshot(ByteString requested, String fromLeader) {
248248
(installed, ex) -> onSnapshotInstalled.done(requested, installed, ex));
249249
}
250250

251-
protected void notifyCommit() {
251+
protected void notifyCommit(boolean isLeader) {
252252
log.trace("Notify commit index[{}]", commitIndex);
253253
for (Iterator<Map.Entry<Long, ProposeTask>> it = uncommittedProposals.entrySet().iterator(); it.hasNext(); ) {
254254
Map.Entry<Long, ProposeTask> entry = it.next();
@@ -282,7 +282,7 @@ protected void notifyCommit() {
282282
task.future.completeExceptionally(DropProposalException.overridden());
283283
}
284284
}
285-
listener.onEvent(new CommitEvent(id, commitIndex));
285+
listener.onEvent(new CommitEvent(id, commitIndex, isLeader));
286286
}
287287

288288
protected void notifyLeaderElected(String leaderId, long term) {

base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollower.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ RaftNodeState stableTo(long stabledIndex) {
262262
if (task.committed) {
263263
commitIndex = index;
264264
log.trace("Advanced commitIndex[{}]", commitIndex);
265-
notifyCommit();
265+
notifyCommit(false);
266266
}
267267
toRemove.add(index);
268268
} else {
@@ -531,7 +531,7 @@ private void handleAppendEntries(String fromLeader, AppendEntries appendEntries)
531531
log.trace("Advanced commitIndex[from:{},to:{}]", commitIndex, newCommitIndex);
532532
commitIndex = newCommitIndex;
533533
// report to application
534-
notifyCommit();
534+
notifyCommit(false);
535535
} else {
536536
// entries between lastIndex and newCommitIndex missing, probably because the channel between
537537
// leader and follower is lossy.
@@ -554,7 +554,7 @@ private void handleAppendEntries(String fromLeader, AppendEntries appendEntries)
554554
stabilizingIndexes.firstKey(), commitIndex, newCommitIndex);
555555
commitIndex = newCommitIndex;
556556
// report to application
557-
notifyCommit();
557+
notifyCommit(false);
558558
} else {
559559
if (newCommitIndex > stabilizingIndexes.lastKey()) {
560560
// if the newCommitIndex is greater than the largest local stabilizing index

0 commit comments

Comments
 (0)