|
1 | | -# taskiq_valkey |
| 1 | +# TaskIQ-Valkey |
| 2 | + |
| 3 | +Taskiq-valkey is a plugin for taskiq that adds a new broker and result backend based on valkey. |
| 4 | + |
| 5 | +# Installation |
| 6 | + |
| 7 | +To use this project you must have installed core taskiq library: |
| 8 | +```bash |
| 9 | +pip install taskiq |
| 10 | +``` |
| 11 | +This project can be installed using pip: |
| 12 | +```bash |
| 13 | +pip install taskiq-valkey |
| 14 | +``` |
| 15 | + |
| 16 | +# Usage |
| 17 | + |
| 18 | +Let's see the example with the valkey broker and valkey async result: |
| 19 | + |
| 20 | +```python |
| 21 | +# broker.py |
| 22 | +import asyncio |
| 23 | + |
| 24 | +from taskiq_valkey import ValkeyAsyncResultBackend, ValkeyStreamBroker |
| 25 | + |
| 26 | +result_backend = ValkeyAsyncResultBackend( |
| 27 | + valkey_url="valkey://localhost:6379", |
| 28 | +) |
| 29 | + |
| 30 | +# Or you can use PubSubBroker if you need broadcasting |
| 31 | +broker = ValkeyStreamBroker( |
| 32 | + valkey_url="valkey://localhost:6379", |
| 33 | +).with_result_backend(result_backend) |
| 34 | + |
| 35 | + |
| 36 | +@broker.task |
| 37 | +async def best_task_ever() -> None: |
| 38 | + """Solve all problems in the world.""" |
| 39 | + await asyncio.sleep(5.5) |
| 40 | + print("All problems are solved!") |
| 41 | + |
| 42 | + |
| 43 | +async def main(): |
| 44 | + task = await best_task_ever.kiq() |
| 45 | + print(await task.wait_result()) |
| 46 | + |
| 47 | + |
| 48 | +if __name__ == "__main__": |
| 49 | + asyncio.run(main()) |
| 50 | +``` |
| 51 | + |
| 52 | +Launch the workers: |
| 53 | +`taskiq worker broker:broker` |
| 54 | +Then run the main code: |
| 55 | +`python3 broker.py` |
| 56 | + |
| 57 | + |
| 58 | +## Brokers |
| 59 | + |
| 60 | +This package contains 6 broker implementations. We have two broker types: `PubSub` and `Stream`. |
| 61 | + |
| 62 | +Each of type is implemented for each valkey architecture: |
| 63 | +* Single node |
| 64 | +* Cluster |
| 65 | +* Sentinel |
| 66 | + |
| 67 | +Here's a small breakdown of how they differ from eachother. |
| 68 | + |
| 69 | + |
| 70 | +### PubSub |
| 71 | + |
| 72 | +By default on old valkey versions PUBSUB was the way of making valkey into a queue. |
| 73 | +But using PUBSUB means that all messages delivered to all subscribed consumers. |
| 74 | + |
| 75 | +> [!WARNING] |
| 76 | +> This broker doesn't support acknowledgements. If during message processing |
| 77 | +> Worker was suddenly killed the message is going to be lost. |
| 78 | +
|
| 79 | +### Stream |
| 80 | + |
| 81 | +Stream brokers use valkey [stream type](https://valkey.io/topics/streams-intro/) to store and fetch messages. |
| 82 | + |
| 83 | +> [!TIP] |
| 84 | +> This broker **supports** acknowledgements and therefore is fine to use in cases when data durability is |
| 85 | +> required. |
| 86 | +
|
| 87 | +## ValkeyAsyncResultBackend configuration |
| 88 | + |
| 89 | +ValkeyAsyncResultBackend parameters: |
| 90 | +* `valkey_url` - url to valkey. |
| 91 | +* `keep_results` - flag to not remove results from Valkey after reading. |
| 92 | +* `result_ex_time` - expire time in seconds (by default - not specified) |
| 93 | +* `result_px_time` - expire time in milliseconds (by default - not specified) |
| 94 | +* Any other keyword arguments are passed to `valkey.asyncio.BlockingConnectionPool`. |
| 95 | + Notably, you can use `timeout` to set custom timeout in seconds for reconnects |
| 96 | + (or set it to `None` to try reconnects indefinitely). |
| 97 | + |
| 98 | +> [!WARNING] |
| 99 | +> **It is highly recommended to use expire time in ValkeyAsyncResultBackend** |
| 100 | +> If you want to add expiration, either `result_ex_time` or `result_px_time` must be set. |
| 101 | +> ```python |
| 102 | +> # First variant |
| 103 | +> valkey_async_result = ValkeyAsyncResultBackend( |
| 104 | +> valkey_url="valkey://localhost:6379", |
| 105 | +> result_ex_time=1000, |
| 106 | +> ) |
| 107 | +> |
| 108 | +> # Second variant |
| 109 | +> valkey_async_result = ValkeyAsyncResultBackend( |
| 110 | +> valkey_url="valkey://localhost:6379", |
| 111 | +> result_px_time=1000000, |
| 112 | +> ) |
| 113 | +> ``` |
| 114 | +
|
| 115 | +
|
| 116 | +## Schedule sources |
| 117 | +
|
| 118 | +
|
| 119 | +You can use this package to add dynamic schedule sources. They are used to store |
| 120 | +schedules for taskiq scheduler. |
| 121 | +
|
| 122 | +The advantage of using schedule sources from this package over default `LabelBased` source is that you can |
| 123 | +dynamically add schedules in it. |
| 124 | +
|
| 125 | +For now we have only one type of schedules - `ListValkeyScheduleSource`. |
| 126 | +
|
| 127 | +### ListValkeyScheduleSource |
| 128 | +
|
| 129 | +This source holds values in lists. |
| 130 | +
|
| 131 | +* For cron tasks it uses key `{prefix}:cron`. |
| 132 | +* For timed schedules it uses key `{prefix}:time:{time}` where `{time}` is actually time where schedules should run. |
| 133 | +
|
| 134 | +The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excesive calls to valkey. |
0 commit comments