Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions base-kv/base-kv-meta-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bifromq.basekv.metaservice;

import static org.apache.bifromq.basekv.metaservice.CRDTUtil.parseDescriptorKey;
import static org.apache.bifromq.basekv.metaservice.CRDTUtil.toLandscapeURI;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecrdt.core.api.CausalCRDTType;
import org.apache.bifromq.basecrdt.core.api.IMVReg;
import org.apache.bifromq.basecrdt.core.api.IORMap;
import org.apache.bifromq.basecrdt.core.api.MVRegOperation;
import org.apache.bifromq.basecrdt.core.api.ORMapOperation;
import org.apache.bifromq.basecrdt.proto.Replica;
import org.apache.bifromq.basecrdt.service.ICRDTService;
import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
import org.apache.bifromq.basekv.proto.StoreKey;

@Slf4j
class BaseKVLandscapeCRDT implements IBaseKVLandscapeCRDT {
private final ICRDTService crdtService;
private final IORMap landscapeORMap;
private final BehaviorSubject<Map<StoreKey, KVRangeStoreDescriptor>> landscapeSubject = BehaviorSubject.create();
private final CompositeDisposable disposable = new CompositeDisposable();

BaseKVLandscapeCRDT(String clusterId, ICRDTService crdtService) {
this.crdtService = crdtService;
this.landscapeORMap = crdtService.host(toLandscapeURI(clusterId));
disposable.add(landscapeORMap.inflation()
.observeOn(IBaseKVMetaService.SHARED_SCHEDULER)
.map(this::buildLandscape)
.subscribe(landscapeSubject::onNext));
}

public Observable<Set<ByteString>> aliveReplicas() {
return crdtService.aliveReplicas(landscapeORMap.id().getUri())
.map(replicas -> replicas.stream().map(Replica::getId).collect(Collectors.toSet()));
}

public Observable<Map<StoreKey, KVRangeStoreDescriptor>> landscape() {
return landscapeSubject.distinctUntilChanged();
}

public Optional<KVRangeStoreDescriptor> getStoreDescriptor(String storeId) {
StoreKey storeKey = toDescriptorKey(storeId);
return buildLandscape(landscapeORMap.getMVReg(storeKey.toByteString()));
}

public CompletableFuture<Void> setStoreDescriptor(KVRangeStoreDescriptor descriptor) {
StoreKey storeKey = toDescriptorKey(descriptor.getId());
return landscapeORMap.execute(ORMapOperation.update(storeKey.toByteString())
.with(MVRegOperation.write(descriptor.toByteString())));
}

public CompletableFuture<Void> removeDescriptor(StoreKey key) {
return landscapeORMap.execute(ORMapOperation.remove(key.toByteString()).of(CausalCRDTType.mvreg));
}

public CompletableFuture<Void> removeDescriptor(String storeId) {
StoreKey storeKey = toDescriptorKey(storeId);
return landscapeORMap.execute(ORMapOperation.remove(storeKey.toByteString()).of(CausalCRDTType.mvreg));
}

public StoreKey toDescriptorKey(String storeId) {
return StoreKey.newBuilder()
.setStoreId(storeId)
.setReplicaId(landscapeORMap.id().getId())
.build();
}

public void stop() {
disposable.dispose();
crdtService.stopHosting(landscapeORMap.id().getUri()).join();
}

private Map<StoreKey, KVRangeStoreDescriptor> buildLandscape(long ts) {
Map<StoreKey, KVRangeStoreDescriptor> landscape = new HashMap<>();
landscapeORMap.keys().forEachRemaining(ormapKey -> buildLandscape(landscapeORMap.getMVReg(ormapKey.key()))
.ifPresent(descriptor -> landscape.put(parseDescriptorKey(ormapKey.key()), descriptor)));
return landscape;
}

private Optional<KVRangeStoreDescriptor> buildLandscape(IMVReg mvReg) {
List<KVRangeStoreDescriptor> l = Lists.newArrayList(Iterators.filter(Iterators.transform(mvReg.read(), b -> {
try {
return KVRangeStoreDescriptor.parseFrom(b);
} catch (InvalidProtocolBufferException e) {
log.error("Unable to parse KVRangeStoreDescriptor", e);
return null;
}
}), Objects::nonNull));
l.sort((a, b) -> Long.compareUnsigned(b.getHlc(), a.getHlc()));
return Optional.ofNullable(l.isEmpty() ? null : l.get(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bifromq.basekv.metaservice;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;

class BaseKVLandscapeObserver implements IBaseKVLandscapeObserver {
private final BehaviorSubject<Map<String, KVRangeStoreDescriptor>> landscapeSubject =
BehaviorSubject.create();
private final CompositeDisposable disposable = new CompositeDisposable();

BaseKVLandscapeObserver(IBaseKVLandscapeCRDT landscapeCRDT) {
disposable.add(landscapeCRDT.landscape()
.map(descriptorMap -> {
Map<String, KVRangeStoreDescriptor> descriptorMapByStoreId = new HashMap<>();
descriptorMap.forEach((key, value) -> descriptorMapByStoreId.compute(key.getStoreId(), (k, v) -> {
if (v == null) {
return value;
}
return v.getHlc() > value.getHlc() ? v : value;
}));
return descriptorMapByStoreId;
})
.subscribe(landscapeSubject::onNext));
}

@Override
public final Observable<Map<String, KVRangeStoreDescriptor>> landscape() {
return landscapeSubject.distinctUntilChanged();
}

@Override
public Optional<KVRangeStoreDescriptor> getStoreDescriptor(String storeId) {
for (String key : landscapeSubject.getValue().keySet()) {
if (key.equals(storeId)) {
return Optional.of(landscapeSubject.getValue().get(key));
}
}
return Optional.empty();
}

@Override
public void stop() {
disposable.dispose();
landscapeSubject.onComplete();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bifromq.basekv.metaservice;

import com.google.protobuf.ByteString;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
import org.apache.bifromq.basekv.proto.StoreKey;

@Slf4j
class BaseKVLandscapeReporter implements IBaseKVLandscapeReporter {
private final String storeId;
private final IBaseKVLandscapeCRDT landscapeCRDT;
private final CompositeDisposable disposable = new CompositeDisposable();
private volatile KVRangeStoreDescriptor latestDescriptor;

BaseKVLandscapeReporter(String storeId, IBaseKVLandscapeCRDT landscapeCRDT) {
this.storeId = storeId;
this.landscapeCRDT = landscapeCRDT;
disposable.add(Observable.combineLatest(
landscapeCRDT.landscape(),
landscapeCRDT.aliveReplicas(),
(StoreDescriptorAndReplicas::new))
.observeOn(IBaseKVMetaService.SHARED_SCHEDULER)
.subscribe(this::houseKeep));
}

@Override
public CompletableFuture<Void> report(KVRangeStoreDescriptor descriptor) {
Optional<KVRangeStoreDescriptor> descriptorOnCRDT = landscapeCRDT.getStoreDescriptor(descriptor.getId());
if (descriptorOnCRDT.isEmpty() || !descriptorOnCRDT.get().equals(descriptor)) {
return landscapeCRDT.setStoreDescriptor(descriptor);
}
return CompletableFuture.completedFuture(null);
}

@Override
public void stop() {
landscapeCRDT.removeDescriptor(storeId).join();
disposable.dispose();
}

private void houseKeep(StoreDescriptorAndReplicas storeDescriptorAndReplicas) {
Map<StoreKey, KVRangeStoreDescriptor> storedDescriptors = storeDescriptorAndReplicas.descriptorMap;
Set<ByteString> aliveReplicas = storeDescriptorAndReplicas.replicaIds;
for (StoreKey storeKey : storedDescriptors.keySet()) {
if (!aliveReplicas.contains(storeKey.getReplicaId())) {
log.debug("store[{}] is not alive, remove its descriptor", storeKey.getStoreId());
landscapeCRDT.removeDescriptor(storeKey);
}
}
if (!storedDescriptors.containsKey(landscapeCRDT.toDescriptorKey(storeId))) {
KVRangeStoreDescriptor latestDescriptor = this.latestDescriptor;
if (latestDescriptor != null) {
landscapeCRDT.setStoreDescriptor(latestDescriptor);
}
}
}

private record StoreDescriptorAndReplicas(Map<StoreKey, KVRangeStoreDescriptor> descriptorMap,
Set<ByteString> replicaIds) {
}
}
Loading
Loading