Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
name: Testing package
name: Testing taskiq-valkey

on: push
on:
push:
branches:
- main
- develop
pull_request:

jobs:
lint:
Expand Down Expand Up @@ -32,10 +37,11 @@ jobs:
strategy:
matrix:
py_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
os: [ubuntu-latest, windows-latest]
runs-on: "${{ matrix.os }}"
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v4
- name: Set up Valkey instance and Valkey cluster
run: docker compose up -d
- name: Install poetry
run: pipx install poetry
- name: Set up Python
Expand All @@ -51,7 +57,7 @@ jobs:
run: poetry run coverage xml
- name: Upload coverage reports to Codecov with GitHub Action
uses: codecov/codecov-action@v3
if: matrix.os == 'ubuntu-latest' && matrix.py_version == '3.11'
if: matrix.py_version == '3.11'
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
rev: v5.0.0
hooks:
- id: check-ast
- id: trailing-whitespace
Expand Down
135 changes: 134 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,134 @@
# taskiq_valkey
# TaskIQ-Valkey

Taskiq-valkey is a plugin for taskiq that adds a new broker and result backend based on valkey.

# Installation

To use this project you must have installed core taskiq library:
```bash
pip install taskiq
```
This project can be installed using pip:
```bash
pip install taskiq-valkey
```

# Usage

Let's see the example with the valkey broker and valkey async result:

```python
# broker.py
import asyncio

from taskiq_valkey import ValkeyAsyncResultBackend, ValkeyStreamBroker

result_backend = ValkeyAsyncResultBackend(
valkey_url="valkey://localhost:6379",
)

# Or you can use PubSubBroker if you need broadcasting
broker = ValkeyStreamBroker(
valkey_url="valkey://localhost:6379",
).with_result_backend(result_backend)


@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(5.5)
print("All problems are solved!")


async def main():
task = await best_task_ever.kiq()
print(await task.wait_result())


if __name__ == "__main__":
asyncio.run(main())
```

Launch the workers:
`taskiq worker broker:broker`
Then run the main code:
`python3 broker.py`


## Brokers

This package contains 6 broker implementations. We have two broker types: `PubSub` and `Stream`.

Each of type is implemented for each valkey architecture:
* Single node
* Cluster
* Sentinel

Here's a small breakdown of how they differ from eachother.


### PubSub

By default on old valkey versions PUBSUB was the way of making valkey into a queue.
But using PUBSUB means that all messages delivered to all subscribed consumers.

> [!WARNING]
> This broker doesn't support acknowledgements. If during message processing
> Worker was suddenly killed the message is going to be lost.

### Stream

Stream brokers use valkey [stream type](https://valkey.io/topics/streams-intro/) to store and fetch messages.

> [!TIP]
> This broker **supports** acknowledgements and therefore is fine to use in cases when data durability is
> required.

## ValkeyAsyncResultBackend configuration

ValkeyAsyncResultBackend parameters:
* `valkey_url` - url to valkey.
* `keep_results` - flag to not remove results from Valkey after reading.
* `result_ex_time` - expire time in seconds (by default - not specified)
* `result_px_time` - expire time in milliseconds (by default - not specified)
* Any other keyword arguments are passed to `valkey.asyncio.BlockingConnectionPool`.
Notably, you can use `timeout` to set custom timeout in seconds for reconnects
(or set it to `None` to try reconnects indefinitely).

> [!WARNING]
> **It is highly recommended to use expire time in ValkeyAsyncResultBackend**
> If you want to add expiration, either `result_ex_time` or `result_px_time` must be set.
> ```python
> # First variant
> valkey_async_result = ValkeyAsyncResultBackend(
> valkey_url="valkey://localhost:6379",
> result_ex_time=1000,
> )
>
> # Second variant
> valkey_async_result = ValkeyAsyncResultBackend(
> valkey_url="valkey://localhost:6379",
> result_px_time=1000000,
> )
> ```


## Schedule sources


You can use this package to add dynamic schedule sources. They are used to store
schedules for taskiq scheduler.

The advantage of using schedule sources from this package over default `LabelBased` source is that you can
dynamically add schedules in it.

For now we have only one type of schedules - `ListValkeyScheduleSource`.

### ListValkeyScheduleSource

This source holds values in lists.

* For cron tasks it uses key `{prefix}:cron`.
* For timed schedules it uses key `{prefix}:time:{time}` where `{time}` is actually time where schedules should run.

The main advantage of this approach is that we only fetch tasks we need to run at a given time and do not perform any excesive calls to valkey.
85 changes: 85 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
services:
valkey:
image: bitnami/valkey:7.2.7
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s
ports:
- 7000:6379
valkey-node-0: &valkey-node
image: docker.io/bitnami/valkey-cluster:7.2.7
environment:
ALLOW_EMPTY_PASSWORD: "yes"
VALKEY_NODES: "valkey-node-0 valkey-node-1 valkey-node-2 valkey-node-3 valkey-node-4 valkey-node-5"
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s

valkey-node-1:
<<: *valkey-node

valkey-node-2:
<<: *valkey-node

valkey-node-3:
<<: *valkey-node

valkey-node-4:
<<: *valkey-node

valkey-node-5:
image: docker.io/bitnami/valkey-cluster:7.2.7
depends_on:
- valkey-node-0
- valkey-node-1
- valkey-node-2
- valkey-node-3
- valkey-node-4
environment:
ALLOW_EMPTY_PASSWORD: "yes"
VALKEY_NODES: "valkey-node-0 valkey-node-1 valkey-node-2 valkey-node-3 valkey-node-4 valkey-node-5"
VALKEY_CLUSTER_REPLICAS: 1
VALKEY_CLUSTER_CREATOR: "yes"
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s
ports:
- 7001:6379

valkey-master:
image: bitnami/valkey:7.2.7
environment:
ALLOW_EMPTY_PASSWORD: "yes"
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s

valkey-sentinel:
image: bitnami/valkey-sentinel:7.2.7
depends_on:
- valkey-master
environment:
ALLOW_EMPTY_PASSWORD: "yes"
VALKEY_MASTER_HOST: "valkey-master"
healthcheck:
test: ["CMD", "valkey-cli", "-p", "26379", "ping"]
interval: 5s
timeout: 5s
retries: 3
start_period: 10s
ports:
- 7002:26379
Loading
Loading