diff --git a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java index 2333a3e10..a42016200 100644 --- a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java +++ b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java @@ -126,7 +126,7 @@ final class BaseKVStoreClient implements IBaseKVStoreClient { // key: storeId private final Map> lnrQueryPplns = Maps.newHashMap(); private final AtomicReference> effectiveRouter = - new AtomicReference<>(new TreeMap<>(BoundaryUtil::compare)); + new AtomicReference<>(Collections.unmodifiableNavigableMap(new TreeMap<>(BoundaryUtil::compare))); // key: serverId, val: storeId private volatile Map serverToStoreMap = Maps.newHashMap(); @@ -486,7 +486,7 @@ private boolean refreshRangeRoute(ClusterInfo clusterInfo) { } NavigableMap last = effectiveRouter.get(); if (!router.equals(last)) { - effectiveRouter.set(router); + effectiveRouter.set(Collections.unmodifiableNavigableMap(router)); return true; } return false; @@ -519,7 +519,7 @@ private void patchRouter(String leaderStoreId, KVRangeDescriptor latest) { } } patched.put(setting.boundary, setting); - effectiveRouter.compareAndSet(router, patched); + effectiveRouter.compareAndSet(router, Collections.unmodifiableNavigableMap(patched)); } } diff --git a/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java b/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java index 39b676283..fb206d974 100644 --- a/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java +++ b/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java @@ -14,7 +14,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.bifromq.inbox.store; @@ -22,6 +22,10 @@ import static org.apache.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary; import static org.apache.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY; +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; import org.apache.bifromq.basekv.client.IBaseKVStoreClient; import org.apache.bifromq.basekv.client.KVRangeSetting; import org.apache.bifromq.basekv.client.exception.BadRequestException; @@ -33,9 +37,6 @@ import org.apache.bifromq.inbox.storage.proto.GCReply; import org.apache.bifromq.inbox.storage.proto.GCRequest; import org.apache.bifromq.inbox.storage.proto.InboxServiceROCoProcInput; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import lombok.extern.slf4j.Slf4j; @Slf4j public class InboxStoreGCProcessor implements IInboxStoreGCProcessor { @@ -49,8 +50,8 @@ public InboxStoreGCProcessor(IBaseKVStoreClient storeClient, String localStoreId @Override public final CompletableFuture gc(long reqId, long now) { - Collection rangeSettingList = findByBoundary(FULL_BOUNDARY, - storeClient.latestEffectiveRouter()); + Collection rangeSettingList = Sets.newHashSet(findByBoundary(FULL_BOUNDARY, + storeClient.latestEffectiveRouter())); if (localServerId != null) { rangeSettingList.removeIf(rangeSetting -> !rangeSetting.leader.equals(localServerId)); }