-
Notifications
You must be signed in to change notification settings - Fork 916
Open
Description
When I run my springboot app for sometime I'm getting "Getting java.lang.OutOfMemoryError: Java heap space"
Seems that the size of "org.eclipse.paho.mqttv5.client.internal.ClientState" keep increasing.
Am I missing something ?
** Note that I'm setting options.setCleanStart(false); and options.setSessionExpiryInterval(4294967295L);
so if the application instance dies then the new instance continue as if its the same client so it consumes the message that were not consumed before.
` public void connect() {
try {
client = new MqttAsyncClient(brokerUrl, "my-client");
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password.getBytes());
options.setAutomaticReconnect(true);
options.setCleanStart(false); // Persistent session
options.setSessionExpiryInterval(4294967295L); // Max session expiry
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
executor.submit(() -> {
try {
String payload = new String(message.getPayload());
routeMessage(topic, payload);
} catch (Exception e) {
log.error("❌ Error processing MQTT message from topic {}: {}", topic, e.getMessage(), e);
}
});
}
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.warn("🔌 Disconnected from MQTT broker: {}", disconnectResponse.getReasonString());
}
@Override
public void mqttErrorOccurred(MqttException exception) {
log.error("❌ MQTT error occurred", exception);
}
@Override
public void deliveryComplete(IMqttToken token) {
// Not used for subscriptions
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("🔄 MQTT connection complete to {}", serverURI);
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
// Optional, unused
}
});
client.connect(options).waitForCompletion();
log.info("✅ Connected to MQTT broker at {}", brokerUrl);
for (String topic : topics) {
client.subscribe(topic, 2).waitForCompletion();
log.info("📡 Subscribed to topic: {}", topic);
}
} catch (MqttException e) {
log.error("❌ Failed to connect or subscribe to MQTT broker", e);
}
}`
Metadata
Metadata
Assignees
Labels
No labels