Skip to content

Getting java.lang.OutOfMemoryError: Java heap space #1084

@Salman83

Description

@Salman83
Image

When I run my springboot app for sometime I'm getting "Getting java.lang.OutOfMemoryError: Java heap space"
Seems that the size of "org.eclipse.paho.mqttv5.client.internal.ClientState" keep increasing.

Am I missing something ?
** Note that I'm setting options.setCleanStart(false); and options.setSessionExpiryInterval(4294967295L);
so if the application instance dies then the new instance continue as if its the same client so it consumes the message that were not consumed before.

` public void connect() {
try {

        client = new MqttAsyncClient(brokerUrl, "my-client");

        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName(username);
        options.setPassword(password.getBytes());
        options.setAutomaticReconnect(true);
        options.setCleanStart(false); // Persistent session
        options.setSessionExpiryInterval(4294967295L); // Max session expiry

        client.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                executor.submit(() -> {
                    try {
                        String payload = new String(message.getPayload());
                        routeMessage(topic, payload);
                    } catch (Exception e) {
                        log.error("❌ Error processing MQTT message from topic {}: {}", topic, e.getMessage(), e);
                    }
                });
            }

            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {
                log.warn("🔌 Disconnected from MQTT broker: {}", disconnectResponse.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {
                log.error("❌ MQTT error occurred", exception);
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // Not used for subscriptions
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("🔄 MQTT connection complete to {}", serverURI);
            }

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {
                // Optional, unused
            }
        });

        client.connect(options).waitForCompletion();

        log.info("✅ Connected to MQTT broker at {}", brokerUrl);

        for (String topic : topics) {
            client.subscribe(topic, 2).waitForCompletion();
            log.info("📡 Subscribed to topic: {}", topic);
        }

    } catch (MqttException e) {
        log.error("❌ Failed to connect or subscribe to MQTT broker", e);
    }
}`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions