From ac7a4f29135f11dab922d0e1aa26d1fe735ea02b Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Thu, 5 Mar 2026 10:40:50 +0100 Subject: [PATCH 1/3] add rabbitmq consumer autoconfiguration for consumeRun queues priority Signed-off-by: LE SAULNIER Kevin --- pom.xml | 6 +++ .../RabbitConsumerAutoConfiguration.java | 48 +++++++++++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 3 +- 3 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java diff --git a/pom.xml b/pom.xml index b8c8de9..cebad49 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,12 @@ spring-cloud-aws-starter-s3 + + + org.springframework.boot + spring-boot-starter-amqp + + org.aspectj diff --git a/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java b/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java new file mode 100644 index 0000000..93c659c --- /dev/null +++ b/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java @@ -0,0 +1,48 @@ +package org.gridsuite.computation.rabbitmq; + +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.stream.config.BindingServiceProperties; +import org.springframework.cloud.stream.config.ListenerContainerCustomizer; +import org.springframework.context.annotation.Bean; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +@AutoConfiguration +@ConditionalOnProperty( + name = "computation.rabbit.consume-run-load-balanced.enabled", + havingValue = "true", + matchIfMissing = true +) +public class RabbitConsumerAutoConfiguration { + /* + * RabbitMQ consumer priority: + * https://www.rabbitmq.com/docs/consumer-priority + * + * Each container creates exactly one AMQP consumer with prefetch=1 and its own priority. + * When dispatching messages, RabbitMQ always selects the highest-priority consumer + * that is available. + */ + @Bean + public ListenerContainerCustomizer customizer(BindingServiceProperties bindingServiceProperties) { + String computationRunGroup = bindingServiceProperties + .getBindings() + .get("consumeRun1-in-0") + .getGroup(); + + /* + * Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java + * We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities + */ + AtomicInteger index = new AtomicInteger(); + return (container, destination, group) -> { + if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, computationRunGroup)) { + smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement())); + } + }; + } +} diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 00f723d..42b361f 100644 --- a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1,3 @@ # AutoConfigureCache auto-configuration imports -org.gridsuite.computation.s3.S3AutoConfiguration \ No newline at end of file +org.gridsuite.computation.s3.S3AutoConfiguration +org.gridsuite.computation.rabbitmq.RabbitConsumerAutoConfiguration \ No newline at end of file From ddb923b1a21bef889c8b5e9e8f822d48aae06493 Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Wed, 11 Mar 2026 11:04:22 +0100 Subject: [PATCH 2/3] fix NPE + unit tests coverage Signed-off-by: LE SAULNIER Kevin --- .../RabbitConsumerAutoConfiguration.java | 16 ++- .../RabbitConsumerAutoConfigurationTest.java | 98 +++++++++++++++++++ 2 files changed, 109 insertions(+), 5 deletions(-) create mode 100644 src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java diff --git a/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java b/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java index 93c659c..0dc4505 100644 --- a/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java +++ b/src/main/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfiguration.java @@ -4,12 +4,14 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.context.annotation.Bean; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @AutoConfiguration @@ -19,6 +21,8 @@ matchIfMissing = true ) public class RabbitConsumerAutoConfiguration { + private static final String RABBITMQ_CONSUMER_NAME_TO_LOAD_BALANCE = "consumeRun1-in-0"; + /* * RabbitMQ consumer priority: * https://www.rabbitmq.com/docs/consumer-priority @@ -29,10 +33,10 @@ public class RabbitConsumerAutoConfiguration { */ @Bean public ListenerContainerCustomizer customizer(BindingServiceProperties bindingServiceProperties) { - String computationRunGroup = bindingServiceProperties - .getBindings() - .get("consumeRun1-in-0") - .getGroup(); + String computationRunGroup = Optional.ofNullable(bindingServiceProperties.getBindings()) + .map(bindings -> bindings.get(RABBITMQ_CONSUMER_NAME_TO_LOAD_BALANCE)) + .map(BindingProperties::getGroup) + .orElse(null); /* * Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -40,7 +44,9 @@ public ListenerContainerCustomizer customizer(BindingS */ AtomicInteger index = new AtomicInteger(); return (container, destination, group) -> { - if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, computationRunGroup)) { + if (container instanceof SimpleMessageListenerContainer smlc + && computationRunGroup != null + && Objects.equals(group, computationRunGroup)) { smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement())); } }; diff --git a/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java b/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java new file mode 100644 index 0000000..2c4c643 --- /dev/null +++ b/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java @@ -0,0 +1,98 @@ +package org.gridsuite.computation.rabbitmq; + +import org.junit.jupiter.api.Test; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.cloud.stream.config.BindingProperties; +import org.springframework.cloud.stream.config.BindingServiceProperties; +import org.springframework.cloud.stream.config.ListenerContainerCustomizer; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class RabbitConsumerAutoConfigurationTest { + + @Test + void configureXPriorityForConsumerName() { + RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration(); + BindingServiceProperties props = new BindingServiceProperties(); + + BindingProperties binding = new BindingProperties(); + binding.setGroup("testGroup"); + Map bindings = new HashMap<>(); + bindings.put("consumeRun1-in-0", binding); + props.setBindings(bindings); + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + ListenerContainerCustomizer customizer = config.customizer(props); + customizer.configure(container, "destination", "testGroup"); + + assertThat(container.getConsumerArguments()) + .containsEntry("x-priority", 0); + } + + @Test + void configureXPriorityForMultipleConsumers() { + RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration(); + BindingServiceProperties props = new BindingServiceProperties(); + + BindingProperties binding = new BindingProperties(); + binding.setGroup("testGroup"); + Map bindings = new HashMap<>(); + bindings.put("consumeRun1-in-0", binding); + + props.setBindings(bindings); + + ListenerContainerCustomizer customizer = + config.customizer(props); + + SimpleMessageListenerContainer c1 = new SimpleMessageListenerContainer(); + SimpleMessageListenerContainer c2 = new SimpleMessageListenerContainer(); + + customizer.configure(c1, "destination", "testGroup"); + customizer.configure(c2, "destination", "testGroup"); + + assertThat(c1.getConsumerArguments()) + .containsEntry("x-priority", 0); + assertThat(c2.getConsumerArguments()) + .containsEntry("x-priority", 1); + } + + @Test + void shouldNotSetPriorityForDifferentGroup() { + RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration(); + BindingServiceProperties props = new BindingServiceProperties(); + + BindingProperties binding = new BindingProperties(); + binding.setGroup("expectedGroup"); + Map bindings = new HashMap<>(); + bindings.put("consumeRun1-in-0", binding); + + props.setBindings(bindings); + + ListenerContainerCustomizer customizer = + config.customizer(props); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + customizer.configure(container, "destination", "otherGroup"); + + assertThat(container.getConsumerArguments()).isNullOrEmpty(); + } + + @Test + void shouldHandleMissingBindingWithoutNpe() { + RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration(); + BindingServiceProperties props = new BindingServiceProperties(); + props.setBindings(new HashMap<>()); + + ListenerContainerCustomizer customizer = + config.customizer(props); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + customizer.configure(container, "destination", "anyGroup"); + + assertThat(container.getConsumerArguments()).isNullOrEmpty(); + } +} \ No newline at end of file From f5f8e0a347982346a466d21a81e357e235958aeb Mon Sep 17 00:00:00 2001 From: LE SAULNIER Kevin Date: Wed, 11 Mar 2026 11:19:12 +0100 Subject: [PATCH 3/3] fix checkstyle Signed-off-by: LE SAULNIER Kevin --- .../rabbitmq/RabbitConsumerAutoConfigurationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java b/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java index 2c4c643..5e4d757 100644 --- a/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java +++ b/src/test/java/org/gridsuite/computation/rabbitmq/RabbitConsumerAutoConfigurationTest.java @@ -95,4 +95,4 @@ void shouldHandleMissingBindingWithoutNpe() { assertThat(container.getConsumerArguments()).isNullOrEmpty(); } -} \ No newline at end of file +}