
Commit 792abc5

feat: Rust SDK based implementation of - accumulator, batchmap, map, mapstream, reduce, session_reduce (#273)
Signed-off-by: Vigith Maurice <[email protected]>
1 parent ceed3fc commit 792abc5

89 files changed: 7105 additions, 0 deletions

.gitignore

Lines changed: 16 additions & 0 deletions
@@ -1,3 +1,19 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

# Skip wheel
**/pynumaflow_lite-*.whl

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
packages/pynumaflow-lite/Cargo.toml

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
[package]
name = "pynumaflow-lite"
version = "0.1.0"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "pynumaflow_lite"
crate-type = ["cdylib", "rlib"]

[dependencies]
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "export-accum-items" }
pyo3 = { version = "0.26.0", features = ["chrono", "experimental-inspect"] }
tokio = "1.47.1"
tonic = "0.14.2"
tokio-stream = "0.1.17"
tower = "0.5.2"
hyper-util = "0.1.16"
prost-types = "0.14.1"
chrono = "0.4.42"
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
futures-core = "0.3.31"
pin-project = "1.1.10"

[[bin]]
name = "test_map"
path = "tests/bin/map.rs"

[[bin]]
name = "test_batchmap"
path = "tests/bin/batchmap.rs"

[[bin]]
name = "test_mapstream"
path = "tests/bin/mapstream.rs"

[[bin]]
name = "test_reduce"
path = "tests/bin/reduce.rs"

[[bin]]
name = "test_session_reduce"
path = "tests/bin/session_reduce.rs"

[[bin]]
name = "test_accumulator"
path = "tests/bin/accumulator.rs"
packages/pynumaflow-lite/Makefile

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
.PHONY: help build develop stubgen clean test test-rust

# Default Python package/module name
MODULE ?= pynumaflow_lite

# Optional args to pass through to cargo test, e.g., ARGS="--lib pyiterables::tests::py_async_iter_stream_yields_incrementally"
ARGS ?=

help:
    @echo "Targets:"
    @echo "  build      - cargo build the Rust library"
    @echo "  develop    - maturin develop (install in current Python env)"
    @echo "  test       - run end-to-end pytest (depends on develop)"
    @echo "  test-rust  - cargo test with PYTHONHOME set; pass args via ARGS=\"...\""
    @echo "  clean      - cargo clean"

build:
    cargo build

# Installs the extension into the active Python environment.
# You can then discover the installed .so path to run stubgen against it if preferred.
develop:
    maturin develop

# Run pytest end-to-end tests. Assumes a working Python env with pytest installed.
# Example: (cd pynumaflow-lite && make test)
# Note: we do not install pytest here to avoid mutating global envs.
test: develop
    pytest -v

# Run cargo tests with PYTHONHOME pointed at base_prefix so embedded CPython finds the stdlib.
# Usage examples:
#   make test-rust ARGS="--lib"
#   make test-rust ARGS="--lib pyiterables::tests::py_async_iter_stream_yields_incrementally"
test-rust:
    @export PYTHONHOME="$(shell python -c 'import sys; print(sys.base_prefix)')" && \
    cargo test $(ARGS)

clean:
    cargo clean
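
The `test-rust` target exports `PYTHONHOME` set to `sys.base_prefix` because the CPython interpreter embedded in the `cargo test` binaries (via pyo3) needs to locate the standard library. A short illustration of why `sys.base_prefix`, and not `sys.prefix`, is the right value when a virtualenv is active:

```python
import sys

# Inside a virtualenv, sys.prefix points at the venv directory, while
# sys.base_prefix points at the interpreter installation that actually
# contains the standard library.
print("prefix:     ", sys.prefix)
print("base_prefix:", sys.base_prefix)

# The Makefile exports PYTHONHOME=<base_prefix> so the embedded CPython
# used by `cargo test` can find the stdlib even when a venv is active.
```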

packages/pynumaflow-lite/README.md

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
## Development Setup

```bash
# create a new venv
uv venv

# activate the venv
source .venv/bin/activate

uv pip install maturin

# install dependencies
uv sync
```

### Testing

```bash
make test
```

### HOWTO create a .whl

Go to the `pynumaflow-lite` (top-level) directory and run the command below.

```bash
docker run --rm -v $(pwd):/io ghcr.io/pyo3/maturin build -i python3.11 --release
```

This will create the wheel file in the `target/wheels/` directory. Copy it over to wherever the
Python code referencing this library is being written, e.g.,

```bash
cp target/wheels/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl manifests/simple-async-map/
```
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
FROM python:3.11-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup"

ENV PATH="$POETRY_HOME/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && curl -sSL https://install.python-poetry.org | python3 -

FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

# NOTE: place the built wheel in this directory before building the image
RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl

RUN poetry lock
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

CMD ["python", "accumulator_stream_sorter.py"]
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
To create the `wheel` file, refer to the [root](../../README.md) README.

## HOWTO build the image

```bash
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1 --load
```

Now load the image into your cluster.

### `k3d`

```bash
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

### Minikube

```bash
minikube image load quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

#### Delete image from minikube

`minikube` doesn't like re-loading the same image; delete the old one and load again if you are
reusing the same tag.

```bash
minikube image rm quay.io/numaio/numaflow/pynumaflow-lite-accumulator-stream-sorter:v1
```

## Run the pipeline

```bash
kubectl apply -f pipeline.yaml
```
accumulator_stream_sorter.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
"""
Stream sorter accumulator example.

This accumulator buffers incoming data and sorts it by event time,
flushing sorted data when the watermark advances.
"""
import asyncio
from datetime import datetime
from typing import AsyncIterator

from pynumaflow_lite.accumulator import Datum, Message, AccumulatorAsyncServer, Accumulator


class StreamSorter(Accumulator):
    """
    A stream sorter that buffers and sorts data by event time,
    flushing when the watermark advances.
    """

    def __init__(self):
        from datetime import timezone
        # Initialize with a very old timestamp (timezone-aware)
        self.latest_wm = datetime.fromtimestamp(-1, tz=timezone.utc)
        self.sorted_buffer: list[Datum] = []
        print("StreamSorter initialized")

    async def handler(self, datums: AsyncIterator[Datum]) -> AsyncIterator[Message]:
        """
        Buffer and sort datums, yielding sorted messages when the watermark advances.
        """
        print("Handler started, waiting for datums...")
        datum_count = 0

        async for datum in datums:
            datum_count += 1
            print(f"Received datum #{datum_count}: event_time={datum.event_time}, "
                  f"watermark={datum.watermark}, value={datum.value}")

            # If the watermark has moved forward
            if datum.watermark and datum.watermark > self.latest_wm:
                old_wm = self.latest_wm
                self.latest_wm = datum.watermark
                print(f"Watermark advanced from {old_wm} to {self.latest_wm}")

                # Flush buffer
                flushed = 0
                async for msg in self.flush_buffer():
                    flushed += 1
                    yield msg

                if flushed > 0:
                    print(f"Flushed {flushed} messages from buffer")

            # Insert into sorted buffer
            self.insert_sorted(datum)
            print(f"Buffer size: {len(self.sorted_buffer)}")

        print(f"Handler finished. Total datums processed: {datum_count}")
        print(f"Remaining in buffer: {len(self.sorted_buffer)}")

        # Flush any remaining items in the buffer at the end
        if self.sorted_buffer:
            print("Flushing remaining buffer at end...")
            for datum in self.sorted_buffer:
                print(f"  Flushing: event_time={datum.event_time}, value={datum.value}")
                # Use Message.from_datum to preserve all metadata
                yield Message.from_datum(datum)
            self.sorted_buffer = []

    def insert_sorted(self, datum: Datum):
        """Binary insert to keep the sorted buffer ordered by event_time."""
        left, right = 0, len(self.sorted_buffer)
        while left < right:
            mid = (left + right) // 2
            if self.sorted_buffer[mid].event_time > datum.event_time:
                right = mid
            else:
                left = mid + 1
        self.sorted_buffer.insert(left, datum)

    async def flush_buffer(self) -> AsyncIterator[Message]:
        """Flush all items from the buffer that are at or before the watermark."""
        i = 0
        for datum in self.sorted_buffer:
            if datum.event_time > self.latest_wm:
                break
            print(f"  Flushing: event_time={datum.event_time}, value={datum.value}")
            # Use Message.from_datum to preserve all metadata (id, headers, event_time, watermark)
            yield Message.from_datum(datum)
            i += 1

        # Remove flushed items
        self.sorted_buffer = self.sorted_buffer[i:]


async def main():
    """
    Start the accumulator server.
    """
    import signal

    server = AccumulatorAsyncServer()

    # Set up signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
        loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
    except (NotImplementedError, RuntimeError):
        pass

    try:
        print("Starting Stream Sorter Accumulator Server...")
        await server.start(StreamSorter)
        print("Shutting down gracefully...")
    except asyncio.CancelledError:
        try:
            server.stop()
        except Exception:
            pass
        return


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
import signal
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
    pass

if __name__ == "__main__":
    asyncio.run(main())
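
The `insert_sorted` helper above is a plain binary insertion keyed on `event_time`; because the comparison is strict (`>`), datums with equal event times keep their arrival order. A standalone sketch of the same logic using bare datetimes (no SDK types involved):

```python
from datetime import datetime, timezone


def insert_sorted(buffer: list[datetime], item: datetime) -> None:
    # Same binary insertion as StreamSorter.insert_sorted: find the first
    # position whose element is strictly greater than `item` and insert there,
    # so equal timestamps stay in arrival order (a stable insert).
    left, right = 0, len(buffer)
    while left < right:
        mid = (left + right) // 2
        if buffer[mid] > item:
            right = mid
        else:
            left = mid + 1
    buffer.insert(left, item)


def t(seconds: int) -> datetime:
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


buf: list[datetime] = []
for ts in (t(5), t(1), t(3), t(3), t(2)):
    insert_sorted(buf, ts)

print([d.timestamp() for d in buf])  # [1.0, 2.0, 3.0, 3.0, 5.0]
```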
