Skip to content

Commit 93eb435

Browse files
authored
[Feat] new runtime settings for limiting fanout scale to persistent sessions (#166)
* Tenant-level settings for limiting group fanout and persistent fanout * Report relevant events when throttling happens * Reduced memory consumption in data plane
1 parent 6551e30 commit 93eb435

File tree

34 files changed

+1242
-511
lines changed

34 files changed

+1242
-511
lines changed

bifromq-deliverer/src/main/java/org/apache/bifromq/deliverer/BatchDeliveryCall.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.deliverer;
@@ -26,34 +26,35 @@
2626
import static org.apache.bifromq.deliverer.DeliveryCallResult.OK;
2727
import static org.apache.bifromq.plugin.subbroker.TypeUtil.toMap;
2828

29-
import org.apache.bifromq.basescheduler.IBatchCall;
30-
import org.apache.bifromq.basescheduler.ICallTask;
31-
import org.apache.bifromq.dist.client.IDistClient;
32-
import org.apache.bifromq.plugin.subbroker.DeliveryPack;
33-
import org.apache.bifromq.plugin.subbroker.DeliveryPackage;
34-
import org.apache.bifromq.plugin.subbroker.DeliveryReply;
35-
import org.apache.bifromq.plugin.subbroker.DeliveryRequest;
36-
import org.apache.bifromq.plugin.subbroker.DeliveryResult;
37-
import org.apache.bifromq.plugin.subbroker.IDeliverer;
38-
import org.apache.bifromq.type.MatchInfo;
3929
import java.util.ArrayDeque;
4030
import java.util.Collections;
4131
import java.util.HashMap;
4232
import java.util.HashSet;
33+
import java.util.Iterator;
4334
import java.util.LinkedHashMap;
4435
import java.util.Map;
4536
import java.util.Queue;
4637
import java.util.Set;
4738
import java.util.concurrent.CompletableFuture;
4839
import lombok.extern.slf4j.Slf4j;
40+
import org.apache.bifromq.basescheduler.IBatchCall;
41+
import org.apache.bifromq.basescheduler.ICallTask;
42+
import org.apache.bifromq.dist.client.IDistClient;
43+
import org.apache.bifromq.plugin.subbroker.DeliveryPack;
44+
import org.apache.bifromq.plugin.subbroker.DeliveryPackage;
45+
import org.apache.bifromq.plugin.subbroker.DeliveryReply;
46+
import org.apache.bifromq.plugin.subbroker.DeliveryRequest;
47+
import org.apache.bifromq.plugin.subbroker.DeliveryResult;
48+
import org.apache.bifromq.plugin.subbroker.IDeliverer;
49+
import org.apache.bifromq.type.MatchInfo;
4950

5051
@Slf4j
5152
class BatchDeliveryCall implements IBatchCall<DeliveryCall, DeliveryCallResult, DelivererKey> {
5253
private final IDistClient distClient;
5354
private final IDeliverer deliverer;
5455
private final DelivererKey batcherKey;
5556
private final Queue<ICallTask<DeliveryCall, DeliveryCallResult, DelivererKey>> tasks = new ArrayDeque<>(128);
56-
private Map<String, Map<TopicMessagePackHolder, Set<MatchInfo>>> batch = new HashMap<>(128);
57+
private final Map<String, Map<TopicMessagePackHolder, Set<MatchInfo>>> batch = new HashMap<>(128);
5758

5859
BatchDeliveryCall(IDistClient distClient, IDeliverer deliverer, DelivererKey batcherKey) {
5960
this.distClient = distClient;
@@ -63,8 +64,6 @@ class BatchDeliveryCall implements IBatchCall<DeliveryCall, DeliveryCallResult,
6364

6465
@Override
6566
public void reset() {
66-
batch = new HashMap<>(128);
67-
tasks.clear();
6867
}
6968

7069
@Override
@@ -78,15 +77,20 @@ public void add(ICallTask<DeliveryCall, DeliveryCallResult, DelivererKey> callTa
7877
@Override
7978
public CompletableFuture<Void> execute() {
8079
DeliveryRequest.Builder requestBuilder = DeliveryRequest.newBuilder();
81-
batch.forEach((tenantId, pack) -> {
80+
Iterator<Map.Entry<String, Map<TopicMessagePackHolder, Set<MatchInfo>>>> itr = batch.entrySet().iterator();
81+
while (itr.hasNext()) {
82+
Map.Entry<String, Map<TopicMessagePackHolder, Set<MatchInfo>>> entry = itr.next();
83+
String tenantId = entry.getKey();
84+
Map<TopicMessagePackHolder, Set<MatchInfo>> pack = entry.getValue();
8285
DeliveryPackage.Builder packageBuilder = DeliveryPackage.newBuilder();
8386
pack.forEach((msgPackWrapper, matchInfos) -> {
8487
DeliveryPack.Builder packBuilder = DeliveryPack.newBuilder().setMessagePack(msgPackWrapper.messagePack);
8588
matchInfos.forEach(packBuilder::addMatchInfo);
8689
packageBuilder.addPack(packBuilder.build());
8790
});
8891
requestBuilder.putPackage(tenantId, packageBuilder.build());
89-
});
92+
itr.remove();
93+
}
9094
DeliveryRequest request = requestBuilder.build();
9195
return execute(request);
9296
}

bifromq-dist/bifromq-dist-client/src/main/java/org/apache/bifromq/dist/client/scheduler/BatchPubCall.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,17 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.client.scheduler;
2121

22+
import java.util.ArrayDeque;
23+
import java.util.HashMap;
24+
import java.util.Iterator;
25+
import java.util.Map;
26+
import java.util.Queue;
27+
import java.util.concurrent.CompletableFuture;
2228
import org.apache.bifromq.base.util.AsyncRetry;
2329
import org.apache.bifromq.base.util.exception.NeedRetryException;
2430
import org.apache.bifromq.baserpc.client.IRPCClient;
@@ -29,17 +35,12 @@
2935
import org.apache.bifromq.dist.rpc.proto.DistRequest;
3036
import org.apache.bifromq.type.ClientInfo;
3137
import org.apache.bifromq.type.PublisherMessagePack;
32-
import java.util.ArrayDeque;
33-
import java.util.HashMap;
34-
import java.util.Map;
35-
import java.util.Queue;
36-
import java.util.concurrent.CompletableFuture;
3738

3839
class BatchPubCall implements IBatchCall<PubRequest, PubResult, PubCallBatcherKey> {
3940
private final IRPCClient.IRequestPipeline<DistRequest, DistReply> ppln;
4041
private final Queue<ICallTask<PubRequest, PubResult, PubCallBatcherKey>> tasks = new ArrayDeque<>(64);
4142
private final long retryTimeoutNanos;
42-
private Map<ClientInfo, Map<String, PublisherMessagePack.TopicPack.Builder>> clientMsgPack = new HashMap<>(128);
43+
private final Map<ClientInfo, Map<String, PublisherMessagePack.TopicPack.Builder>> clientMsgPack = new HashMap<>(128);
4344

4445
BatchPubCall(IRPCClient.IRequestPipeline<DistRequest, DistReply> ppln, long retryTimeoutNanos) {
4546
this.ppln = ppln;
@@ -48,8 +49,6 @@ class BatchPubCall implements IBatchCall<PubRequest, PubResult, PubCallBatcherKe
4849

4950
@Override
5051
public void reset() {
51-
clientMsgPack = new HashMap<>(128);
52-
tasks.clear();
5352
}
5453

5554
@Override
@@ -64,13 +63,20 @@ public void add(ICallTask<PubRequest, PubResult, PubCallBatcherKey> callTask) {
6463
@Override
6564
public CompletableFuture<Void> execute() {
6665
DistRequest.Builder requestBuilder = DistRequest.newBuilder().setReqId(System.nanoTime());
67-
clientMsgPack.forEach((k, v) -> {
68-
PublisherMessagePack.Builder senderMsgPackBuilder = PublisherMessagePack.newBuilder().setPublisher(k);
69-
for (PublisherMessagePack.TopicPack.Builder packBuilder : v.values()) {
66+
Iterator<Map.Entry<ClientInfo, Map<String, PublisherMessagePack.TopicPack.Builder>>> itr =
67+
clientMsgPack.entrySet().iterator();
68+
while (itr.hasNext()) {
69+
Map.Entry<ClientInfo, Map<String, PublisherMessagePack.TopicPack.Builder>> entry = itr.next();
70+
ClientInfo publisher = entry.getKey();
71+
Map<String, PublisherMessagePack.TopicPack.Builder> topicPackMap = entry.getValue();
72+
PublisherMessagePack.Builder senderMsgPackBuilder = PublisherMessagePack.newBuilder()
73+
.setPublisher(publisher);
74+
for (PublisherMessagePack.TopicPack.Builder packBuilder : topicPackMap.values()) {
7075
senderMsgPackBuilder.addMessagePack(packBuilder);
7176
}
7277
requestBuilder.addMessages(senderMsgPackBuilder.build());
73-
});
78+
itr.remove();
79+
}
7480
DistRequest request = requestBuilder.build();
7581
return AsyncRetry.exec(() -> execute(request), retryTimeoutNanos);
7682
}

bifromq-dist/bifromq-dist-server/src/main/java/org/apache/bifromq/dist/server/scheduler/BatchDistServerCall.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class BatchDistServerCall implements IBatchCall<TenantPubRequest, DistServerCall
7474
private final String orderKey;
7575
private final Queue<ICallTask<TenantPubRequest, DistServerCallResult, DistServerCallBatcherKey>> tasks =
7676
new ArrayDeque<>();
77-
private Map<String, Map<ClientInfo, Iterable<Message>>> batch = new HashMap<>(128);
77+
private final Map<String, Map<ClientInfo, Iterable<Message>>> batch = new HashMap<>(128);
7878

7979
BatchDistServerCall(IBaseKVStoreClient distWorkerClient, DistServerCallBatcherKey batcherKey) {
8080
this.distWorkerClient = distWorkerClient;
@@ -99,7 +99,7 @@ public void add(ICallTask<TenantPubRequest, DistServerCallResult, DistServerCall
9999

100100
@Override
101101
public void reset() {
102-
batch = new HashMap<>(128);
102+
batch.clear();
103103
}
104104

105105
@Override

bifromq-dist/bifromq-dist-server/src/test/java/org/apache/bifromq/dist/server/DistServiceTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import static org.mockito.ArgumentMatchers.anyString;
2525
import static org.mockito.Mockito.when;
2626

27+
import java.time.Duration;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import lombok.extern.slf4j.Slf4j;
2731
import org.apache.bifromq.basecluster.AgentHostOptions;
2832
import org.apache.bifromq.basecluster.IAgentHost;
2933
import org.apache.bifromq.basecrdt.service.CRDTServiceOptions;
@@ -41,16 +45,12 @@
4145
import org.apache.bifromq.dist.worker.IDistWorker;
4246
import org.apache.bifromq.plugin.eventcollector.Event;
4347
import org.apache.bifromq.plugin.eventcollector.IEventCollector;
48+
import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
4449
import org.apache.bifromq.plugin.settingprovider.ISettingProvider;
4550
import org.apache.bifromq.plugin.settingprovider.Setting;
4651
import org.apache.bifromq.plugin.subbroker.IDeliverer;
4752
import org.apache.bifromq.plugin.subbroker.ISubBroker;
4853
import org.apache.bifromq.plugin.subbroker.ISubBrokerManager;
49-
import org.apache.bifromq.plugin.resourcethrottler.IResourceThrottler;
50-
import java.time.Duration;
51-
import java.util.concurrent.Executors;
52-
import java.util.concurrent.ScheduledExecutorService;
53-
import lombok.extern.slf4j.Slf4j;
5454
import org.mockito.Mock;
5555
import org.mockito.MockitoAnnotations;
5656
import org.testng.annotations.AfterClass;
@@ -134,6 +134,7 @@ public void setup() {
134134
.bgTaskExecutor(bgTaskExecutor)
135135
.storeOptions(kvRangeStoreOptions)
136136
.subBrokerManager(subBrokerMgr)
137+
.settingProvider(settingProvider)
137138
.build();
138139
distServer = IDistServer.builder()
139140
.rpcServerBuilder(rpcServerBuilder)

bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/GroupMatching.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,17 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.worker.schema;
2121

22-
import org.apache.bifromq.type.RouteMatcher;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.stream.Collectors;
2625
import lombok.EqualsAndHashCode;
2726
import lombok.ToString;
27+
import org.apache.bifromq.type.RouteMatcher;
2828

2929
/**
3030
* Represent a group matching route.
@@ -38,7 +38,6 @@ public final class GroupMatching extends Matching {
3838
public final List<NormalMatching> receiverList;
3939
private final Map<String, Long> receivers;
4040

41-
4241
GroupMatching(String tenantId, RouteMatcher matcher, Map<String, Long> members) {
4342
super(tenantId, matcher);
4443
assert matcher.getType() != RouteMatcher.Type.Normal;

bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/Matching.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.worker.schema;
2121

22-
import org.apache.bifromq.type.RouteMatcher;
2322
import lombok.EqualsAndHashCode;
2423
import lombok.ToString;
24+
import org.apache.bifromq.type.RouteMatcher;
2525

2626
/**
2727
* The abstract class of matching route.

bifromq-dist/bifromq-dist-worker-schema/src/main/java/org/apache/bifromq/dist/worker/schema/NormalMatching.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.worker.schema;
2121

22-
import org.apache.bifromq.type.MatchInfo;
23-
import org.apache.bifromq.type.RouteMatcher;
2422
import lombok.EqualsAndHashCode;
2523
import lombok.ToString;
24+
import org.apache.bifromq.type.MatchInfo;
25+
import org.apache.bifromq.type.RouteMatcher;
2626

2727
/**
2828
* Represent a normal matching route.

bifromq-dist/bifromq-dist-worker/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@
7878
<groupId>org.apache.bifromq</groupId>
7979
<artifactId>bifromq-plugin-event-collector-helper</artifactId>
8080
</dependency>
81+
<dependency>
82+
<groupId>org.apache.bifromq</groupId>
83+
<artifactId>bifromq-plugin-setting-provider</artifactId>
84+
</dependency>
8185
<dependency>
8286
<groupId>org.apache.bifromq</groupId>
8387
<artifactId>bifromq-dist-client</artifactId>

0 commit comments

Comments
 (0)