diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index a3de5dc58..af96fab57 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -94,6 +94,7 @@ |spring.cloud.aws.sns.endpoint | | Overrides the default endpoint. |spring.cloud.aws.sns.region | | Overrides the default region. |spring.cloud.aws.sqs.dualstack-enabled | | Configure whether the AWS client should use the AWS dualstack endpoint. Note that not each AWS service supports dual-stack. For complete list check AWS services that support IPv6 +|spring.cloud.aws.sqs.convert-message-id-to-uuid | `+++true+++` | Whether to convert SQS message IDs to UUIDs. Set to `false` for SQS-compatible providers that return non-UUID message IDs. |spring.cloud.aws.sqs.enabled | `+++true+++` | Enables SQS integration. |spring.cloud.aws.sqs.endpoint | | Overrides the default endpoint. |spring.cloud.aws.sqs.listener.auto-startup | | Defines whether SQS listeners will start automatically or not. diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index bf120b162..6dbba8cd7 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -319,6 +319,28 @@ If `SendBatchFailureStrategy#DO_NOT_THROW` is configured, no exception is thrown For convenience, the `additionalInformation` parameters can be found as constants in the `SqsTemplateParameters` class. +===== Non-UUID Message IDs + +By default, Spring Cloud AWS SQS expects the message ID returned by SQS to be a valid UUID. +If a non-UUID message ID is received, an error is thrown with instructions to enable non-UUID support. + +The raw provider message ID is always stored in the `Sqs_RawMessageId` header, regardless of whether UUID conversion is enabled or disabled. +Access the raw ID via `MessageHeaderUtils.getRawMessageId(message)`. + +To enable non-UUID message ID support (e.g., for Yandex Message Queue or other SQS-compatible providers): + +[source,properties] +---- +spring.cloud.aws.sqs.convert-message-id-to-uuid=false +---- + +When disabled: + +* **Receive side**: A deterministic UUID derived from the raw ID is used as the Spring `MessageHeaders.ID`. +* **Send side**: If the send response contains a non-UUID message ID, + `SendResult.messageId()` returns a deterministic UUID and the raw ID is available + in the `SendResult.message()` headers. + [[template-message-conversion]] ==== Template Message Conversion diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index c6c89301e..6a0adbbe9 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -64,6 +64,7 @@ * @author Maciej Walkowiak * @author Wei Jiang * @author Dongha Kim + * @author Jeongmin Kim * @since 3.0 */ @AutoConfiguration @@ -111,6 +112,7 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, if (sqsProperties.getQueueNotFoundStrategy() != null) { builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy())); } + builder.configure(options -> options.convertMessageIdToUuid(sqsProperties.getConvertMessageIdToUuid())); return builder.build(); } @@ -152,6 +154,7 @@ private void configureProperties(SqsContainerOptionsBuilder options) { mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout); mapper.from(this.sqsProperties.getListener().getMaxDelayBetweenPolls()).to(options::maxDelayBetweenPolls); mapper.from(this.sqsProperties.getListener().getAutoStartup()).to(options::autoStartup); + mapper.from(this.sqsProperties.getConvertMessageIdToUuid()).to(options::convertMessageIdToUuid); } @ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper") @@ -160,8 +163,8 @@ static class SqsJacksonConfiguration { @ConditionalOnMissingBean @Bean public MessagingMessageConverter messageConverter(ObjectProvider jsonMapperProvider) { - JsonMapper jsonMapper = jsonMapperProvider.getIfAvailable(); - return jsonMapper != null ? new SqsMessagingMessageConverter(jsonMapper) + return jsonMapperProvider.getIfAvailable() != null + ? new SqsMessagingMessageConverter(jsonMapperProvider.getIfAvailable()) : new SqsMessagingMessageConverter(); } diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java index 509768d2c..fb37f35bb 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java @@ -26,6 +26,7 @@ * * @author Tomaz Fernandes * @author Wei Jiang + * @author Jeongmin Kim * @since 3.0 */ @ConfigurationProperties(prefix = SqsProperties.PREFIX) @@ -51,6 +52,20 @@ public void setListener(Listener listener) { private Boolean observationEnabled = false; + /** + * Whether to convert SQS message IDs to UUIDs. Set to {@code false} for SQS-compatible providers that return + * non-UUID message IDs. + */ + private Boolean convertMessageIdToUuid = true; + + public Boolean getConvertMessageIdToUuid() { + return convertMessageIdToUuid; + } + + public void setConvertMessageIdToUuid(Boolean convertMessageIdToUuid) { + this.convertMessageIdToUuid = convertMessageIdToUuid; + } + /** * Return the strategy to use if the queue is not found. * @return the {@link QueueNotFoundStrategy} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/MessageHeaderUtils.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/MessageHeaderUtils.java index 5ee1e238c..6f167157c 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/MessageHeaderUtils.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/MessageHeaderUtils.java @@ -15,6 +15,7 @@ */ package io.awspring.cloud.sqs; +import io.awspring.cloud.sqs.listener.SqsHeaders; import io.awspring.cloud.sqs.support.converter.MessagingMessageHeaders; import java.util.Collection; import java.util.Map; @@ -30,6 +31,7 @@ * Utility class for extracting {@link MessageHeaders} from a {@link Message}. * * @author Tomaz Fernandes + * @author Jeongmin Kim * @since 3.0 */ public class MessageHeaderUtils { @@ -150,4 +152,22 @@ public static Message removeHeaderIfPresent(Message message, String ke return new GenericMessage<>(message.getPayload(), newHeaders); } + /** + * Return the raw provider message ID, falling back to Spring message ID if not present. + * @param message the message. + * @return the raw provider ID or Spring ID. + */ + public static String getRawMessageId(Message message) { + String rawMessageId = message.getHeaders().get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, String.class); + return rawMessageId != null ? rawMessageId : getId(message); + } + + /** + * Return the messages' raw provider IDs as a concatenated {@link String}. + * @param messages the messages. + * @return the raw provider IDs. + */ + public static String getRawMessageId(Collection> messages) { + return messages.stream().map(MessageHeaderUtils::getRawMessageId).collect(Collectors.joining("; ")); + } } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java index 25c9b64c8..3a923e202 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsContainerOptions.java @@ -32,6 +32,7 @@ * Sqs specific implementation of {@link ContainerOptions}. * * @author Tomaz Fernandes + * @author Jeongmin Kim * @since 3.0 */ public class SqsContainerOptions extends AbstractContainerOptions { @@ -49,6 +50,8 @@ public class SqsContainerOptions extends AbstractContainerOptions implements MessageSource { @@ -133,4 +134,8 @@ protected MessageConversionContext getMessageConversionContext() { return this.messageConversionContext; } + protected MessagingMessageConverter getMessagingMessageConverter() { + return this.messagingMessageConverter; + } + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java index 7267b259d..7541262f1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java @@ -28,6 +28,7 @@ import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor; import io.awspring.cloud.sqs.support.converter.MessageConversionContext; import io.awspring.cloud.sqs.support.converter.SqsMessageConversionContext; +import io.awspring.cloud.sqs.support.converter.SqsMessageIdResolver; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,6 +64,7 @@ * @param the {@link Message} payload type. * * @author Tomaz Fernandes + * @author Jeongmin Kim * @since 3.0 */ public abstract class AbstractSqsMessageSource extends AbstractPollingMessageSource @@ -109,6 +111,8 @@ protected void doConfigure(ContainerOptions containerOptions) { this.messageVisibility = sqsContainerOptions.getMessageVisibility() != null ? (int) sqsContainerOptions.getMessageVisibility().getSeconds() : MESSAGE_VISIBILITY_DISABLED; + SqsMessageIdResolver.configureMessageIdResolution(getMessagingMessageConverter(), + sqsContainerOptions.getConvertMessageIdToUuid()); } @Override diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java index dd577996c..5816fd3a1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java @@ -30,6 +30,7 @@ import io.awspring.cloud.sqs.support.converter.MessageConversionContext; import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessageConversionContext; +import io.awspring.cloud.sqs.support.converter.SqsMessageIdResolver; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.legacy.LegacyJackson2SqsMessagingMessageConverter; import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation; @@ -49,6 +50,7 @@ import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import software.amazon.awssdk.services.sqs.SqsAsyncClient; @@ -77,6 +79,7 @@ * @author Tomaz Fernandes * @author Zhong Xi Lu * @author Hyunggeol Lee + * @author Jeongmin Kim * * @since 3.0 */ @@ -102,6 +105,8 @@ public class SqsTemplate extends AbstractMessagingTemplate implements S private final TemplateContentBasedDeduplication contentBasedDeduplication; + private final boolean convertMessageIdToUuid; + private SqsTemplate(SqsTemplateBuilderImpl builder) { super(builder.messageConverter, builder.options, SQS_OBSERVATION_SPECIFICS); SqsTemplateOptionsImpl options = builder.options; @@ -111,6 +116,8 @@ private SqsTemplate(SqsTemplateBuilderImpl builder) { this.queueNotFoundStrategy = options.queueNotFoundStrategy; this.messageSystemAttributeNames = options.messageSystemAttributeNames; this.contentBasedDeduplication = options.contentBasedDeduplication; + this.convertMessageIdToUuid = options.convertMessageIdToUuid; + SqsMessageIdResolver.configureMessageIdResolution(builder.messageConverter, options.convertMessageIdToUuid); } /** @@ -327,16 +334,23 @@ private CompletableFuture handleAutoDeduplication(String endpointName) protected CompletableFuture> doSendAsync(String endpointName, Message message, org.springframework.messaging.Message originalMessage) { return createSendMessageRequest(endpointName, message).thenCompose(this.sqsAsyncClient::sendMessage) - .thenApply(response -> createSendResult(UUID.fromString(response.messageId()), - response.sequenceNumber(), endpointName, originalMessage)); + .thenApply(response -> createSendResult(response.messageId(), response.sequenceNumber(), endpointName, + originalMessage)); } - private SendResult createSendResult(UUID messageId, @Nullable String sequenceNumber, String endpointName, - org.springframework.messaging.Message originalMessage) { - return new SendResult<>(messageId, endpointName, originalMessage, - sequenceNumber != null - ? Collections.singletonMap(SqsTemplateParameters.SEQUENCE_NUMBER_PARAMETER_NAME, sequenceNumber) - : Collections.emptyMap()); + private SendResult createSendResult(String rawMessageId, @Nullable String sequenceNumber, + String endpointName, org.springframework.messaging.Message originalMessage) { + MessageHeaders resolvedHeaders = SqsMessageIdResolver.resolveAndAddMessageId(rawMessageId, + originalMessage.getHeaders(), this.convertMessageIdToUuid); + UUID messageId = resolvedHeaders.getId(); + org.springframework.messaging.Message messageWithHeaders = MessageBuilder.createMessage( + originalMessage.getPayload(), resolvedHeaders); + Map additionalInfo = new HashMap<>(); + if (sequenceNumber != null) { + additionalInfo.put(SqsTemplateParameters.SEQUENCE_NUMBER_PARAMETER_NAME, sequenceNumber); + } + return new SendResult<>(messageId, endpointName, messageWithHeaders, + additionalInfo.isEmpty() ? Collections.emptyMap() : additionalInfo); } private CompletableFuture createSendMessageRequest(String endpointName, Message message) { @@ -358,8 +372,8 @@ protected CompletableFuture> doSendBatchAsync(String end Collection messages, Collection> originalMessages) { logger.debug("Sending messages {} to endpoint {}", messages, endpointName); return createSendMessageBatchRequest(endpointName, messages).thenCompose(this.sqsAsyncClient::sendMessageBatch) - .thenApply(response -> createSendResultBatch(response, endpointName, - originalMessages.stream().collect(Collectors.toMap(MessageHeaderUtils::getId, msg -> msg)))); + .thenApply(response -> createSendResultBatch(response, endpointName, originalMessages.stream() + .collect(Collectors.toMap(MessageHeaderUtils::getRawMessageId, msg -> msg)))); } private SendResult.Batch createSendResultBatch(SendMessageBatchResponse response, String endpointName, @@ -379,10 +393,8 @@ private Collection> createSendResultFailed(SendMessageB private Collection> doCreateSendResultBatch(SendMessageBatchResponse response, String endpointName, Map> originalMessagesById) { - return response - .successful().stream().map(entry -> createSendResult(UUID.fromString(entry.messageId()), - entry.sequenceNumber(), endpointName, getOriginalMessage(originalMessagesById, entry))) - .toList(); + return response.successful().stream().map(entry -> createSendResult(entry.messageId(), entry.sequenceNumber(), + endpointName, getOriginalMessage(originalMessagesById, entry))).toList(); } private org.springframework.messaging.Message getOriginalMessage( @@ -540,7 +552,7 @@ private Map addMissingFifoReceiveHeaders(Map hea private CompletableFuture deleteMessages(String endpointName, Collection> messages) { logger.trace("Acknowledging in queue {} messages {}", endpointName, - MessageHeaderUtils.getId(addTypeToMessages(messages))); + MessageHeaderUtils.getRawMessageId(addTypeToMessages(messages))); return getQueueAttributes(endpointName) .thenCompose(attributes -> this.sqsAsyncClient.deleteMessageBatch(DeleteMessageBatchRequest.builder() .queueUrl(attributes.getQueueUrl()).entries(createDeleteMessageEntries(messages)).build())) @@ -559,7 +571,8 @@ private Collection> getFailedAckMessage DeleteMessageBatchResponse response, Collection> messages, String endpointName) { return response.failed().stream().map(BatchResultErrorEntry::id) - .map(id -> messages.stream().filter(msg -> MessageHeaderUtils.getId(msg).equals(id)).findFirst() + .map(id -> messages.stream().filter(msg -> MessageHeaderUtils.getRawMessageId(msg).equals(id)) + .findFirst() .orElseThrow(() -> new SqsAcknowledgementException( "Could not correlate ids for acknowledgement failure", Collections.emptyList(), messages, endpointName))) @@ -570,7 +583,8 @@ private Collection> getSuccessfulAckMes DeleteMessageBatchResponse response, Collection> messages, String endpointName) { return response.successful().stream().map(DeleteMessageBatchResultEntry::id) - .map(id -> messages.stream().filter(msg -> MessageHeaderUtils.getId(msg).equals(id)).findFirst() + .map(id -> messages.stream().filter(msg -> MessageHeaderUtils.getRawMessageId(msg).equals(id)) + .findFirst() .orElseThrow(() -> new SqsAcknowledgementException( "Could not correlate ids for acknowledgement failure", Collections.emptyList(), messages, endpointName))) @@ -588,7 +602,7 @@ private void logAcknowledgement(String endpointName, Collection createDeleteMessageEntries( Collection> messages) { return messages.stream() - .map(message -> DeleteMessageBatchRequestEntry.builder().id(MessageHeaderUtils.getId(message)) + .map(message -> DeleteMessageBatchRequestEntry.builder().id(MessageHeaderUtils.getRawMessageId(message)) .receiptHandle( MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER)) .build()) @@ -661,6 +675,8 @@ private static class SqsTemplateOptionsImpl extends AbstractMessagingTemplateOpt private TemplateContentBasedDeduplication contentBasedDeduplication = TemplateContentBasedDeduplication.AUTO; + private boolean convertMessageIdToUuid = true; + @Override public SqsTemplateOptions queueAttributeNames(Collection queueAttributeNames) { Assert.notEmpty(queueAttributeNames, "queueAttributeNames cannot be null or empty"); @@ -709,6 +725,12 @@ public SqsTemplateOptions observationConvention(SqsTemplateObservation.Conventio return this; } + @Override + public SqsTemplateOptions convertMessageIdToUuid(boolean convertMessageIdToUuid) { + this.convertMessageIdToUuid = convertMessageIdToUuid; + return this; + } + } private static class SqsTemplateBuilderImpl implements SqsTemplateBuilder { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplateOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplateOptions.java index 51cc84606..3ae4d1602 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplateOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplateOptions.java @@ -24,6 +24,9 @@ /** * Sqs specific options for the {@link SqsTemplate}. * + * @author Tomaz Fernandes + * @author Jeongmin Kim + * @since 3.0.0 */ public interface SqsTemplateOptions extends MessagingTemplateOptions { @@ -86,4 +89,12 @@ public interface SqsTemplateOptions extends MessagingTemplateOptions, String> paylo this.payloadTypeHeaderFunction = payloadTypeHeaderFunction; } + /** + * Configure the {@link HeaderMapper} used to convert headers for + * {@link software.amazon.awssdk.services.sqs.model.Message} instances. + * @param configurer the consumer to configure the header mapper. + */ + public void configureHeaderMapper(Consumer> configurer) { + Assert.notNull(configurer, "configurer cannot be null"); + configurer.accept(this.headerMapper); + } + /** * Set the {@link HeaderMapper} to used to convert headers for * {@link software.amazon.awssdk.services.sqs.model.Message} instances. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageHeaders.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageHeaders.java index 99ad15a0b..a03e85fab 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageHeaders.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageHeaders.java @@ -53,4 +53,5 @@ public MessagingMessageHeaders(@Nullable Map headers, @Nullable public MessagingMessageHeaders(@Nullable Map headers, @Nullable UUID id, @Nullable Long timestamp) { super(headers, id, timestamp); } + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java index c9f454616..484439d6c 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.UUID; import java.util.function.BiFunction; import java.util.stream.Collectors; import org.jspecify.annotations.Nullable; @@ -51,6 +50,7 @@ * @author Tomaz Fernandes * @author Alain Sahli * @author Maciej Walkowiak + * @author Jeongmin Kim * * @since 3.0 * @see LegacyJackson2SqsMessagingMessageConverter @@ -62,12 +62,18 @@ public class SqsHeaderMapper implements ContextAwareHeaderMapper { private BiFunction additionalHeadersFunction = ((message, accessor) -> accessor.toMessageHeaders()); + private boolean convertMessageIdToUuid = true; + public void setAdditionalHeadersFunction( BiFunction headerFunction) { Assert.notNull(headerFunction, "headerFunction cannot be null"); this.additionalHeadersFunction = headerFunction; } + public void setConvertMessageIdToUuid(boolean convertMessageIdToUuid) { + this.convertMessageIdToUuid = convertMessageIdToUuid; + } + @Override public Message fromHeaders(MessageHeaders headers) { Message.Builder builder = Message.builder(); @@ -157,9 +163,11 @@ public MessageHeaders toHeaders(Message source) { accessor.copyHeadersIfAbsent(getMessageAttributesAsHeaders(source)); accessor.copyHeadersIfAbsent(createDefaultHeaders(source)); accessor.copyHeadersIfAbsent(createAdditionalHeaders(source)); + MessageHeaders messageHeaders = accessor.toMessageHeaders(); logger.trace("Mapped headers {} for message {}", messageHeaders, source.messageId()); - return new MessagingMessageHeaders(messageHeaders, UUID.fromString(source.messageId())); + return SqsMessageIdResolver.resolveAndAddMessageId(source.messageId(), messageHeaders, convertMessageIdToUuid); + } private MessageHeaders createAdditionalHeaders(Message source) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolver.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolver.java new file mode 100644 index 000000000..1de4ec885 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolver.java @@ -0,0 +1,99 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.support.converter; + +import io.awspring.cloud.sqs.MessageHeaderUtils; +import io.awspring.cloud.sqs.listener.SqsHeaders; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.UUID; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import software.amazon.awssdk.services.sqs.model.Message; + +/** + * Utility class for resolving SQS message IDs. Consolidates UUID validation, conversion, and fail-fast logic used by + * both {@link SqsHeaderMapper} and {@link io.awspring.cloud.sqs.operations.SqsTemplate}. + * + * @author Jeongmin Kim + * @since 4.1.0 + */ +public final class SqsMessageIdResolver { + + private SqsMessageIdResolver() { + } + + /** + * Resolve the message ID and add it to the provided headers. The raw message ID is always stored in the + * {@link SqsHeaders#SQS_RAW_MESSAGE_ID_HEADER} header. + * + *

+ * If the message ID is a valid UUID, it is used directly. If not, and {@code convertMessageIdToUuid} is + * {@code true}, a {@link MessagingException} is thrown with instructions to disable UUID conversion. If + * {@code convertMessageIdToUuid} is {@code false}, a deterministic UUID is generated from the raw message ID. + * @param messageId the raw message ID from SQS. + * @param headers the existing message headers. + * @param convertMessageIdToUuid whether to enforce UUID message IDs. + * @return the resolved {@link MessageHeaders} with the message ID set. + * @throws MessagingException if the message ID is not a valid UUID and conversion is enabled. + */ + public static MessageHeaders resolveAndAddMessageId(String messageId, MessageHeaders headers, + boolean convertMessageIdToUuid) { + MessageHeaders withRawId = MessageHeaderUtils.addHeaderIfAbsent(headers, SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, + messageId); + Optional uuid = tryParseUuid(messageId); + if (uuid.isPresent()) { + return new MessagingMessageHeaders(withRawId, uuid.get()); + } + if (convertMessageIdToUuid) { + throw new MessagingException(String.format( + "Message ID '%s' is not a valid UUID. To support non-UUID message IDs, " + + "set 'spring.cloud.aws.sqs.convert-message-id-to-uuid=false'. " + + "The raw message ID will be available via the '%s' header.", + messageId, SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)); + } + return new MessagingMessageHeaders(withRawId, + UUID.nameUUIDFromBytes(messageId.getBytes(StandardCharsets.UTF_8))); + } + + /** + * Configure message ID resolution on the given converter. If the converter is an + * {@link AbstractMessagingMessageConverter} with a {@link SqsHeaderMapper}, sets the {@code convertMessageIdToUuid} + * flag on it. + * @param converter the messaging message converter. + * @param convertMessageIdToUuid whether to enforce UUID message IDs. + */ + public static void configureMessageIdResolution(MessagingMessageConverter converter, + boolean convertMessageIdToUuid) { + if (converter instanceof AbstractMessagingMessageConverter abstractConverter) { + abstractConverter.configureHeaderMapper(headerMapper -> { + if (headerMapper instanceof SqsHeaderMapper sqsHeaderMapper) { + sqsHeaderMapper.setConvertMessageIdToUuid(convertMessageIdToUuid); + } + }); + } + } + + private static Optional tryParseUuid(String value) { + try { + return Optional.of(UUID.fromString(value)); + } + catch (IllegalArgumentException e) { + return Optional.empty(); + } + } + +} \ No newline at end of file diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/observation/AbstractListenerObservation.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/observation/AbstractListenerObservation.java index 5230553cc..e46472833 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/observation/AbstractListenerObservation.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/observation/AbstractListenerObservation.java @@ -29,6 +29,7 @@ * Observation for Message Listener Containers. * * @author Tomaz Fernandes + * @author Jeongmin Kim * @since 3.4 */ public abstract class AbstractListenerObservation { @@ -234,7 +235,7 @@ protected Context(Message message, String sourceName) { super((carrier, key) -> carrier.getHeaders().get(key, String.class)); setCarrier(message); this.message = message; - this.messageId = MessageHeaderUtils.getId(message); + this.messageId = MessageHeaderUtils.getRawMessageId(message); this.sourceName = sourceName; } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/MessageHeaderUtilsTest.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/MessageHeaderUtilsTest.java index e156e9ec8..784bbf301 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/MessageHeaderUtilsTest.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/MessageHeaderUtilsTest.java @@ -17,6 +17,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.awspring.cloud.sqs.listener.SqsHeaders; +import java.util.Collection; +import java.util.List; import org.junit.jupiter.api.Test; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -25,6 +28,7 @@ * Tests for {@link MessageHeaderUtils}. * * @author Tomaz Fernandes + * @author Jeongmin Kim */ class MessageHeaderUtilsTest { @@ -93,4 +97,51 @@ void shouldPreserveOtherHeaders() { assertThat(result.getHeaders().get("another-header")).isEqualTo("another-value"); assertThat(result.getHeaders().size()).isEqualTo(message.getHeaders().size() - 1); } + + @Test + void shouldReturnRawMessageIdWhenHeaderPresent() { + // given + String rawMessageId = "92898073-7bd6a160-5797b060-54a7e539"; + Message message = MessageBuilder.withPayload("test-payload") + .setHeader(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, rawMessageId).build(); + + // when + String result = MessageHeaderUtils.getRawMessageId(message); + + // then + assertThat(result).isEqualTo(rawMessageId); + } + + @Test + void shouldFallbackToSpringMessageIdWhenRawHeaderNotPresent() { + // given + Message message = MessageBuilder.withPayload("test-payload").build(); + String expectedId = message.getHeaders().getId().toString(); + + // when + String result = MessageHeaderUtils.getRawMessageId(message); + + // then + assertThat(result).isEqualTo(expectedId); + } + + @Test + void shouldConcatenateRawMessageIdsFromCollection() { + // given + String rawMessageId1 = "raw-id-1"; + String rawMessageId2 = "raw-id-2"; + + Message message1 = MessageBuilder.withPayload("payload1") + .setHeader(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, rawMessageId1).build(); + Message message2 = MessageBuilder.withPayload("payload2") + .setHeader(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, rawMessageId2).build(); + + Collection> messages = List.of(message1, message2); + + // when + String result = MessageHeaderUtils.getRawMessageId(messages); + + // then + assertThat(result).isEqualTo("raw-id-1; raw-id-2"); + } } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateFifoTracingIntegrationTest.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateFifoTracingIntegrationTest.java index f03b8f3ef..7fa78880c 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateFifoTracingIntegrationTest.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsTemplateFifoTracingIntegrationTest.java @@ -15,6 +15,8 @@ */ package io.awspring.cloud.sqs.integration; +import static org.assertj.core.api.Assertions.assertThat; + import io.awspring.cloud.sqs.operations.SqsTemplate; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; @@ -29,6 +31,10 @@ import io.micrometer.tracing.propagation.Propagator; import io.micrometer.tracing.test.simple.SimpleTraceContext; import io.micrometer.tracing.test.simple.SimpleTracer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,13 +47,6 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; - /** * Integration tests for trace context propagation in FIFO queues with SqsTemplate. *

@@ -76,8 +75,10 @@ public class SqsTemplateFifoTracingIntegrationTest extends BaseSqsIntegrationTes @BeforeAll static void beforeTests() { var client = createAsyncClient(); - createFifoQueue(client, FIFO_QUEUE_NAME, Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false")).join(); - createFifoQueue(client, FIFO_CACHE_HIT_QUEUE_NAME, Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true")).join(); + createFifoQueue(client, FIFO_QUEUE_NAME, Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false")) + .join(); + createFifoQueue(client, FIFO_CACHE_HIT_QUEUE_NAME, + Map.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true")).join(); } @@ -119,7 +120,8 @@ void sendAsync_toFifoQueue_shouldCreateObservationOnCallingThreadAfterCacheHit() sqsTemplate.sendAsync(FIFO_CACHE_HIT_QUEUE_NAME, warmupPayload).join(); // Drain the warmup message - sqsTemplate.receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), TestEvent.class); + sqsTemplate.receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), + TestEvent.class); // Given - Start a NEW observation for the actual test var observation = Observation.start("test-send-second", observationRegistry); @@ -138,7 +140,8 @@ void sendAsync_toFifoQueue_shouldCreateObservationOnCallingThreadAfterCacheHit() logger.info("expectedTraceId={}", expectedTraceId); var receivedMessage = sqsTemplate - .receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), TestEvent.class) + .receive(from -> from.queue(FIFO_CACHE_HIT_QUEUE_NAME).pollTimeout(Duration.ofSeconds(5)), + TestEvent.class) .orElseThrow(() -> new AssertionError("Expected message was not received")); assertThat(receivedMessage.getPayload()).isEqualTo(payload); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java index a31d65fe1..144b07e3b 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java @@ -1192,6 +1192,57 @@ void shouldReceiveBatchFifo() { } + @Test + void shouldHandleNonUuidMessageIdInSendResponse() { + String queue = "test-queue"; + GetQueueUrlResponse urlResponse = GetQueueUrlResponse.builder().queueUrl(queue).build(); + given(mockClient.getQueueUrl(any(GetQueueUrlRequest.class))) + .willReturn(CompletableFuture.completedFuture(urlResponse)); + mockQueueAttributes(mockClient, Map.of()); + String nonUuidMessageId = "92898073-7bd6a160-5797b060-54a7e539"; + SendMessageResponse response = SendMessageResponse.builder().messageId(nonUuidMessageId).build(); + given(mockClient.sendMessage(any(SendMessageRequest.class))) + .willReturn(CompletableFuture.completedFuture(response)); + SqsOperations template = SqsTemplate.builder().sqsAsyncClient(mockClient) + .configure(options -> options.convertMessageIdToUuid(false)).build(); + String payload = "test-payload"; + SendResult result = template.send(to -> to.queue(queue).payload(payload)); + assertThat(result.messageId()) + .isEqualTo(UUID.nameUUIDFromBytes(nonUuidMessageId.getBytes(java.nio.charset.StandardCharsets.UTF_8))); + assertThat(result.message().getHeaders().get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(nonUuidMessageId); + } + + @Test + void shouldHandleNonUuidMessageIdInBatchSendResponse() { + String queue = "test-queue"; + String payload1 = "test-payload-1"; + String payload2 = "test-payload-2"; + Message message1 = MessageBuilder.withPayload(payload1).build(); + Message message2 = MessageBuilder.withPayload(payload2).build(); + List> messages = List.of(message1, message2); + + GetQueueUrlResponse urlResponse = GetQueueUrlResponse.builder().queueUrl(queue).build(); + given(mockClient.getQueueUrl(any(GetQueueUrlRequest.class))) + .willReturn(CompletableFuture.completedFuture(urlResponse)); + mockQueueAttributes(mockClient, Map.of()); + String nonUuidMessageId1 = "92898073-7bd6a160-5797b060-54a7e539"; + String nonUuidMessageId2 = "a2898073-8bd6a160-6797b060-64a7e539"; + SendMessageBatchResponse response = SendMessageBatchResponse.builder() + .successful( + builder -> builder.id(message1.getHeaders().getId().toString()).messageId(nonUuidMessageId1), + builder -> builder.id(message2.getHeaders().getId().toString()).messageId(nonUuidMessageId2)) + .build(); + given(mockClient.sendMessageBatch(any(SendMessageBatchRequest.class))) + .willReturn(CompletableFuture.completedFuture(response)); + SqsOperations template = SqsTemplate.builder().sqsAsyncClient(mockClient) + .configure(options -> options.convertMessageIdToUuid(false)).build(); + SendResult.Batch results = template.sendMany(queue, messages); + assertThat(results.successful()).hasSize(2); + results.successful().forEach(result -> { + assertThat(result.messageId()).isNotNull(); + }); + } + @Test void shouldPropagateTracingAsMessageSystemAttribute() { String queue = "test-queue"; diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java index ffbe0683a..05944615e 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java @@ -16,6 +16,7 @@ package io.awspring.cloud.sqs.support.converter; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.awspring.cloud.sqs.listener.SqsHeaders; import java.math.BigDecimal; @@ -27,6 +28,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; @@ -37,6 +39,7 @@ * * @author Tomaz Fernandes * @author Maciej Walkowiak + * @author Jeongmin Kim */ class SqsHeaderMapperTests { @@ -177,6 +180,39 @@ void createsMessageWithNumberHeader(String value, String type, Number expected) assertThat(headers.get(headerName)).isEqualTo(expected); } + @Test + void shouldConvertUuidMessageIdWhenConvertMessageIdToUuidIsTrue() { + SqsHeaderMapper mapper = new SqsHeaderMapper(); + mapper.setConvertMessageIdToUuid(true); + String uuidMessageId = "550e8400-e29b-41d4-a716-446655440000"; + Message message = Message.builder().body("payload").messageId(uuidMessageId).build(); + MessageHeaders headers = mapper.toHeaders(message); + assertThat(headers.getId()).isEqualTo(UUID.fromString(uuidMessageId)); + assertThat(headers.get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(uuidMessageId); + } + + @Test + void shouldThrowWhenConvertMessageIdToUuidIsTrueAndMessageIdIsNotValidUuid() { + SqsHeaderMapper mapper = new SqsHeaderMapper(); + mapper.setConvertMessageIdToUuid(true); + String nonUuidMessageId = "92898073-7bd6a160-5797b060-54a7e539"; + Message message = Message.builder().body("payload").messageId(nonUuidMessageId).build(); + assertThatThrownBy(() -> mapper.toHeaders(message)).isInstanceOf(MessagingException.class) + .hasMessageContaining("not a valid UUID").hasMessageContaining("convert-message-id-to-uuid"); + } + + @Test + void shouldStoreAwsMessageIdInHeaderWhenConvertMessageIdToUuidIsFalse() { + SqsHeaderMapper mapper = new SqsHeaderMapper(); + mapper.setConvertMessageIdToUuid(false); + String nonUuidMessageId = "92898073-7bd6a160-5797b060-54a7e539"; + Message message = Message.builder().body("payload").messageId(nonUuidMessageId).build(); + MessageHeaders headers = mapper.toHeaders(message); + assertThat(headers.get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(nonUuidMessageId); + assertThat(headers.getId()).isNotEqualTo(nonUuidMessageId); + assertThat(headers.getId()).isNotNull(); + } + private static Stream validArguments() { return Stream.of(Arguments.of("10", "Number", BigDecimal.valueOf(10)), Arguments.of("3", "Number.byte", (byte) 3), Arguments.of("3", "Number.Byte", (byte) 3), diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolverTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolverTests.java new file mode 100644 index 000000000..1265ab2c8 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsMessageIdResolverTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.support.converter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.awspring.cloud.sqs.listener.SqsHeaders; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.MessageHeaderAccessor; + +/** + * Tests for {@link SqsMessageIdResolver}. + * + * @author Jeongmin Kim + */ +class SqsMessageIdResolverTests { + + @Test + void shouldResolveAndAddMessageIdWithValidUuid() { + String uuidMessageId = "550e8400-e29b-41d4-a716-446655440000"; + MessageHeaders inputHeaders = new MessageHeaderAccessor().toMessageHeaders(); + MessageHeaders result = SqsMessageIdResolver.resolveAndAddMessageId(uuidMessageId, inputHeaders, true); + assertThat(result.getId()).isEqualTo(UUID.fromString(uuidMessageId)); + assertThat(result.get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(uuidMessageId); + } + + @Test + void shouldResolveAndAddMessageIdWithValidUuidWhenConvertMessageIdToUuidIsFalse() { + String uuidMessageId = "550e8400-e29b-41d4-a716-446655440000"; + MessageHeaders inputHeaders = new MessageHeaderAccessor().toMessageHeaders(); + MessageHeaders result = SqsMessageIdResolver.resolveAndAddMessageId(uuidMessageId, inputHeaders, false); + assertThat(result.getId()).isEqualTo(UUID.fromString(uuidMessageId)); + assertThat(result.get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(uuidMessageId); + } + + @Test + void shouldThrowWhenConvertMessageIdToUuidIsTrueAndIdIsNotValidUuid() { + String nonUuid = "92898073-7bd6a160-5797b060-54a7e539"; + MessageHeaders inputHeaders = new MessageHeaderAccessor().toMessageHeaders(); + assertThatThrownBy(() -> SqsMessageIdResolver.resolveAndAddMessageId(nonUuid, inputHeaders, true)) + .isInstanceOf(MessagingException.class).hasMessageContaining("not a valid UUID") + .hasMessageContaining("convert-message-id-to-uuid"); + } + + @Test + void shouldGenerateDeterministicUuidWhenConvertMessageIdToUuidIsFalse() { + String nonUuid = "92898073-7bd6a160-5797b060-54a7e539"; + MessageHeaders inputHeaders = new MessageHeaderAccessor().toMessageHeaders(); + MessageHeaders result = SqsMessageIdResolver.resolveAndAddMessageId(nonUuid, inputHeaders, false); + assertThat(result.getId()).isEqualTo(UUID.nameUUIDFromBytes(nonUuid.getBytes(StandardCharsets.UTF_8))); + assertThat(result.get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER)).isEqualTo(nonUuid); + // Verify deterministic + MessageHeaders result2 = SqsMessageIdResolver.resolveAndAddMessageId(nonUuid, + new MessageHeaderAccessor().toMessageHeaders(), false); + assertThat(result2.getId()).isEqualTo(result.getId()); + } + + @Test + void shouldPreserveExistingHeadersWhenResolvingMessageId() { + String uuidMessageId = UUID.randomUUID().toString(); + MessageHeaderAccessor accessor = new MessageHeaderAccessor(); + accessor.setHeader("customHeader", "customValue"); + MessageHeaders inputHeaders = accessor.toMessageHeaders(); + MessageHeaders result = SqsMessageIdResolver.resolveAndAddMessageId(uuidMessageId, inputHeaders, true); + assertThat(result.get("customHeader")).isEqualTo("customValue"); + } + +} \ No newline at end of file