Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,26 @@ AioPikaBroker parameters:
* `qos` - number of messages that worker can prefetch.
* `declare_queues` - whether you want to declare queues even on
client side. May be useful for message persistance.
* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.

## Custom Queue Arguments

You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.

These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings).

**Example:**

```python
broker = AioPikaBroker(
declare_queues_kwargs={
"arguments": {
"x-message-ttl": 60000, # Set message TTL to 60 seconds
"x-queue-type": "quorum", # Use quorum queue type
}
}
)
```

This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.

22 changes: 15 additions & 7 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,21 @@ async def declare_queues(
self._dead_letter_queue_name,
**self._declare_queues_kwargs,
)
args: "Dict[str, Any]" = {
args: Dict[str, Any] = {
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._dead_letter_queue_name,
}
if self._max_priority is not None:
args["x-max-priority"] = self._max_priority
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
**self._declare_queues_kwargs,
**{
**self._declare_queues_kwargs,
"arguments": {
**self._declare_queues_kwargs.get("arguments", {}),
**args,
},
},
)
if self._delayed_message_exchange_plugin:
await queue.bind(
Expand All @@ -206,11 +211,14 @@ async def declare_queues(
else:
await channel.declare_queue(
self._delay_queue_name,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
**{
**self._declare_queues_kwargs,
"arguments": {
**self._declare_queues_kwargs.get("arguments", {}),
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
},
**self._declare_queues_kwargs,
)

await queue.bind(
Expand Down