Skip to content

Tasks not delivered with DIRECT exchange #43

@recursionhater0

Description

@recursionhater0

When using ExchangeType.DIRECT, tasks are sent but workers don't consume them due to a routing key mismatch.

Reproduction:

  1. Install dependencies.
pip install taskiq taskiq-aiopika taskiq-redis
  1. Copy this script.
from taskiq_redis import RedisAsyncResultBackend
from taskiq_aio_pika import AioPikaBroker

APP_NAME = "test-app"
AMQP_URL = "your rabbitmq url"
REDIS_URL = "your redis url"

broker = AioPikaBroker(
    url=AMQP_URL,
    queue_name=APP_NAME,
    exchange_name=APP_NAME,
    routing_key=APP_NAME,
    exchange_type=ExchangeType.DIRECT,
).with_result_backend(
    RedisAsyncResultBackend(redis_url=REDIS_URL),
)

@broker.task
async def test_task(message: str) -> str:
    return f"Processed: {message}"


async def main():
    await broker.startup()

    task = await test_task.kiq("Hello World!")
    try:
        result = await task.wait_result(timeout=5)
        print(result.return_value)
    except TaskiqResultTimeoutError:
        print("TIMEOUT")

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  1. Run worker.
taskiq worker script_name:broker

python your-script.py

Result: The script prints "TIMEOUT" because the worker never receives the task.

Root Cause

In AioPikaBroker.kick() method, messages are published using the task name as routing key:

await exchange.publish(rmq_message, routing_key=message.task_name)

However, the queue is bound using the configured routing_key parameter. For DIRECT exchanges, routing keys must match exactly:

  • Queue binding: routing_key="test-app"
  • Message publishing: routing_key="script_name:test_task"
  • Result: "test-app" ≠ "script_name:test_task" → no delivery

For TOPIC exchange type everything works cause routing_key is set as wildcard by default.

Expected Fix

# In kick() method
if self._exchange_type == ExchangeType.DIRECT:
    routing_key = self._routing_key
else:
    routing_key = message.task_name

# inside if delay is None:
await exchange.publish(rmq_message, routing_key=routing_key)

With this fix everything works fine.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions