Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7f639aa
add shortcut to prevent NPE when landscape doesn't contain local desc…
popduke Aug 17, 2025
e1e628b
1. fixed an identified config change failure process by: 1)make wal c…
popduke Aug 17, 2025
22f9f4c
Reduce memory head caused by inefficient argument formatter
popduke Aug 17, 2025
2138a7e
Improve the backpressure mechanism when the downstream is stalled
popduke Aug 17, 2025
c89355e
1. Reducing AGENT_HOST_MAP sync overhead when failed member detected
popduke Aug 17, 2025
e68de01
1. Correct the log context for CRDT
popduke Aug 19, 2025
2fdf789
fixed a race condition which may cause pipeline graceful retargeting …
popduke Aug 19, 2025
a575ce5
1. reduce HostMemberList sync overhead during failure broadcast
popduke Aug 19, 2025
a2a7f71
1. fixed an issue in RedundantRangeRemovalBalancer which may cause th…
popduke Aug 20, 2025
0abe945
1. Optimize bootstrap and config change workflow
popduke Aug 21, 2025
23f12e9
1. correctly handle the duplicate matchinfo in inbox ingestion package
popduke Aug 22, 2025
2fc4018
Fixed an unbalanced issue of ReplicaCntBalancer
popduke Aug 27, 2025
4e65e0f
CRDT AntiEntropy Improvements:
popduke Aug 27, 2025
1c6e896
Improve stale member cleanup logic
popduke Aug 28, 2025
fab1267
base-crdt: metering delta send rate and throughput correctly during a…
popduke Aug 29, 2025
8ff32b3
optimize balancer's log output
popduke Aug 29, 2025
91dea9b
Improved housekeeping logic for crdt-based metadata service
popduke Aug 29, 2025
cfc1e03
Improved the built-in balancers efficiency
popduke Sep 5, 2025
7fbf1a8
Exclude ranges in terminated states from effective route
popduke Sep 5, 2025
91723a2
Enable manually triggered cov build
popduke Sep 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-cov.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Cov-Build

on:
workflow_dispatch:
pull_request:
branches:
- 'main'
Expand Down
4 changes: 4 additions & 0 deletions base-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>org.apache.bifromq</groupId>
<artifactId>base-env-provider</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bifromq</groupId>
<artifactId>base-util</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bifromq</groupId>
<artifactId>base-hlc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,31 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster;

import static org.apache.bifromq.basecluster.memberlist.CRDTUtil.AGENT_HOST_MAP_URI;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bifromq.basecluster.memberlist.CRDTUtil.AGENT_HOST_MAP_URI;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.agent.proto.AgentEndpoint;
import org.apache.bifromq.basecluster.fd.FailureDetector;
import org.apache.bifromq.basecluster.fd.IFailureDetector;
Expand All @@ -43,23 +60,6 @@
import org.apache.bifromq.basecrdt.store.ICRDTStore;
import org.apache.bifromq.basecrdt.store.proto.CRDTStoreMessage;
import org.apache.bifromq.baseenv.EnvProvider;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;

@Slf4j
final class AgentHost implements IAgentHost {
Expand Down Expand Up @@ -173,6 +173,11 @@ public Observable<Map<HostEndpoint, Set<String>>> landscape() {
return memberList.landscape();
}

@Override
public Observable<Long> refuteSignal() {
return memberList.refuteSignal();
}

@Override
public void close() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster;

import io.reactivex.rxjava3.core.Observable;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bifromq.basecluster.memberlist.HostAddressResolver;
import org.apache.bifromq.basecluster.memberlist.IHostAddressResolver;
import org.apache.bifromq.basecluster.memberlist.agent.IAgent;
import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.transport.TCPTransport;
import org.apache.bifromq.basecluster.transport.Transport;
import io.reactivex.rxjava3.core.Observable;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Agent host defines the interface for hosting agents and joining the cluster.
Expand Down Expand Up @@ -101,6 +101,14 @@ static IAgentHost newInstance(AgentHostOptions options) {
*/
Observable<Map<HostEndpoint, Set<String>>> landscape();

/**
* Emits a signal whenever the local host actively refutes a suspicion of being dead.
* Each emission carries the timestamp (in millis) when the refutation occurred.
*
* @return an observable stream of refutation timestamps
*/
Observable<Long> refuteSignal();

/**
* Shutdown the agent host.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.memberlist;
Expand All @@ -25,28 +25,8 @@
import static org.apache.bifromq.basecrdt.core.api.CausalCRDTType.mvreg;
import static org.apache.bifromq.basecrdt.store.ReplicaIdGenerator.generate;

import org.apache.bifromq.basecluster.agent.proto.AgentEndpoint;
import org.apache.bifromq.basecluster.memberlist.agent.Agent;
import org.apache.bifromq.basecluster.memberlist.agent.AgentAddressProvider;
import org.apache.bifromq.basecluster.memberlist.agent.AgentMessenger;
import org.apache.bifromq.basecluster.memberlist.agent.IAgent;
import org.apache.bifromq.basecluster.membership.proto.Doubt;
import org.apache.bifromq.basecluster.membership.proto.Fail;
import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
import org.apache.bifromq.basecluster.membership.proto.HostMember;
import org.apache.bifromq.basecluster.membership.proto.Join;
import org.apache.bifromq.basecluster.membership.proto.Quit;
import org.apache.bifromq.basecluster.messenger.IMessenger;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.apache.bifromq.basecrdt.core.api.IORMap;
import org.apache.bifromq.basecrdt.core.api.MVRegOperation;
import org.apache.bifromq.basecrdt.core.api.ORMapOperation;
import org.apache.bifromq.basecrdt.proto.Replica;
import org.apache.bifromq.basecrdt.store.ICRDTStore;
import org.apache.bifromq.basehlc.HLC;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.ByteString;
import io.micrometer.core.instrument.Gauge;
Expand All @@ -57,6 +37,7 @@
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -69,7 +50,30 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.base.util.RendezvousHash;
import org.apache.bifromq.basecluster.agent.proto.AgentEndpoint;
import org.apache.bifromq.basecluster.memberlist.agent.Agent;
import org.apache.bifromq.basecluster.memberlist.agent.AgentAddressProvider;
import org.apache.bifromq.basecluster.memberlist.agent.AgentMessenger;
import org.apache.bifromq.basecluster.memberlist.agent.IAgent;
import org.apache.bifromq.basecluster.membership.proto.Doubt;
import org.apache.bifromq.basecluster.membership.proto.Fail;
import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
import org.apache.bifromq.basecluster.membership.proto.HostMember;
import org.apache.bifromq.basecluster.membership.proto.Join;
import org.apache.bifromq.basecluster.membership.proto.Quit;
import org.apache.bifromq.basecluster.messenger.IMessenger;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.apache.bifromq.basecrdt.core.api.IORMap;
import org.apache.bifromq.basecrdt.core.api.MVRegOperation;
import org.apache.bifromq.basecrdt.core.api.ORMapOperation;
import org.apache.bifromq.basecrdt.proto.Replica;
import org.apache.bifromq.basecrdt.store.ICRDTStore;
import org.apache.bifromq.basehlc.HLC;

/**
* HostMemberList implementation using CRDT for achieving a consistent view of the host members in the cluster.
*/
@Slf4j
public class HostMemberList implements IHostMemberList {
private final AtomicReference<State> state = new AtomicReference<>(State.JOINED);
Expand All @@ -79,12 +83,25 @@ public class HostMemberList implements IHostMemberList {
private final IHostAddressResolver addressResolver;
private final BehaviorSubject<Map<HostEndpoint, HostMember>> membershipSubject = BehaviorSubject.createDefault(
new ConcurrentHashMap<>());
private final PublishSubject<Long> refuteSubject = PublishSubject.create();
private final Map<String, Agent> agentMap = new ConcurrentHashMap<>();
private final IORMap hostListCRDT;
private final CompositeDisposable disposables = new CompositeDisposable();
private final MetricManager metricManager;
private final String[] tags;
private volatile HostMember local;

/**
* Constructor of HostMemberList.
*
* @param bindAddr the address to bind the host member
* @param port the port to bind the host member
* @param messenger the messenger to use for communication
* @param scheduler the scheduler to use for scheduling tasks
* @param store the CRDT store to use for storing internal OR-Map
* @param addressResolver the address resolver to resolve host endpoints to addresses
* @param tags the tags to be used for metrics
*/
public HostMemberList(String bindAddr,
int port,
IMessenger messenger,
Expand Down Expand Up @@ -134,10 +151,13 @@ private boolean join(HostMember member) {
if (joined) {
// add it into crdt
log.debug("Member[{}] joins the cluster: local={}", member, local);
Optional<HostMember> memberInCRDT = getHostMember(hostListCRDT, member.getEndpoint());
if (memberInCRDT.isEmpty() || memberInCRDT.get().getIncarnation() < member.getIncarnation()) {
hostListCRDT.execute(ORMapOperation.update(member.getEndpoint().toByteString())
.with(MVRegOperation.write(member.toByteString())));
if (member == local) {
// only update crdt if it's local member
Optional<HostMember> memberInCRDT = getHostMember(hostListCRDT, member.getEndpoint());
if (memberInCRDT.isEmpty() || memberInCRDT.get().getIncarnation() < member.getIncarnation()) {
hostListCRDT.execute(ORMapOperation.update(member.getEndpoint().toByteString())
.with(MVRegOperation.write(member.toByteString())));
}
}
// update crdt landscape
store.join(hostListCRDT.id(), currentMembers().keySet().stream()
Expand All @@ -148,12 +168,11 @@ private boolean join(HostMember member) {
}
}

private void drop(HostEndpoint memberEndpoint, int incarnation) {
private void drop(HostEndpoint memberEndpoint, int incarnation, boolean fromQuit) {
synchronized (this) {
boolean removed = removeMember(memberEndpoint, incarnation);
Optional<HostMember> memberInCRDT = getHostMember(hostListCRDT, memberEndpoint);
if (memberInCRDT.isPresent()) {
// remove it from crdt if any
if (!fromQuit && memberInCRDT.isPresent() && shouldReportFailure(memberInCRDT.get().getEndpoint())) {
hostListCRDT.execute(ORMapOperation.remove(memberEndpoint.toByteString()).of(mvreg));
}
if (removed) {
Expand All @@ -165,6 +184,17 @@ private void drop(HostEndpoint memberEndpoint, int incarnation) {
}
}

private boolean shouldReportFailure(HostEndpoint failedMemberEndpoint) {
// if local member is responsible for removing the failed member from CRDT
RendezvousHash<HostEndpoint, HostEndpoint> hash = RendezvousHash.<HostEndpoint, HostEndpoint>builder()
.keyFunnel((from, into) -> into.putBytes(from.getId().asReadOnlyByteBuffer()))
.nodeFunnel((from, into) -> into.putBytes(from.getId().asReadOnlyByteBuffer()))
.nodes(currentMembers().keySet())
.build();
HostEndpoint reporter = hash.get(failedMemberEndpoint);
return reporter.getId().equals(local.getEndpoint().getId());
}

@Override
public boolean isZombie(HostEndpoint endpoint) {
return !endpoint.getId().equals(local.getEndpoint().getId())
Expand Down Expand Up @@ -207,6 +237,7 @@ public CompletableFuture<Void> stop() {
.thenCompose(v -> store.stopHosting(hostListCRDT.id()))
.whenComplete((v, e) -> {
membershipSubject.onComplete();
refuteSubject.onComplete();
metricManager.close();
state.set(State.QUITED);
});
Expand All @@ -226,6 +257,8 @@ private void renew(int atLeastIncarnation) {
synchronized (this) {
local = local.toBuilder().setIncarnation(Math.max(local.getIncarnation(), atLeastIncarnation) + 1).build();
join(local);
agentMap.values().forEach(Agent::refreshRegistration);
refuteSubject.onNext(HLC.INST.get());
}
}

Expand All @@ -247,7 +280,6 @@ public IAgent host(String agentId) {
tags));
local = local.toBuilder()
.setIncarnation(local.getIncarnation() + 1)
.addAgentId(agentId) // deprecate since 3.3.3
.putAgent(agentId, agentEndpoint.getIncarnation())
.build();
join(local);
Expand All @@ -265,8 +297,6 @@ public CompletableFuture<Void> stopHosting(String agentId) {
synchronized (this) {
local = local.toBuilder()
.setIncarnation(local.getIncarnation() + 1)
.clearAgentId()
.addAllAgentId(agentMap.keySet()) // deprecate since 3.3.3
.clearAgent()
.putAllAgent(Maps.transformValues(agentMap, a -> a.local().getIncarnation()))
.build();
Expand All @@ -279,7 +309,12 @@ public CompletableFuture<Void> stopHosting(String agentId) {

@Override
public Observable<Map<HostEndpoint, Set<String>>> landscape() {
return membershipSubject.map(m -> Maps.transformValues(m, v -> Sets.newHashSet(v.getAgentIdList())));
return membershipSubject.map(m -> Maps.transformValues(m, v -> v.getAgentMap().keySet()));
}

@Override
public Observable<Long> refuteSignal() {
return refuteSubject;
}

private Map<HostEndpoint, HostMember> currentMembers() {
Expand Down Expand Up @@ -327,6 +362,9 @@ private void handleMessage(ClusterMessage message) {
case QUIT -> handleQuit(message.getQuit());
case FAIL -> handleFail(message.getFail());
case DOUBT -> handleDoubt(message.getDoubt());
default -> {
// never happen
}
}
}

Expand Down Expand Up @@ -363,15 +401,15 @@ private void handleFail(Fail fail) {
} else if (isZombie(failedEndpoint)) {
clearZombie(failedEndpoint);
} else {
drop(failedEndpoint, fail.getIncarnation());
drop(failedEndpoint, fail.getIncarnation(), false);
}
}

private void handleQuit(Quit quit) {
HostEndpoint quitEndpoint = quit.getEndpoint();
log.debug("Member[{}] quits the cluster", quitEndpoint);
if (!quitEndpoint.equals(local.getEndpoint()) && !isZombie(quitEndpoint)) {
drop(quitEndpoint, quit.getIncarnation());
drop(quitEndpoint, quit.getIncarnation(), true);
}
}

Expand All @@ -388,7 +426,7 @@ private void handleDoubt(Doubt doubt) {

private void clearZombie(HostEndpoint zombieEndpoint) {
// drop zombie if any, and broadcast a quit on behalf of it
drop(zombieEndpoint, Integer.MAX_VALUE);
drop(zombieEndpoint, Integer.MAX_VALUE, false);
messenger.spread(ClusterMessage.newBuilder()
.setQuit(Quit.newBuilder().setEndpoint(zombieEndpoint).setIncarnation(Integer.MAX_VALUE).build())
.build());
Expand Down
Loading
Loading