Skip to content

Commit 10f4c35

Browse files
committed
1. Support controlling keep alive at tenant level via Settings
2. Support disabling keep alive via '0' according to the mqtt protocols
1 parent d66a989 commit 10f4c35

File tree

17 files changed

+267
-395
lines changed

17 files changed

+267
-395
lines changed

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/MQTTBroker.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ public final void start() {
9898
.retainClient(builder.retainClient)
9999
.sessionDictClient(builder.sessionDictClient)
100100
.clientBalancer(builder.clientBalancer)
101-
.defaultKeepAliveTimeSeconds(builder.defaultKeepAliveSeconds)
102101
.build();
103102
log.info("Starting MQTT broker");
104103
log.debug("Starting server channel");

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/MQTTBrokerBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class MQTTBrokerBuilder implements IMQTTBrokerBuilder {
4343
int connectTimeoutSeconds = 20;
4444
int connectRateLimit = 1000;
4545
int disconnectRate = 1000;
46-
int defaultKeepAliveSeconds = 5 * 60; // 5 min
4746
long writeLimit = 512 * 1024;
4847
long readLimit = 512 * 1024;
4948
int maxBytesInMessage = 256 * 1024;

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTConnectHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@
7777
@Slf4j
7878
public abstract class MQTTConnectHandler extends ChannelDuplexHandler {
7979
protected static final boolean SANITY_CHECK = SanityCheckMqttUtf8String.INSTANCE.get();
80-
private static final int MIN_CLIENT_KEEP_ALIVE_DURATION = 5;
81-
private static final int MAX_CLIENT_KEEP_ALIVE_DURATION = 2 * 60 * 60;
8280
private final FutureTracker cancellableTasks = new FutureTracker();
8381
private ChannelHandlerContext ctx;
8482
private MQTTSessionContext sessionCtx;
@@ -156,7 +154,8 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) {
156154
return CompletableFuture.completedFuture(null);
157155
}
158156
LWT willMessage = connMsg.variableHeader().isWillFlag() ? getWillMessage(connMsg) : null;
159-
int keepAliveSeconds = keepAliveSeconds(connMsg.variableHeader().keepAliveTimeSeconds());
157+
int keepAliveSeconds = keepAliveSeconds(connMsg.variableHeader().keepAliveTimeSeconds(),
158+
settings);
160159
String userSessionId = userSessionId(clientInfo);
161160
String requestClientId = connMsg.payload().clientIdentifier();
162161
if (isCleanSession(connMsg, settings)) {
@@ -525,13 +524,10 @@ protected void handleGoAway(GoAway goAway) {
525524

526525
protected abstract int maxPacketSize(MqttConnectMessage connMsg, TenantSettings settings);
527526

528-
private int keepAliveSeconds(int requestKeepAliveSeconds) {
529-
if (requestKeepAliveSeconds == 0) {
530-
// 20 mins the default keep alive duration
531-
requestKeepAliveSeconds = sessionCtx.defaultKeepAliveTimeSeconds;
527+
private int keepAliveSeconds(int requestKeepAliveSeconds, TenantSettings settings) {
528+
if (requestKeepAliveSeconds > 0) {
529+
requestKeepAliveSeconds = Math.max(settings.minKeepAliveSeconds, requestKeepAliveSeconds);
532530
}
533-
requestKeepAliveSeconds = Math.max(MIN_CLIENT_KEEP_ALIVE_DURATION, requestKeepAliveSeconds);
534-
requestKeepAliveSeconds = Math.min(requestKeepAliveSeconds, MAX_CLIENT_KEEP_ALIVE_DURATION);
535531
return requestKeepAliveSeconds;
536532
}
537533

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTSessionHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,10 @@ public void handlerAdded(ChannelHandlerContext ctx) {
373373
}
374374
});
375375
lastActiveAtNanos = sessionCtx.nanoTime();
376-
idleTimeoutTask = ctx.executor()
377-
.scheduleAtFixedRate(this::checkIdle, idleTimeoutNanos, idleTimeoutNanos, TimeUnit.NANOSECONDS);
376+
if (idleTimeoutNanos > 0) {
377+
idleTimeoutTask = ctx.executor()
378+
.scheduleAtFixedRate(this::checkIdle, idleTimeoutNanos, idleTimeoutNanos, TimeUnit.NANOSECONDS);
379+
}
378380
scheduleRedirectCheck();
379381
onInitialized.whenComplete((v, e) -> tenantMeter.recordCount(MqttConnectCount));
380382
memUsage.addAndGet(estMemSize());
@@ -985,6 +987,7 @@ protected final void sendQoS0SubMessage(SubMessage msg) {
985987
writeAndFlush(pubMsg).addListener(f -> {
986988
memUsage.addAndGet(-msgSize);
987989
if (f.isSuccess()) {
990+
lastActiveAtNanos = sessionCtx.nanoTime();
988991
if (settings.debugMode) {
989992
eventCollector.report(getLocal(QoS0Pushed.class)
990993
.isRetain(msg.isRetain())

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/TenantSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static com.baidu.bifromq.plugin.settingprovider.Setting.MaxTopicLevels;
3131
import static com.baidu.bifromq.plugin.settingprovider.Setting.MaxUserPayloadBytes;
3232
import static com.baidu.bifromq.plugin.settingprovider.Setting.MaximumQoS;
33+
import static com.baidu.bifromq.plugin.settingprovider.Setting.MinKeepAliveSeconds;
3334
import static com.baidu.bifromq.plugin.settingprovider.Setting.MsgPubPerSec;
3435
import static com.baidu.bifromq.plugin.settingprovider.Setting.OutBoundBandWidth;
3536
import static com.baidu.bifromq.plugin.settingprovider.Setting.PayloadFormatValidationEnabled;
@@ -58,6 +59,7 @@ public class TenantSettings {
5859
public final boolean subscriptionIdentifierEnabled;
5960
public final boolean sharedSubscriptionEnabled;
6061
public final QoS maxQoS;
62+
public final int minKeepAliveSeconds;
6163
public final int maxSEI;
6264
public final int maxTopicLevelLength;
6365
public final int maxTopicLevels;
@@ -89,6 +91,7 @@ public TenantSettings(String tenantId, ISettingProvider provider) {
8991
subscriptionIdentifierEnabled = provider.provide(SubscriptionIdentifierEnabled, tenantId);
9092
sharedSubscriptionEnabled = provider.provide(SharedSubscriptionEnabled, tenantId);
9193
maxQoS = QoS.forNumber(provider.provide(MaximumQoS, tenantId));
94+
minKeepAliveSeconds = provider.provide(MinKeepAliveSeconds, tenantId);
9295
maxSEI = provider.provide(MaxSessionExpirySeconds, tenantId);
9396
maxTopicLevelLength = provider.provide(MaxTopicLevelLength, tenantId);
9497
maxTopicLevels = provider.provide(MaxTopicLevels, tenantId);

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/MQTTSessionContext.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737

3838
@Slf4j
3939
public final class MQTTSessionContext {
40-
private final IAuthProvider authProvider;
4140
public final ILocalSessionRegistry localSessionRegistry;
4241
public final ILocalDistService localDistService;
4342
public final IEventCollector eventCollector;
@@ -49,7 +48,7 @@ public final class MQTTSessionContext {
4948
public final ISessionDictClient sessionDictClient;
5049
public final IClientBalancer clientBalancer;
5150
public final String serverId;
52-
public final int defaultKeepAliveTimeSeconds;
51+
private final IAuthProvider authProvider;
5352
private final Ticker ticker;
5453
private final FutureTracker futureTracker = new FutureTracker();
5554
private final TenantGauge tenantTransientSubNumGauge;
@@ -65,7 +64,6 @@ public final class MQTTSessionContext {
6564
IRetainClient retainClient,
6665
ISessionDictClient sessionDictClient,
6766
IClientBalancer clientBalancer,
68-
int defaultKeepAliveTimeSeconds,
6967
IEventCollector eventCollector,
7068
IResourceThrottler resourceThrottler,
7169
ISettingProvider settingProvider,
@@ -82,7 +80,6 @@ public final class MQTTSessionContext {
8280
this.retainClient = retainClient;
8381
this.sessionDictClient = sessionDictClient;
8482
this.clientBalancer = clientBalancer;
85-
this.defaultKeepAliveTimeSeconds = defaultKeepAliveTimeSeconds;
8683
this.ticker = ticker == null ? Ticker.systemTicker() : ticker;
8784
this.tenantTransientSubNumGauge = new TenantGauge(MqttTransientSubCountGauge);
8885
this.tenantMemGauge = new TenantGauge(MqttSessionWorkingMemoryGauge);

bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/BaseSessionHandlerTest.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -181,21 +181,20 @@ public void setup(Method method) {
181181
when(oomCondition.meet()).thenReturn(false);
182182
when(clientBalancer.needRedirect(any())).thenReturn(Optional.empty());
183183
sessionContext = MQTTSessionContext.builder()
184-
.serverId(serverId)
185-
.ticker(testTicker)
186-
.defaultKeepAliveTimeSeconds(2)
187-
.distClient(distClient)
188-
.retainClient(retainClient)
189-
.authProvider(authProvider)
190-
.localDistService(localDistService)
191-
.localSessionRegistry(localSessionRegistry)
192-
.sessionDictClient(sessionDictClient)
193-
.clientBalancer(clientBalancer)
194-
.inboxClient(inboxClient)
195-
.eventCollector(eventCollector)
196-
.resourceThrottler(resourceThrottler)
197-
.settingProvider(settingProvider)
198-
.build();
184+
.serverId(serverId)
185+
.ticker(testTicker)
186+
.distClient(distClient)
187+
.retainClient(retainClient)
188+
.authProvider(authProvider)
189+
.localDistService(localDistService)
190+
.localSessionRegistry(localSessionRegistry)
191+
.sessionDictClient(sessionDictClient)
192+
.clientBalancer(clientBalancer)
193+
.inboxClient(inboxClient)
194+
.eventCollector(eventCollector)
195+
.resourceThrottler(resourceThrottler)
196+
.settingProvider(settingProvider)
197+
.build();
199198
mockSettings();
200199
}
201200

@@ -225,8 +224,18 @@ protected void verifyEvent(EventType... types) {
225224

226225
protected void mockSettings() {
227226
Mockito.lenient().when(resourceThrottler.hasResource(anyString(), any())).thenReturn(true);
228-
Mockito.lenient().when(settingProvider.provide(any(Setting.class), anyString())).thenAnswer(
229-
invocation -> ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1)));
227+
Mockito.lenient().when(settingProvider.provide(any(Setting.class), anyString()))
228+
.thenAnswer(invocation -> {
229+
Setting setting = invocation.getArgument(0);
230+
switch (setting) {
231+
case MinKeepAliveSeconds -> {
232+
return 2;
233+
}
234+
default -> {
235+
return ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1));
236+
}
237+
}
238+
});
230239
Mockito.lenient().when(settingProvider.provide(eq(InBoundBandWidth), anyString())).thenReturn(51200 * 1024L);
231240
Mockito.lenient().when(settingProvider.provide(eq(OutBoundBandWidth), anyString())).thenReturn(51200 * 1024L);
232241
Mockito.lenient().when(settingProvider.provide(eq(ForceTransient), anyString())).thenReturn(false);

bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/MQTTConnectHandlerTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ public class MQTTConnectHandlerTest extends MockableTest {
5151
private final int keepAlive = 2;
5252
private final String remoteIp = "127.0.0.1";
5353
private final int remotePort = 8888;
54-
private final ISettingProvider settingProvider = Setting::current;
5554
private MQTTConnectHandler connectHandler;
5655
private EmbeddedChannel channel;
56+
57+
@Mock
58+
private ISettingProvider settingProvider;
5759
@Mock
5860
private IInboxClient inboxClient;
5961
@Mock
@@ -65,9 +67,20 @@ public class MQTTConnectHandlerTest extends MockableTest {
6567
@BeforeMethod(alwaysRun = true)
6668
public void setup() {
6769
connectHandler = Mockito.spy(MQTTConnectHandler.class);
70+
when(settingProvider.provide(any(Setting.class), anyString()))
71+
.thenAnswer(invocation -> {
72+
Setting setting = invocation.getArgument(0);
73+
switch (setting) {
74+
case MinKeepAliveSeconds -> {
75+
return keepAlive;
76+
}
77+
default -> {
78+
return ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1));
79+
}
80+
}
81+
});
6882
sessionContext = MQTTSessionContext.builder()
6983
.serverId(serverId)
70-
.defaultKeepAliveTimeSeconds(keepAlive)
7184
.inboxClient(inboxClient)
7285
.eventCollector(eventCollector)
7386
.resourceThrottler(resourceThrottler)

bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/BaseMQTTTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public abstract class BaseMQTTTest {
166166
public void setup() {
167167
closeable = MockitoAnnotations.openMocks(this);
168168
when(clientBalancer.needRedirect(any())).thenReturn(Optional.empty());
169+
169170
testTicker = new TestTicker();
170171
sessionRegistry = new LocalSessionRegistry();
171172
ILocalTopicRouter router = new LocalTopicRouter(serverId, distClient);
@@ -183,7 +184,6 @@ public void setup() {
183184
.clientBalancer(clientBalancer)
184185
.localSessionRegistry(sessionRegistry)
185186
.localDistService(distService)
186-
.defaultKeepAliveTimeSeconds(300)
187187
.ticker(testTicker)
188188
.serverId(serverId)
189189
.build();
@@ -220,8 +220,18 @@ protected void initChannel(EmbeddedChannel embeddedChannel) {
220220

221221
protected void mockSettings() {
222222
Mockito.lenient().when(resourceThrottler.hasResource(anyString(), any())).thenReturn(true);
223-
Mockito.lenient().when(settingProvider.provide(any(Setting.class), anyString())).thenAnswer(
224-
invocation -> ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1)));
223+
Mockito.lenient().when(settingProvider.provide(any(Setting.class), anyString()))
224+
.thenAnswer(invocation -> {
225+
Setting setting = invocation.getArgument(0);
226+
switch (setting) {
227+
case MinKeepAliveSeconds -> {
228+
return 2;
229+
}
230+
default -> {
231+
return ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1));
232+
}
233+
}
234+
});
225235
Mockito.lenient().when(settingProvider.provide(eq(InBoundBandWidth), anyString())).thenReturn(51200 * 1024L);
226236
Mockito.lenient().when(settingProvider.provide(eq(OutBoundBandWidth), anyString())).thenReturn(51200 * 1024L);
227237
Mockito.lenient().when(settingProvider.provide(eq(ForceTransient), anyString())).thenReturn(false);

bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3ConnectHandlerTest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,20 @@ public void setup() {
8787
connectHandler = new MQTT3ConnectHandler();
8888
when(resourceThrottler.hasResource(anyString(), any())).thenReturn(true);
8989
when(clientBalancer.needRedirect(any())).thenReturn(Optional.empty());
90-
when(settingProvider.provide(any(Setting.class), anyString())).thenAnswer(
91-
invocation -> ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1)));
90+
when(settingProvider.provide(any(Setting.class), anyString()))
91+
.thenAnswer(invocation -> {
92+
Setting setting = invocation.getArgument(0);
93+
switch (setting) {
94+
case MinKeepAliveSeconds -> {
95+
return keepAlive;
96+
}
97+
default -> {
98+
return ((Setting) invocation.getArgument(0)).current(invocation.getArgument(1));
99+
}
100+
}
101+
});
92102
sessionContext = MQTTSessionContext.builder()
93103
.serverId(serverId)
94-
.defaultKeepAliveTimeSeconds(keepAlive)
95104
.authProvider(authProvider)
96105
.inboxClient(inboxClient)
97106
.eventCollector(eventCollector)

0 commit comments

Comments
 (0)