Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
<artifactId>spring-cloud-aws-starter-s3</artifactId>
</dependency>

<!-- AMQP - rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- AspectJ -->
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.gridsuite.computation.rabbitmq;

import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

@AutoConfiguration
@ConditionalOnProperty(
name = "computation.rabbit.consume-run-load-balanced.enabled",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the intent of this?
If it's not used in our services and true by default?

havingValue = "true",
matchIfMissing = true
)
public class RabbitConsumerAutoConfiguration {
private static final String RABBITMQ_CONSUMER_NAME_TO_LOAD_BALANCE = "consumeRun1-in-0";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect all computation servers to use this consumer name, right?


/*
* RabbitMQ consumer priority:
* https://www.rabbitmq.com/docs/consumer-priority
*
* Each container creates exactly one AMQP consumer with prefetch=1 and its own priority.
* When dispatching messages, RabbitMQ always selects the highest-priority consumer
* that is available.
*/
@Bean
public ListenerContainerCustomizer<MessageListenerContainer> customizer(BindingServiceProperties bindingServiceProperties) {
String computationRunGroup = Optional.ofNullable(bindingServiceProperties.getBindings())
.map(bindings -> bindings.get(RABBITMQ_CONSUMER_NAME_TO_LOAD_BALANCE))
.map(BindingProperties::getGroup)
.orElse(null);

/*
* Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
* We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities
*/
AtomicInteger index = new AtomicInteger();
return (container, destination, group) -> {
if (container instanceof SimpleMessageListenerContainer smlc
&& computationRunGroup != null
&& Objects.equals(group, computationRunGroup)) {
smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement()));
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# AutoConfigureCache auto-configuration imports
org.gridsuite.computation.s3.S3AutoConfiguration
org.gridsuite.computation.s3.S3AutoConfiguration
org.gridsuite.computation.rabbitmq.RabbitConsumerAutoConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.gridsuite.computation.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class RabbitConsumerAutoConfigurationTest {

@Test
void configureXPriorityForConsumerName() {
RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration();
BindingServiceProperties props = new BindingServiceProperties();

BindingProperties binding = new BindingProperties();
binding.setGroup("testGroup");
Map<String, BindingProperties> bindings = new HashMap<>();
bindings.put("consumeRun1-in-0", binding);
props.setBindings(bindings);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
ListenerContainerCustomizer<MessageListenerContainer> customizer = config.customizer(props);
customizer.configure(container, "destination", "testGroup");

assertThat(container.getConsumerArguments())
.containsEntry("x-priority", 0);
}

@Test
void configureXPriorityForMultipleConsumers() {
RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration();
BindingServiceProperties props = new BindingServiceProperties();

BindingProperties binding = new BindingProperties();
binding.setGroup("testGroup");
Map<String, BindingProperties> bindings = new HashMap<>();
bindings.put("consumeRun1-in-0", binding);

props.setBindings(bindings);

ListenerContainerCustomizer<MessageListenerContainer> customizer =
config.customizer(props);

SimpleMessageListenerContainer c1 = new SimpleMessageListenerContainer();
SimpleMessageListenerContainer c2 = new SimpleMessageListenerContainer();

customizer.configure(c1, "destination", "testGroup");
customizer.configure(c2, "destination", "testGroup");

assertThat(c1.getConsumerArguments())
.containsEntry("x-priority", 0);
assertThat(c2.getConsumerArguments())
.containsEntry("x-priority", 1);
}

@Test
void shouldNotSetPriorityForDifferentGroup() {
RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration();
BindingServiceProperties props = new BindingServiceProperties();

BindingProperties binding = new BindingProperties();
binding.setGroup("expectedGroup");
Map<String, BindingProperties> bindings = new HashMap<>();
bindings.put("consumeRun1-in-0", binding);

props.setBindings(bindings);

ListenerContainerCustomizer<MessageListenerContainer> customizer =
config.customizer(props);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

customizer.configure(container, "destination", "otherGroup");

assertThat(container.getConsumerArguments()).isNullOrEmpty();
}

@Test
void shouldHandleMissingBindingWithoutNpe() {
RabbitConsumerAutoConfiguration config = new RabbitConsumerAutoConfiguration();
BindingServiceProperties props = new BindingServiceProperties();
props.setBindings(new HashMap<>());

ListenerContainerCustomizer<MessageListenerContainer> customizer =
config.customizer(props);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

customizer.configure(container, "destination", "anyGroup");

assertThat(container.getConsumerArguments()).isNullOrEmpty();
}
}
Loading