Skip to content

Commit 400c14a

Browse files
zhangShunLinzhangShunLin
authored andcommitted
Merge remote-tracking branch 'origin/main'
2 parents 5c25b7a + 1e049ee commit 400c14a

File tree

257 files changed

+5867
-6487
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

257 files changed

+5867
-6487
lines changed

base-cluster/src/main/java/com/baidu/bifromq/basecluster/AgentHost.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import com.google.common.base.Preconditions;
4242
import com.google.common.base.Strings;
4343
import com.google.protobuf.ByteString;
44+
import io.micrometer.core.instrument.Metrics;
45+
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
4446
import io.reactivex.rxjava3.core.Observable;
4547
import io.reactivex.rxjava3.core.Scheduler;
4648
import io.reactivex.rxjava3.disposables.CompositeDisposable;
@@ -55,24 +57,20 @@
5557

5658
@Slf4j
5759
final class AgentHost implements IAgentHost {
58-
private enum State {
59-
INIT, STARTING, STARTED, STOPPING, SHUTDOWN
60-
}
61-
6260
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
6361
private final AgentHostOptions options;
6462
private final ICRDTStore store;
6563
private final ITransport transport;
6664
private final IMessenger messenger;
6765
private final IHostMemberList memberList;
66+
private final Scheduler hostScheduler;
6867
private final MemberSelector memberSelector;
6968
private final AutoHealer healer;
7069
private final AutoSeeder seeder;
7170
private final AutoDropper deadDropper;
7271
private final CompositeDisposable disposables = new CompositeDisposable();
7372
private final IHostAddressResolver hostAddressResolver;
7473
private final String[] tags;
75-
7674
AgentHost(ITransport transport, IHostAddressResolver resolver, AgentHostOptions options) {
7775
checkArgument(!Strings.isNullOrEmpty(options.addr()) && !"0.0.0.0".equals(options.addr()),
7876
"Invalid bind address");
@@ -84,21 +82,22 @@ private enum State {
8482
.maxHealthScore(options.awarenessMaxMultiplier())
8583
.retransmitMultiplier(options.retransmitMultiplier())
8684
.spreadPeriod(options.gossipPeriod());
87-
Scheduler scheduler = Schedulers.from(newSingleThreadScheduledExecutor(
88-
EnvProvider.INSTANCE.newThreadFactory("agent-host-scheduler", true)));
85+
hostScheduler = Schedulers.from(ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
86+
newSingleThreadScheduledExecutor(EnvProvider.INSTANCE.newThreadFactory("agent-host-scheduler", true)),
87+
"agent-host-scheduler"));
8988
this.store = ICRDTStore.newInstance(options.crdtStoreOptions());
9089
this.messenger = Messenger.builder()
9190
.transport(transport)
9291
.opts(messengerOptions)
93-
.scheduler(scheduler)
92+
.scheduler(hostScheduler)
9493
.build();
9594
hostAddressResolver = resolver;
9695
this.store.start(messenger.receive()
9796
.filter(m -> m.value().message.hasCrdtStoreMessage())
9897
.map(m -> m.value().message.getCrdtStoreMessage()));
9998
tags = new String[] {"local", options.addr() + ":" + messenger.bindAddress().getPort()};
10099
this.memberList = new HostMemberList(options.addr(), messenger.bindAddress().getPort(),
101-
messenger, scheduler, store, hostAddressResolver, tags);
100+
messenger, hostScheduler, store, hostAddressResolver, tags);
102101
IFailureDetector failureDetector = FailureDetector.builder()
103102
.local(new IProbingTarget() {
104103
@Override
@@ -116,15 +115,15 @@ public InetSocketAddress addr() {
116115
.indirectProbes(options.indirectProbes())
117116
.worstHealthScore(options.awarenessMaxMultiplier())
118117
.messenger(messenger)
119-
.scheduler(scheduler)
118+
.scheduler(hostScheduler)
120119
.build();
121-
healer = new AutoHealer(messenger, scheduler, memberList, hostAddressResolver, options.autoHealingTimeout(),
120+
healer = new AutoHealer(messenger, hostScheduler, memberList, hostAddressResolver, options.autoHealingTimeout(),
122121
options.autoHealingInterval(), tags);
123-
seeder = new AutoSeeder(messenger, scheduler, memberList, hostAddressResolver, options.joinTimeout(),
122+
seeder = new AutoSeeder(messenger, hostScheduler, memberList, hostAddressResolver, options.joinTimeout(),
124123
Duration.ofSeconds(options.joinRetryInSec()), tags);
125-
deadDropper = new AutoDropper(messenger, scheduler, memberList, failureDetector, hostAddressResolver,
124+
deadDropper = new AutoDropper(messenger, hostScheduler, memberList, failureDetector, hostAddressResolver,
126125
options.suspicionMultiplier(), options.suspicionMaxTimeoutMultiplier(), tags);
127-
memberSelector = new MemberSelector(memberList, scheduler, hostAddressResolver);
126+
memberSelector = new MemberSelector(memberList, hostScheduler, hostAddressResolver);
128127
disposables.add(store.storeMessages().subscribe(this::sendCRDTStoreMessage));
129128

130129
start();
@@ -184,6 +183,7 @@ public void close() {
184183
.whenComplete((v, e) -> {
185184
memberSelector.stop();
186185
disposables.dispose();
186+
hostScheduler.shutdown();
187187
state.set(State.SHUTDOWN);
188188
}).join();
189189
log.debug("AgentHost stopped");
@@ -198,7 +198,6 @@ private void start() {
198198
}
199199
}
200200

201-
202201
private void sendCRDTStoreMessage(CRDTStoreMessage storeMsg) {
203202
ClusterMessage msg = ClusterMessage.newBuilder().setCrdtStoreMessage(storeMsg).build();
204203
try {
@@ -213,4 +212,9 @@ private void sendCRDTStoreMessage(CRDTStoreMessage storeMsg) {
213212
log.error("Target Host[{}] not found:\n{}", storeMsg.getReceiver(), storeMsg);
214213
}
215214
}
215+
216+
217+
private enum State {
218+
INIT, STARTING, STARTED, STOPPING, SHUTDOWN
219+
}
216220
}

base-cluster/src/main/java/com/baidu/bifromq/basecluster/memberlist/HostMemberList.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,11 @@
6666

6767
@Slf4j
6868
public class HostMemberList implements IHostMemberList {
69-
private enum State {
70-
JOINED, QUITTING, QUITED
71-
}
72-
7369
private final AtomicReference<State> state = new AtomicReference<>(State.JOINED);
7470
private final IMessenger messenger;
7571
private final Scheduler scheduler;
7672
private final ICRDTStore store;
7773
private final IHostAddressResolver addressResolver;
78-
7974
private final BehaviorSubject<Map<HostEndpoint, HostMember>> membershipSubject = BehaviorSubject.createDefault(
8075
new ConcurrentHashMap<>());
8176
private final Map<String, Agent> agentMap = new ConcurrentHashMap<>();
@@ -84,7 +79,6 @@ private enum State {
8479
private final MetricManager metricManager;
8580
private final String[] tags;
8681
private volatile HostMember local;
87-
8882
public HostMemberList(String bindAddr,
8983
int port,
9084
IMessenger messenger,
@@ -180,36 +174,36 @@ private InetSocketAddress getMemberAddress(HostEndpoint endpoint) {
180174
@Override
181175
public CompletableFuture<Void> stop() {
182176
if (state.compareAndSet(State.JOINED, State.QUITTING)) {
183-
synchronized (this) {
184-
return CompletableFuture.allOf(agentMap.values().stream()
185-
.map(Agent::quit).toArray(CompletableFuture[]::new))
186-
.exceptionally(e -> null)
187-
.thenCompose(v -> {
177+
return CompletableFuture.allOf(agentMap.values().stream()
178+
.map(Agent::quit).toArray(CompletableFuture[]::new))
179+
.exceptionally(e -> null)
180+
.thenCompose(v -> {
181+
synchronized (this) {
188182
disposables.dispose();
189183
// remove self from alive host list
190184
removeMember(local.getEndpoint(), local.getIncarnation());
191185
// delete from crdt and wait for
192186
return hostListCRDT.execute(
193187
ORMapOperation.remove(local.getEndpoint().toByteString()).of(mvreg))
194188
.exceptionally(e -> null);
195-
})
196-
.thenCompose(v1 -> {
197-
ClusterMessage quit = ClusterMessage.newBuilder()
198-
.setQuit(Quit.newBuilder()
199-
.setEndpoint(local.getEndpoint())
200-
.setIncarnation(local.getIncarnation())
201-
.build())
202-
.build();
203-
return messenger.spread(quit)
204-
.handle((v, e) -> null);
205-
})
206-
.thenCompose(v -> store.stopHosting(hostListCRDT.id()))
207-
.whenComplete((v, e) -> {
208-
membershipSubject.onComplete();
209-
metricManager.close();
210-
state.set(State.QUITED);
211-
});
212-
}
189+
}
190+
})
191+
.thenCompose(v1 -> {
192+
ClusterMessage quit = ClusterMessage.newBuilder()
193+
.setQuit(Quit.newBuilder()
194+
.setEndpoint(local.getEndpoint())
195+
.setIncarnation(local.getIncarnation())
196+
.build())
197+
.build();
198+
return messenger.spread(quit)
199+
.handle((v, e) -> null);
200+
})
201+
.thenCompose(v -> store.stopHosting(hostListCRDT.id()))
202+
.whenComplete((v, e) -> {
203+
membershipSubject.onComplete();
204+
metricManager.close();
205+
state.set(State.QUITED);
206+
});
213207
} else if (state.get() == State.QUITTING) {
214208
return CompletableFuture.failedFuture(new IllegalStateException("quit has started"));
215209
} else {
@@ -425,6 +419,10 @@ private void checkState() {
425419
Preconditions.checkState(state.get() == State.JOINED);
426420
}
427421

422+
private enum State {
423+
JOINED, QUITTING, QUITED
424+
}
425+
428426
private class MetricManager {
429427

430428
private final Set<Meter> meters = new HashSet<>();

base-cluster/src/main/java/com/baidu/bifromq/basecluster/memberlist/agent/AgentMember.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.baidu.bifromq.basecluster.agent.proto.AgentMemberAddr;
1919
import com.baidu.bifromq.basecluster.agent.proto.AgentMemberMetadata;
2020
import com.baidu.bifromq.basecluster.agent.proto.AgentMessage;
21+
import com.baidu.bifromq.basecluster.agent.proto.AgentMessageEnvelope;
2122
import com.baidu.bifromq.basecrdt.core.api.IORMap;
2223
import com.baidu.bifromq.basecrdt.core.api.MVRegOperation;
2324
import com.baidu.bifromq.basecrdt.core.api.ORMapOperation;
@@ -41,7 +42,6 @@ class AgentMember implements IAgentMember {
4142
private final AgentMemberAddr localAddr;
4243
private final IORMap agentCRDT;
4344
private final IAgentMessenger messenger;
44-
private final Scheduler scheduler;
4545
private final Supplier<Set<AgentMemberAddr>> memberAddresses;
4646
private final PublishSubject<AgentMessage> agentMessageSubject = PublishSubject.create();
4747
private final CompositeDisposable disposables = new CompositeDisposable();
@@ -57,7 +57,6 @@ class AgentMember implements IAgentMember {
5757
this.localAddr = memberAddr;
5858
this.agentCRDT = agentCRDT;
5959
this.messenger = messenger;
60-
this.scheduler = scheduler;
6160
this.memberAddresses = memberAddresses;
6261
metadata = AgentMemberMetadata.newBuilder().setHlc(HLC.INST.get()).build();
6362
updateCRDT();
@@ -66,7 +65,7 @@ class AgentMember implements IAgentMember {
6665
.subscribe(this::updateCRDT));
6766
disposables.add(messenger.receive()
6867
.filter(msg -> msg.getReceiver().equals(localAddr))
69-
.map(msg -> msg.getMessage())
68+
.map(AgentMessageEnvelope::getMessage)
7069
.observeOn(scheduler)
7170
.subscribe(agentMessageSubject::onNext));
7271
}
@@ -95,10 +94,9 @@ public AgentMemberAddr address() {
9594
public CompletableFuture<Void> broadcast(ByteString message, boolean reliable) {
9695
return throwsWhenDestroyed(() -> {
9796
AgentMessage agentMessage = AgentMessage.newBuilder().setSender(localAddr).setPayload(message).build();
98-
CompletableFuture<Void>[] sendFutures = memberAddresses.get().stream()
97+
return CompletableFuture.allOf(memberAddresses.get().stream()
9998
.map(memberAddr -> messenger.send(agentMessage, memberAddr, reliable))
100-
.toArray(CompletableFuture[]::new);
101-
return CompletableFuture.allOf(sendFutures).exceptionally(e -> null);
99+
.toArray(CompletableFuture[]::new)).exceptionally(e -> null);
102100
});
103101
}
104102

@@ -132,7 +130,7 @@ public CompletableFuture<Void> multicast(String targetMemberName, ByteString mes
132130
private void updateCRDT(long ts) {
133131
skipRunWhenDestroyed(() -> {
134132
Optional<AgentMemberMetadata> metaOnCRDT = CRDTUtil.getAgentMemberMetadata(agentCRDT, localAddr);
135-
if (!metaOnCRDT.isPresent() || !metaOnCRDT.get().equals(metadata)) {
133+
if (metaOnCRDT.isEmpty() || !metaOnCRDT.get().equals(metadata)) {
136134
updateCRDT();
137135
}
138136
});

base-env/base-env-provider/pom.xml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
~ See the License for the specific language governing permissions and limitations under the License.
1313
-->
1414

15-
<project xmlns="http://maven.apache.org/POM/4.0.0"
16-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
15+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
16+
xmlns="http://maven.apache.org/POM/4.0.0"
1717
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1818
<modelVersion>4.0.0</modelVersion>
1919
<parent>
@@ -40,6 +40,10 @@
4040
<groupId>com.google.guava</groupId>
4141
<artifactId>guava</artifactId>
4242
</dependency>
43+
<dependency>
44+
<groupId>com.google.protobuf</groupId>
45+
<artifactId>protobuf-java</artifactId>
46+
</dependency>
4347
<dependency>
4448
<groupId>org.slf4j</groupId>
4549
<artifactId>slf4j-api</artifactId>

0 commit comments

Comments
 (0)