Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/native/mqtt5_packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ struct aws_mqtt5_packet_publish_view_java_jni *aws_mqtt5_packet_publish_view_cre
mqtt5_publish_packet_properties.publish_payload_field_id,
s_publish_packet_string,
"payload",
&java_packet->correlation_data_buf,
&java_packet->payload_buf,
&java_packet->payload_cursor,
true,
&was_value_set) == AWS_OP_ERR) {
Expand Down
105 changes: 105 additions & 0 deletions src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,111 @@ public void Op_DirectPacketBuilders() throws Exception {
CrtResource.waitForNoResources();
}

/* Full Publish Packet Tests */
private void doPublishWithFullPacket() {
try (TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
TlsContext tlsContext = new TlsContext(tlsOptions)) {

String testUUID = UUID.randomUUID().toString();
String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
String testResponseTopic = "test/MQTT5_Binding_Java_Response_" + testUUID;
String testContentType = "application/json";
byte[] testPayload = "{\"message\": \"Hello World\"}".getBytes();
byte[] testCorrelationData = "correlation-data-12345".getBytes();

Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
LifecycleEvents_Futured events = new LifecycleEvents_Futured();
builder.withLifecycleEvents(events);
builder.withTlsContext(tlsContext);

PublishEvents_Futured publishEvents = new PublishEvents_Futured();
builder.withPublishEvents(publishEvents);

// Build a publish packet with all available fields populated
PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
publishPacketBuilder.withTopic(testTopic);
publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE);
publishPacketBuilder.withPayload(testPayload);
publishPacketBuilder.withRetain(false);
publishPacketBuilder.withPayloadFormat(PublishPacket.PayloadFormatIndicator.UTF8);
publishPacketBuilder.withMessageExpiryIntervalSeconds(60L);
publishPacketBuilder.withResponseTopic(testResponseTopic);
publishPacketBuilder.withCorrelationData(testCorrelationData);
publishPacketBuilder.withContentType(testContentType);

ArrayList<UserProperty> userProperties = new ArrayList<UserProperty>();
userProperties.add(new UserProperty("test-name-1", "test-value-1"));
userProperties.add(new UserProperty("test-name-2", "test-value-2"));
publishPacketBuilder.withUserProperties(userProperties);

SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(testTopic, QOS.AT_LEAST_ONCE);

try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
client.start();
events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Subscribe to the topic first
client.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Publish the message with all fields
client.publish(publishPacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Wait for the publish to be received
publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Verify the received publish packet has the expected fields
PublishPacket receivedPacket = publishEvents.publishPacket;
assertNotNull("Received publish packet should not be null", receivedPacket);

// Verify topic
assertEquals("Topic should match", testTopic, receivedPacket.getTopic());

// Verify QoS
assertEquals("QoS should match", QOS.AT_LEAST_ONCE, receivedPacket.getQOS());

// Verify payload
assertTrue("Payload should match", java.util.Arrays.equals(testPayload, receivedPacket.getPayload()));

// Verify payload format indicator
assertEquals("Payload format should match",
PublishPacket.PayloadFormatIndicator.UTF8, receivedPacket.getPayloadFormat());

// Verify response topic
assertEquals("Response topic should match", testResponseTopic, receivedPacket.getResponseTopic());

// Verify correlation data
assertTrue("Correlation data should match",
java.util.Arrays.equals(testCorrelationData, receivedPacket.getCorrelationData()));

// Verify content type
assertEquals("Content type should match", testContentType, receivedPacket.getContentType());

// Verify user properties
assertNotNull("User properties should not be null", receivedPacket.getUserProperties());
assertEquals("User properties count should match", 2, receivedPacket.getUserProperties().size());

client.stop();
events.stopFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/* Publish with full packet - all fields populated */
@Test
public void Op_PublishWithFullPacket() throws Exception {
skipIfNetworkUnavailable();
Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT,
AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);

TestUtils.doRetryableTest(this::doPublishWithFullPacket, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES,
TEST_RETRY_SLEEP_MILLIS);

CrtResource.waitForNoResources();
}

/**
* ============================================================
* Error Operation Tests
Expand Down
Loading