diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java index 0f2844bec..c04c455fa 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java @@ -33,7 +33,6 @@ import static org.apache.bifromq.metrics.TenantMetric.MqttQoS2DistBytes; import static org.apache.bifromq.metrics.TenantMetric.MqttQoS2ExternalLatency; import static org.apache.bifromq.metrics.TenantMetric.MqttQoS2IngressBytes; -import static org.apache.bifromq.metrics.TenantMetric.MqttTransientSubLatency; import static org.apache.bifromq.mqtt.handler.IMQTTProtocolHelper.SubResult.EXCEED_LIMIT; import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.packetId; import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId; @@ -570,12 +569,10 @@ protected final CompletableFuture checkAndSubscri .setUserProperties(grantedUserProps); subTask.subId().ifPresent(optionBuilder::setSubId); TopicFilterOption tfOption = optionBuilder.build(); - Timer.Sample start = Timer.start(); return addFgTask(subTopicFilter(reqId, topicFilter, tfOption)) .thenComposeAsync(subResult -> { switch (subResult) { case OK, EXISTS -> { - start.stop(tenantMeter.timer(MqttTransientSubLatency)); if (!isSharedSubscription(topicFilter) && settings.retainEnabled && (tfOption.getRetainHandling() == SEND_AT_SUBSCRIBE || (subResult == IMQTTProtocolHelper.SubResult.OK diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java index af3f45d70..69c19ac5c 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java @@ -23,6 +23,7 @@ import static org.apache.bifromq.metrics.TenantMetric.MqttQoS1InternalLatency; import static org.apache.bifromq.metrics.TenantMetric.MqttQoS2InternalLatency; import static org.apache.bifromq.metrics.TenantMetric.MqttTransientSubCount; +import static org.apache.bifromq.metrics.TenantMetric.MqttTransientSubLatency; import static org.apache.bifromq.metrics.TenantMetric.MqttTransientUnsubCount; import static org.apache.bifromq.metrics.TenantMetric.MqttTransientUnsubLatency; import static org.apache.bifromq.mqtt.handler.IMQTTProtocolHelper.SubResult.EXCEED_LIMIT; @@ -181,10 +182,12 @@ protected final CompletableFuture subTopicFilter( memUsage.addAndGet(topicFilter.length()); memUsage.addAndGet(option.getSerializedSize()); } + Timer.Sample start = Timer.start(); return addMatchRecord(reqId, topicFilter, option.getIncarnation()) .thenApplyAsync(matchResult -> { switch (matchResult) { case OK -> { + start.stop(tenantMeter.timer(MqttTransientSubLatency)); if (prevOption == null) { return OK; } else {