diff --git a/docs/reference/kombu.transport.SQS.rst b/docs/reference/kombu.transport.SQS.rst index e5136ebdc..4766b330a 100644 --- a/docs/reference/kombu.transport.SQS.rst +++ b/docs/reference/kombu.transport.SQS.rst @@ -70,4 +70,31 @@ Message Attributes SQS supports sending message attributes along with the message body. To use this feature, you can pass a 'message_attributes' as keyword argument -to `basic_publish` method. \ No newline at end of file +to `basic_publish` method. + +Fair Queue Support (only available from version 5.7.0+) +------------------------ + +Kombu supports Amazon SQS Fair Queues, which provide improved message processing fairness by ensuring that messages from different message groups +are processed in a balanced manner. + +Fair Queues are designed to prevent a single message group (or tenant) from monopolizing +consumer resources, which can happen with standard queues that handle multi-tenant +workloads with unbalanced message distribution. + +When publishing messages to a Fair Queue, you should provide a `MessageGroupId`. This can be done by passing it as a +keyword argument to the `publish` method. While the Kombu implementation only sends `MessageGroupId` if it is present, AWS requires it for FIFO and Fair Queues. If omitted, AWS will reject the message or fairness will not be guaranteed. For standard queues, `MessageGroupId` is optional.:: + + producer.publish( + message, + routing_key='my-fair-queue', + MessageGroupId='customer-123' # Required for FIFO queues; needed for Fair queue functionality on standard queues + ) + +Benefits of using Fair Queues with Kombu: +- Improved message processing fairness across message groups +- Better workload distribution among consumers +- Eliminates noisy neighbor problem + +For more information, refer to the AWS documentation on Fair Queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html + diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index cfe2fe014..b6373a769 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -463,11 +463,13 @@ def _put(self, queue, message, **kwargs): # we don't want to want to have the attribute in the body kwargs['MessageAttributes'] = \ message['properties'].pop('message_attributes') + # Support SQS fair queue system. + if 'MessageGroupId' in message['properties']: + kwargs['MessageGroupId'] = \ + message['properties']['MessageGroupId'] + # Support FIFO queues. if queue.endswith('.fifo'): - if 'MessageGroupId' in message['properties']: - kwargs['MessageGroupId'] = \ - message['properties']['MessageGroupId'] - else: + if 'MessageGroupId' not in kwargs: kwargs['MessageGroupId'] = 'default' if 'MessageDeduplicationId' in message['properties']: kwargs['MessageDeduplicationId'] = \ diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 4456de844..d96b3391b 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -1151,6 +1151,195 @@ def test_predefined_queues_put_to_fifo_queue(self): assert 'MessageDeduplicationId' in \ sqs_queue_mock.send_message.call_args[1] + def test_predefined_queues_put_with_message_group_id(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-1' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message', MessageGroupId='test-group-id') + + sqs_queue_mock.send_message.assert_called_once() + assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1] + assert sqs_queue_mock.send_message.call_args[1]['MessageGroupId'] == 'test-group-id' + + def test_non_fifo_queue_without_message_group_id(self): + """Test that non-FIFO queues don't get MessageGroupId when not provided""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-1' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message') + + sqs_queue_mock.send_message.assert_called_once() + assert 'MessageGroupId' not in sqs_queue_mock.send_message.call_args[1] + + def test_fifo_queue_with_custom_message_group_id(self): + """Test that FIFO queues respect custom MessageGroupId and don't override with 'default'""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-3.fifo' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message', MessageGroupId='custom-group-123') + + sqs_queue_mock.send_message.assert_called_once() + call_kwargs = sqs_queue_mock.send_message.call_args[1] + assert 'MessageGroupId' in call_kwargs + assert call_kwargs['MessageGroupId'] == 'custom-group-123' + # FIFO queue should also have MessageDeduplicationId + assert 'MessageDeduplicationId' in call_kwargs + + def test_fifo_queue_with_custom_message_group_id_and_deduplication_id(self): + """Test FIFO queue with both custom MessageGroupId and MessageDeduplicationId""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-3.fifo' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish( + 'message', + MessageGroupId='tenant-456', + MessageDeduplicationId='unique-msg-789' + ) + + sqs_queue_mock.send_message.assert_called_once() + call_kwargs = sqs_queue_mock.send_message.call_args[1] + assert call_kwargs['MessageGroupId'] == 'tenant-456' + assert call_kwargs['MessageDeduplicationId'] == 'unique-msg-789' + + def test_message_group_id_with_special_characters(self): + """Test MessageGroupId with special characters""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-1' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + + # Test with hyphens, underscores, and alphanumeric + group_id = 'customer-123_tenant-abc-XYZ' + p.publish('message', MessageGroupId=group_id) + + sqs_queue_mock.send_message.assert_called_once() + call_kwargs = sqs_queue_mock.send_message.call_args[1] + assert call_kwargs['MessageGroupId'] == group_id + + def test_standard_queue_with_message_group_id_and_delay(self): + """Test that non-FIFO queue can have both MessageGroupId and DelaySeconds""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-2' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + p.publish('message', MessageGroupId='group-1', DelaySeconds=5) + + sqs_queue_mock.send_message.assert_called_once() + call_kwargs = sqs_queue_mock.send_message.call_args[1] + assert call_kwargs['MessageGroupId'] == 'group-1' + assert call_kwargs['DelaySeconds'] == 5 + # Non-FIFO queue should not have MessageDeduplicationId + assert 'MessageDeduplicationId' not in call_kwargs + + def test_message_group_id_with_message_attributes(self): + """Test that MessageGroupId works alongside message_attributes""" + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + queue_name = 'queue-1' + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + + p.publish( + 'message', + MessageGroupId='group-2', + message_attributes={ + 'CustomAttr': {'DataType': 'String', 'StringValue': 'test'} + } + ) + + sqs_queue_mock.send_message.assert_called_once() + call_kwargs = sqs_queue_mock.send_message.call_args[1] + assert call_kwargs['MessageGroupId'] == 'group-2' + assert 'MessageAttributes' in call_kwargs + assert call_kwargs['MessageAttributes']['CustomAttr']['StringValue'] == 'test' + def test_predefined_queues_put_to_queue(self): connection = Connection(transport=SQS.Transport, transport_options={ 'predefined_queues': example_predefined_queues,