Open
Description
When using ExchangeType.DIRECT, tasks are sent but workers don't consume them due to a routing key mismatch.
Reproduction:
- Install dependencies.
pip install taskiq taskiq-aio-pika taskiq-redis
- Copy this script.
import asyncio

from aio_pika import ExchangeType
from taskiq.exceptions import TaskiqResultTimeoutError
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

APP_NAME = "test-app"
AMQP_URL = "your rabbitmq url"
REDIS_URL = "your redis url"

# The queue is bound to a DIRECT exchange with routing_key=APP_NAME.
broker = AioPikaBroker(
    url=AMQP_URL,
    queue_name=APP_NAME,
    exchange_name=APP_NAME,
    routing_key=APP_NAME,
    exchange_type=ExchangeType.DIRECT,
).with_result_backend(
    RedisAsyncResultBackend(redis_url=REDIS_URL),
)


@broker.task
async def test_task(message: str) -> str:
    return f"Processed: {message}"


async def main():
    await broker.startup()
    task = await test_task.kiq("Hello World!")
    try:
        # Times out if no worker ever picks up the task.
        result = await task.wait_result(timeout=5)
        print(result.return_value)
    except TaskiqResultTimeoutError:
        print("TIMEOUT")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
- Run the worker (assuming the script is saved as script_name.py).
taskiq worker script_name:broker
- In a second terminal, run the script.
python script_name.py
Result: The script prints "TIMEOUT" because the worker never receives the task.
Root Cause
In the AioPikaBroker.kick() method, messages are published using the task name as the routing key:
await exchange.publish(rmq_message, routing_key=message.task_name)
However, the queue is bound using the configured routing_key parameter. For DIRECT exchanges, routing keys must match exactly:
- Queue binding: routing_key="test-app"
- Message publishing: routing_key="script_name:test_task"
- Result: "test-app" ≠ "script_name:test_task" → no delivery
With the TOPIC exchange type everything works, because routing_key defaults to a wildcard that matches any task name.
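To illustrate the mismatch without a running RabbitMQ, here is a minimal pure-Python sketch of how direct and topic exchanges compare a binding key with a message's routing key. It is a simplified model of AMQP routing, not code from taskiq-aio-pika or RabbitMQ; the helper names direct_matches and topic_matches are made up for this example.

```python
def direct_matches(binding_key: str, routing_key: str) -> bool:
    """Direct exchange: deliver only when the keys are exactly equal."""
    return binding_key == routing_key


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Topic exchange: '*' matches exactly one dot-separated word,
    '#' matches zero or more words."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if all words were consumed.
            return w == len(words)
        if pattern[p] == "#":
            # Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


binding = "test-app"                 # routing_key passed to the broker
published = "script_name:test_task"  # task name used by kick()

print(direct_matches(binding, published))  # False: message is not delivered
print(topic_matches("#", published))       # True: wildcard binding matches
```

Under this model, the DIRECT binding from the reproduction can never match a task name, while a "#" binding on a TOPIC exchange matches every task name.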
Expected Fix
# In the kick() method
if self._exchange_type == ExchangeType.DIRECT:
    routing_key = self._routing_key
else:
    routing_key = message.task_name

# Inside the `if delay is None:` branch
await exchange.publish(rmq_message, routing_key=routing_key)
With this fix everything works fine.
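The key selection can also be pulled out into a small pure function so it is easy to unit test. This is only a sketch: select_routing_key is a hypothetical helper, not part of taskiq-aio-pika, and the ExchangeType enum below is a local stand-in for aio_pika.ExchangeType so the example runs without any dependencies.

```python
from enum import Enum


class ExchangeType(str, Enum):
    # Local stand-in for aio_pika.ExchangeType (assumption for this sketch).
    DIRECT = "direct"
    TOPIC = "topic"


def select_routing_key(
    exchange_type: ExchangeType,
    configured_key: str,
    task_name: str,
) -> str:
    """Hypothetical helper mirroring the proposed change to kick()."""
    if exchange_type == ExchangeType.DIRECT:
        # DIRECT requires an exact match with the queue binding.
        return configured_key
    # Other exchange types keep the current behaviour.
    return task_name


key = select_routing_key(
    ExchangeType.DIRECT, "test-app", "script_name:test_task"
)
print(key)  # test-app
```

With DIRECT, the published key now equals the binding key from the reproduction, so the message would be routed to the queue.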
FelixYe0509