Skip to content

Commit adc25d7

Browse files
authored
Merge branch 'main' into main
2 parents 10acbf9 + 6d1e6f6 commit adc25d7

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

bifromq-dist/bifromq-dist-worker/src/main/java/org/apache/bifromq/dist/worker/DeliverExecutorGroup.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.github.benmanes.caffeine.cache.Scheduler;
4747
import com.google.common.base.Charsets;
4848
import java.util.HashMap;
49-
import java.util.List;
5049
import java.util.Map;
5150
import java.util.Set;
5251
import java.util.concurrent.ThreadLocalRandom;
@@ -55,7 +54,7 @@
5554

5655
@Slf4j
5756
class DeliverExecutorGroup implements IDeliverExecutorGroup {
58-
// OuterCacheKey: OrderedSharedMatchingKey(<tenantId>, <escapedTopicFilter>)
57+
// OuterCacheKey: OrderedSharedMatchingKey(<tenantId>, <mqttTopicFilter>)
5958
// InnerCacheKey: ClientInfo(<tenantId>, <type>, <metadata>)
6059
private final LoadingCache<OrderedSharedMatchingKey, Cache<ClientInfo, NormalMatching>> orderedSharedMatching;
6160
private final int inlineFanOutThreshold = DistInlineFanOutThreshold.INSTANCE.get();
@@ -161,7 +160,7 @@ public void refreshOrderedShareSubRoutes(String tenantId, RouteMatcher routeMatc
161160
log.debug("Refresh ordered shared sub routes: tenantId={}, sharedTopicFilter={}", tenantId, routeMatcher);
162161
}
163162
orderedSharedMatching.invalidate(
164-
new OrderedSharedMatchingKey(tenantId, routeMatcher.getFilterLevelList()));
163+
new OrderedSharedMatchingKey(tenantId, routeMatcher.getMqttTopicFilter()));
165164
}
166165

167166
private void prepareSend(Matching matching, TopicMessagePackHolder msgPackHolder, boolean inline) {
@@ -180,7 +179,7 @@ private void prepareSend(Matching matching, TopicMessagePackHolder msgPackHolder
180179
ClientInfo sender = publisherPack.getPublisher();
181180
NormalMatching matchedInbox = orderedSharedMatching
182181
.get(new OrderedSharedMatchingKey(groupMatching.tenantId(),
183-
groupMatching.matcher.getFilterLevelList()))
182+
groupMatching.matcher.getMqttTopicFilter()))
184183
.get(sender, senderInfo -> {
185184
RendezvousHash<ClientInfo, NormalMatching> hash =
186185
RendezvousHash.<ClientInfo, NormalMatching>builder()
@@ -217,6 +216,6 @@ private void send(NormalMatching route, TopicMessagePackHolder msgPackHolder, bo
217216
fanoutExecutors[idx].submit(route, msgPackHolder, inline);
218217
}
219218

220-
private record OrderedSharedMatchingKey(String tenantId, List<String> filterLevels) {
219+
private record OrderedSharedMatchingKey(String tenantId, String mqttTopicFilter) {
221220
}
222221
}

bifromq-dist/bifromq-dist-worker/src/test/java/org/apache/bifromq/dist/worker/DistQoS0Test.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,4 +524,38 @@ public void testProbeAndSeek() {
524524
assertEquals(reply.getResultMap().get(tenantA).getFanoutMap().getOrDefault("TopicB", 0).intValue(), 0);
525525
unmatch(tenantA, "TopicA/#", InboxService, "inbox1", "batch1");
526526
}
527+
528+
@Test(groups = "integration")
529+
public void testOrderedShareWithGroups() {
530+
when(mqttBroker.open("batch1")).thenReturn(writer1);
531+
532+
match(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
533+
match(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
534+
Set<String> set = Set.of("$oshare/group1/#", "$oshare/group2/#");
535+
536+
await().until(() -> {
537+
clearInvocations(writer1);
538+
Set<String> topicFilterSet = new HashSet<>();
539+
dist(tenantA, AT_MOST_ONCE, "/a/b/c", copyFromUtf8("Hello"), "orderKey1");
540+
try {
541+
ArgumentCaptor<DeliveryRequest> captor = ArgumentCaptor.forClass(DeliveryRequest.class);
542+
verify(writer1, timeout(1000).times(2)).deliver(captor.capture());
543+
captor.getAllValues().forEach(req -> {
544+
DeliveryPackage packs = req.getPackageMap().get(tenantA);
545+
for (DeliveryPack pack : packs.getPackList()) {
546+
for (MatchInfo matchInfo : pack.getMatchInfoList()) {
547+
topicFilterSet.add(matchInfo.getMatcher().getMqttTopicFilter());
548+
}
549+
}
550+
});
551+
assertEquals(set, topicFilterSet);
552+
return true;
553+
} catch (Throwable e) {
554+
return false;
555+
}
556+
});
557+
558+
unmatch(tenantA, "$oshare/group1/#", MqttBroker, "inbox1", "batch1");
559+
unmatch(tenantA, "$oshare/group2/#", MqttBroker, "inbox1", "batch1");
560+
}
527561
}

0 commit comments

Comments
 (0)