Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
|spring.cloud.aws.sns.endpoint | | Overrides the default endpoint.
|spring.cloud.aws.sns.region | | Overrides the default region.
|spring.cloud.aws.sqs.dualstack-enabled | | Configure whether the AWS client should use the AWS dualstack endpoint. Note that not each AWS service supports dual-stack. For complete list check <a href="https://docs.aws.amazon.com/vpc/latest/userguide/aws-ipv6-support.html">AWS services that support IPv6</a>
|spring.cloud.aws.sqs.convert-message-id-to-uuid | `+++true+++` | Whether to convert SQS message IDs to UUIDs. Set to `false` for SQS-compatible providers that return non-UUID message IDs.
|spring.cloud.aws.sqs.enabled | `+++true+++` | Enables SQS integration.
|spring.cloud.aws.sqs.endpoint | | Overrides the default endpoint.
|spring.cloud.aws.sqs.listener.auto-startup | | Defines whether SQS listeners will start automatically or not.
Expand Down
21 changes: 21 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,27 @@ If `SendBatchFailureStrategy#DO_NOT_THROW` is configured, no exception is thrown

For convenience, the `additionalInformation` parameters can be found as constants in the `SqsTemplateParameters` class.

===== Non-UUID Message IDs

By default, Spring Cloud AWS SQS expects the message ID returned by SQS to be a valid UUID.
If a non-UUID message ID is received, an error is thrown with instructions to enable non-UUID support.

To enable non-UUID message ID support (e.g., for Yandex Message Queue or other SQS-compatible providers):

[source,properties]
----
spring.cloud.aws.sqs.convert-message-id-to-uuid=false
----

When disabled:

* **Receive side**: The raw provider message ID is stored in the `Sqs_RawMessageId` header.
A deterministic UUID derived from the raw ID is used as the Spring `MessageHeaders.ID`.
Access the raw ID via `MessageHeaderUtils.getRawMessageId(message)`.
* **Send side**: If the send response contains a non-UUID message ID,
`SendResult.messageId()` returns a deterministic UUID and the raw ID is available
in `SendResult.additionalInformation()` under the `rawMessageId` key.

[[template-message-conversion]]
==== Template Message Conversion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* @author Maciej Walkowiak
* @author Wei Jiang
* @author Dongha Kim
* @author Jeongmin Kim
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -111,6 +112,7 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient,
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
builder.configure(options -> options.convertMessageIdToUuid(sqsProperties.getConvertMessageIdToUuid()));
return builder.build();
}

Expand Down Expand Up @@ -152,6 +154,7 @@ private void configureProperties(SqsContainerOptionsBuilder options) {
mapper.from(this.sqsProperties.getListener().getPollTimeout()).to(options::pollTimeout);
mapper.from(this.sqsProperties.getListener().getMaxDelayBetweenPolls()).to(options::maxDelayBetweenPolls);
mapper.from(this.sqsProperties.getListener().getAutoStartup()).to(options::autoStartup);
mapper.from(this.sqsProperties.getConvertMessageIdToUuid()).to(options::convertMessageIdToUuid);
}

@ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper")
Expand All @@ -160,8 +163,8 @@ static class SqsJacksonConfiguration {
@ConditionalOnMissingBean
@Bean
public MessagingMessageConverter<Message> messageConverter(ObjectProvider<JsonMapper> jsonMapperProvider) {
JsonMapper jsonMapper = jsonMapperProvider.getIfAvailable();
return jsonMapper != null ? new SqsMessagingMessageConverter(jsonMapper)
return jsonMapperProvider.getIfAvailable() != null
? new SqsMessagingMessageConverter(jsonMapperProvider.getIfAvailable())
: new SqsMessagingMessageConverter();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*
* @author Tomaz Fernandes
* @author Wei Jiang
* @author Jeongmin Kim
* @since 3.0
*/
@ConfigurationProperties(prefix = SqsProperties.PREFIX)
Expand All @@ -51,6 +52,20 @@ public void setListener(Listener listener) {

private Boolean observationEnabled = false;

/**
* Whether to convert SQS message IDs to UUIDs. Set to {@code false} for SQS-compatible providers that return
* non-UUID message IDs.
*/
private Boolean convertMessageIdToUuid = true;

public Boolean getConvertMessageIdToUuid() {
return convertMessageIdToUuid;
}

public void setConvertMessageIdToUuid(Boolean convertMessageIdToUuid) {
this.convertMessageIdToUuid = convertMessageIdToUuid;
}

/**
* Return the strategy to use if the queue is not found.
* @return the {@link QueueNotFoundStrategy}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.awspring.cloud.sqs;

import io.awspring.cloud.sqs.listener.SqsHeaders;
import io.awspring.cloud.sqs.support.converter.MessagingMessageHeaders;
import java.util.Collection;
import java.util.Map;
Expand All @@ -30,6 +31,7 @@
* Utility class for extracting {@link MessageHeaders} from a {@link Message}.
*
* @author Tomaz Fernandes
* @author Jeongmin Kim
* @since 3.0
*/
public class MessageHeaderUtils {
Expand Down Expand Up @@ -150,4 +152,22 @@ public static <T> Message<T> removeHeaderIfPresent(Message<T> message, String ke
return new GenericMessage<>(message.getPayload(), newHeaders);
}

/**
* Return the raw provider message ID, falling back to Spring message ID if not present.
* @param message the message.
* @return the raw provider ID or Spring ID.
*/
public static String getRawMessageId(Message<?> message) {
String rawMessageId = message.getHeaders().get(SqsHeaders.SQS_RAW_MESSAGE_ID_HEADER, String.class);
return rawMessageId != null ? rawMessageId : getId(message);
}

/**
* Return the messages' raw provider IDs as a concatenated {@link String}.
* @param messages the messages.
* @return the raw provider IDs.
*/
public static <T> String getRawMessageId(Collection<Message<T>> messages) {
return messages.stream().map(MessageHeaderUtils::getRawMessageId).collect(Collectors.joining("; "));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* Sqs specific implementation of {@link ContainerOptions}.
*
* @author Tomaz Fernandes
* @author Jeongmin Kim
* @since 3.0
*/
public class SqsContainerOptions extends AbstractContainerOptions<SqsContainerOptions, SqsContainerOptionsBuilder> {
Expand All @@ -49,6 +50,8 @@ public class SqsContainerOptions extends AbstractContainerOptions<SqsContainerOp

private final QueueNotFoundStrategy queueNotFoundStrategy;

private final boolean convertMessageIdToUuid;

/**
* Create a {@link ContainerOptions} instance from the builder.
* @param builder the builder.
Expand All @@ -61,6 +64,7 @@ private SqsContainerOptions(BuilderImpl builder) {
this.messageVisibility = builder.messageVisibility;
this.queueNotFoundStrategy = builder.queueNotFoundStrategy;
this.fifoBatchGroupingStrategy = builder.fifoBatchGroupingStrategy;
this.convertMessageIdToUuid = builder.convertMessageIdToUuid;
}

/**
Expand Down Expand Up @@ -121,6 +125,14 @@ public QueueNotFoundStrategy getQueueNotFoundStrategy() {
return this.queueNotFoundStrategy;
}

/**
* Get whether to convert SQS message IDs to UUIDs.
* @return whether to convert message IDs to UUIDs.
*/
public boolean getConvertMessageIdToUuid() {
return this.convertMessageIdToUuid;
}

@Override
public SqsContainerOptionsBuilder toBuilder() {
return new BuilderImpl(this);
Expand Down Expand Up @@ -153,6 +165,8 @@ protected static class BuilderImpl
@Nullable
private Duration messageVisibility;

private boolean convertMessageIdToUuid = true;

protected BuilderImpl() {
super();
}
Expand All @@ -165,6 +179,7 @@ protected BuilderImpl(SqsContainerOptions options) {
this.messageVisibility = options.messageVisibility;
this.fifoBatchGroupingStrategy = options.fifoBatchGroupingStrategy;
this.queueNotFoundStrategy = options.queueNotFoundStrategy;
this.convertMessageIdToUuid = options.convertMessageIdToUuid;
}

@Override
Expand Down Expand Up @@ -220,6 +235,12 @@ public SqsContainerOptionsBuilder observationConvention(
return this;
}

@Override
public SqsContainerOptionsBuilder convertMessageIdToUuid(boolean convertMessageIdToUuid) {
this.convertMessageIdToUuid = convertMessageIdToUuid;
return this;
}

@Override
public SqsContainerOptions build() {
return new SqsContainerOptions(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
/**
* {@link ContainerOptionsBuilder} specialization for SQS specific options.
* @author Tomaz Fernandes
* @author Jeongmin Kim
* @since 3.0
*/
public interface SqsContainerOptionsBuilder
Expand Down Expand Up @@ -82,4 +83,12 @@ SqsContainerOptionsBuilder messageSystemAttributeNames(
*/
SqsContainerOptionsBuilder observationConvention(SqsListenerObservation.Convention observationConvention);

/**
* Set whether to convert SQS message IDs to UUIDs. Set to {@code false} for SQS-compatible providers that return
* non-UUID message IDs.
* @param convertMessageIdToUuid whether to convert message IDs to UUIDs.
* @return this instance.
*/
SqsContainerOptionsBuilder convertMessageIdToUuid(boolean convertMessageIdToUuid);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*
* @author Tomaz Fernandes
* @author Artem Bilan
* @author Jeongmin Kim
*
* @since 3.0
*
Expand Down Expand Up @@ -88,6 +89,11 @@ private SqsHeaders() {
*/
public static final String SQS_DEFAULT_TYPE_HEADER = "JavaType";

/**
* Header for the raw provider message ID when not using UUID conversion.
*/
public static final String SQS_RAW_MESSAGE_ID_HEADER = SQS_HEADER_PREFIX + "RawMessageId";

public static class MessageSystemAttributes {

private MessageSystemAttributes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* method/.
*
* @author Tomaz Fernandes
* @author Jeongmin Kim
* @since 3.0
*/
public abstract class AbstractMessageConvertingMessageSource<T, S> implements MessageSource<T> {
Expand Down Expand Up @@ -133,4 +134,8 @@ protected MessageConversionContext getMessageConversionContext() {
return this.messageConversionContext;
}

protected MessagingMessageConverter<S> getMessagingMessageConverter() {
return this.messagingMessageConverter;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor;
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
import io.awspring.cloud.sqs.support.converter.SqsMessageConversionContext;
import io.awspring.cloud.sqs.support.converter.SqsMessageIdResolver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -63,6 +64,7 @@
* @param <T> the {@link Message} payload type.
*
* @author Tomaz Fernandes
* @author Jeongmin Kim
* @since 3.0
*/
public abstract class AbstractSqsMessageSource<T> extends AbstractPollingMessageSource<T, Message>
Expand Down Expand Up @@ -109,6 +111,8 @@ protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
this.messageVisibility = sqsContainerOptions.getMessageVisibility() != null
? (int) sqsContainerOptions.getMessageVisibility().getSeconds()
: MESSAGE_VISIBILITY_DISABLED;
SqsMessageIdResolver.configureMessageIdResolution(getMessagingMessageConverter(),
sqsContainerOptions.getConvertMessageIdToUuid());
}

@Override
Expand Down
Loading