Skip to content

Commit d9c9192

Browse files
authored
feat: sink integration for pynumaflow-lite (#281)
Signed-off-by: Vigith Maurice <[email protected]>
1 parent c3c3f19 commit d9c9192

File tree

16 files changed

+1071
-4
lines changed

16 files changed

+1071
-4
lines changed

packages/pynumaflow-lite/Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ name = "pynumaflow_lite"
99
crate-type = ["cdylib", "rlib"]
1010

1111
[dependencies]
12-
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "export-accum-items" }
12+
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "fde3deafea634abbc347032ff409d33d4e1514b1" }
1313
pyo3 = { version = "0.26.0", features = ["chrono", "experimental-inspect"] }
1414
tokio = "1.47.1"
1515
tonic = "0.14.2"
@@ -22,6 +22,7 @@ pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
2222
futures-core = "0.3.31"
2323
pin-project = "1.1.10"
2424

25+
## Binaries for testing
2526

2627
[[bin]]
2728
name = "test_map"
@@ -31,12 +32,10 @@ path = "tests/bin/map.rs"
3132
name = "test_batchmap"
3233
path = "tests/bin/batchmap.rs"
3334

34-
3535
[[bin]]
3636
name = "test_mapstream"
3737
path = "tests/bin/mapstream.rs"
3838

39-
4039
[[bin]]
4140
name = "test_reduce"
4241
path = "tests/bin/reduce.rs"
@@ -48,3 +47,7 @@ path = "tests/bin/session_reduce.rs"
4847
[[bin]]
4948
name = "test_accumulator"
5049
path = "tests/bin/accumulator.rs"
50+
51+
[[bin]]
52+
name = "test_sink"
53+
path = "tests/bin/sink.rs"
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
FROM python:3.11-slim-bullseye AS builder
2+
3+
ENV PYTHONFAULTHANDLER=1 \
4+
PYTHONUNBUFFERED=1 \
5+
PYTHONHASHSEED=random \
6+
PIP_NO_CACHE_DIR=on \
7+
PIP_DISABLE_PIP_VERSION_CHECK=on \
8+
PIP_DEFAULT_TIMEOUT=100 \
9+
POETRY_HOME="/opt/poetry" \
10+
POETRY_VIRTUALENVS_IN_PROJECT=true \
11+
POETRY_NO_INTERACTION=1 \
12+
PYSETUP_PATH="/opt/pysetup"
13+
14+
ENV PATH="$POETRY_HOME/bin:$PATH"
15+
16+
RUN apt-get update \
17+
&& apt-get install --no-install-recommends -y \
18+
curl \
19+
wget \
20+
# toolchain needed to build Python dependencies from source
21+
build-essential \
22+
&& apt-get install -y git \
23+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
24+
&& curl -sSL https://install.python-poetry.org | python3 -
25+
26+
FROM builder AS udf
27+
28+
WORKDIR $PYSETUP_PATH
29+
COPY ./ ./
30+
31+
RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
32+
33+
RUN poetry lock
34+
RUN poetry install --no-cache --no-root && \
35+
rm -rf ~/.cache/pypoetry/
36+
37+
CMD ["python", "sink_log.py"]
38+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
To create the `wheel` file, refer to the [root README](../../README.md).
2+
3+
## How to build the image
4+
5+
```bash
6+
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-sink-log:v1 --load
7+
```
8+
9+
### `k3d`
10+
11+
Load the image into `k3d`:
12+
13+
```bash
14+
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-sink-log:v1
15+
```
16+
17+
### Minikube
18+
19+
```bash
20+
minikube image load quay.io/numaio/numaflow/pynumaflow-lite-sink-log:v1
21+
```
22+
23+
## Run the pipeline
24+
25+
```bash
26+
kubectl apply -f pipeline.yaml
27+
```
28+
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: simple-sink-log
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
# A built-in source that generates its own data
10+
generator:
11+
rpu: 100
12+
duration: 1s
13+
msgSize: 8
14+
- name: log-sink
15+
scale:
16+
min: 1
17+
sink:
18+
udsink:
19+
container:
20+
image: quay.io/numaio/numaflow/pynumaflow-lite-sink-log:v1
21+
imagePullPolicy: Never
22+
edges:
23+
- from: in
24+
to: log-sink
25+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[project]
2+
name = "sink-log"
3+
version = "0.1.0"
4+
description = "User-defined sink example using pynumaflow-lite"
5+
authors = [
6+
{ name = "Vigith Maurice", email = "[email protected]" }
7+
]
8+
readme = "README.md"
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
]
12+
13+
14+
[build-system]
15+
requires = ["poetry-core>=2.0.0,<3.0.0"]
16+
build-backend = "poetry.core.masonry.api"
17+
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
import collections
3+
import logging
4+
import signal
5+
from collections.abc import AsyncIterable
6+
7+
from pynumaflow_lite import sinker
8+
from pynumaflow_lite._sink_dtypes import Sinker
9+
10+
# Configure logging
11+
logging.basicConfig(level=logging.INFO)
12+
_LOGGER = logging.getLogger(__name__)
13+
14+
15+
# Example user-defined sink: logs each datum's payload and acknowledges it as a success.
class SimpleLogSink(Sinker):
16+
"""
17+
Simple log sink that logs each message and returns success responses.
18+
"""
19+
20+
async def handler(self, datums: AsyncIterable[sinker.Datum]) -> sinker.Responses:
21+
# One response is appended per datum; the full set is returned once the stream is exhausted.
responses = sinker.Responses()
22+
async for msg in datums:
23+
# The payload is decoded as UTF-8 for logging; decode() raises on bytes that are not valid UTF-8.
_LOGGER.info("User Defined Sink: %s", msg.value.decode("utf-8"))
24+
responses.append(sinker.Response.as_success(msg.id))
25+
# If writing to the sink fails and a fallback sink is configured,
26+
# use Response.as_fallback(msg.id) instead to send the message to the fallback sink.
27+
return responses
28+
29+
30+
# Optional: reset SIGINT and SIGTERM to their default handlers so asyncio.run can handle them cleanly.
31+
signal.signal(signal.SIGINT, signal.default_int_handler)
32+
try:
33+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
34+
except AttributeError:
35+
# NOTE(review): presumably guards platforms where signal.SIGTERM is not defined — confirm.
pass
36+
37+
38+
# Runs a SinkAsyncServer with `f` as the handler and returns once the server finishes
# or the surrounding task is cancelled.
async def start(f: collections.abc.Callable):
39+
server = sinker.SinkAsyncServer()
40+
41+
# Register loop-level signal handlers so SIGINT/SIGTERM call server.stop() and we control
# shutdown ourselves instead of relying on asyncio.run's own signal handling.
42+
loop = asyncio.get_running_loop()
43+
try:
44+
loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
45+
loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
46+
except (NotImplementedError, RuntimeError):
47+
# Best effort: if the handlers cannot be installed, continue without them.
pass
48+
49+
try:
50+
# Completes when the server has stopped; the message below is printed only on that path.
await server.start(f)
51+
print("Shutting down gracefully...")
52+
except asyncio.CancelledError:
53+
# On cancellation, still try to stop the server; errors from stop() are deliberately ignored.
try:
54+
server.stop()
55+
except Exception:
56+
pass
57+
return
58+
59+
60+
# Script entry point: build the sink and drive start() to completion with asyncio.run.
if __name__ == "__main__":
61+
async_handler = SimpleLogSink()
62+
asyncio.run(start(async_handler))
63+

packages/pynumaflow-lite/pynumaflow_lite/__init__.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ from . import mapstreamer as mapstreamer
88
from . import reducer as reducer
99
from . import session_reducer as session_reducer
1010
from . import accumulator as accumulator
11+
from . import sinker as sinker
1112

12-
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator']
13+
__all__ = ['mapper', 'batchmapper', 'mapstreamer', 'reducer', 'session_reducer', 'accumulator', 'sinker']
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from abc import ABCMeta, abstractmethod
2+
from pynumaflow_lite.sinker import Datum, Responses
3+
from collections.abc import AsyncIterable
4+
5+
6+
# Abstract base class for user-defined sinks: subclasses must implement the async `handler`.
class Sinker(metaclass=ABCMeta):
7+
"""
8+
Provides an interface to write a Sink servicer.
9+
"""
10+
11+
# Calling the instance forwards all arguments to handler(), so an instance can be
# passed wherever a plain callable is expected.
def __call__(self, *args, **kwargs):
12+
return self.handler(*args, **kwargs)
13+
14+
@abstractmethod
15+
async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
16+
"""
17+
Implement this handler function for sink.
18+
Process the stream of datums and return responses.
19+
"""
20+
# Abstract placeholder; concrete subclasses supply the body.
pass
21+
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
from __future__ import annotations
2+
3+
from typing import Optional, List, Dict, Callable, Awaitable, Any, AsyncIterator
4+
import datetime as _dt
5+
6+
7+
# Type stub: holds a str -> bytes mapping in `key_value`; can be built directly or via from_dict().
class KeyValueGroup:
8+
key_value: Dict[str, bytes]
9+
10+
def __init__(self, key_value: Optional[Dict[str, bytes]] = ...) -> None: ...
11+
12+
@staticmethod
13+
def from_dict(key_value: Dict[str, bytes]) -> KeyValueGroup: ...
14+
15+
16+
# Type stub: a message with a required bytes `value` and optional `keys` and `user_metadata`
# (user_metadata maps a string to a KeyValueGroup).
class Message:
17+
keys: Optional[List[str]]
18+
value: bytes
19+
user_metadata: Optional[Dict[str, KeyValueGroup]]
20+
21+
def __init__(
22+
self,
23+
value: bytes,
24+
keys: Optional[List[str]] = ...,
25+
user_metadata: Optional[Dict[str, KeyValueGroup]] = ...,
26+
) -> None: ...
27+
28+
29+
# Type stub: a sink response identified by `id`, created through the static factories below.
# NOTE(review): `id` is presumably the id of the Datum being answered — confirm against the Rust bindings.
class Response:
30+
id: str
31+
32+
@staticmethod
33+
def as_success(id: str) -> Response: ...
34+
35+
@staticmethod
36+
def as_failure(id: str, err_msg: str) -> Response: ...
37+
38+
@staticmethod
39+
def as_fallback(id: str) -> Response: ...
40+
41+
@staticmethod
42+
def as_serve(id: str, payload: bytes) -> Response: ...
43+
44+
@staticmethod
45+
def as_on_success(id: str, message: Optional[Message] = ...) -> Response: ...
46+
47+
48+
# Type stub: a collection of Response objects; this stub declares only construction and append().
class Responses:
49+
def __init__(self) -> None: ...
50+
51+
def append(self, response: Response) -> None: ...
52+
53+
54+
# Type stub: one input element of a sink handler's stream, exposing its keys, payload,
# watermark, event time, id and headers as attributes.
class Datum:
55+
keys: List[str]
56+
value: bytes
57+
watermark: _dt.datetime
58+
eventtime: _dt.datetime
59+
id: str
60+
headers: Dict[str, str]
61+
62+
def __repr__(self) -> str: ...
63+
64+
def __str__(self) -> str: ...
65+
66+
67+
# Type stub: the async sink server. `sock_file` and `info_file` are optional; their defaults
# are not visible in this stub. start() takes the handler callable and returns an awaitable;
# stop() is a plain synchronous call.
class SinkAsyncServer:
68+
def __init__(
69+
self,
70+
sock_file: str | None = ...,
71+
info_file: str | None = ...,
72+
) -> None: ...
73+
74+
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
75+
76+
def stop(self) -> None: ...
77+
78+
79+
# Type stub for the sink handler interface: an async `handler` that receives an async
# iterator of Datum and returns Responses.
class Sinker:
80+
async def handler(self, datums: AsyncIterator[Datum]) -> Responses: ...
81+
82+
83+
__all__ = [
84+
"KeyValueGroup",
85+
"Message",
86+
"Response",
87+
"Responses",
88+
"Datum",
89+
"SinkAsyncServer",
90+
"Sinker",
91+
]
92+

packages/pynumaflow-lite/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod pyiterables;
66
pub mod pyrs;
77
pub mod reduce;
88
pub mod session_reduce;
9+
pub mod sink;
910

1011
use pyo3::prelude::*;
1112

@@ -51,6 +52,13 @@ fn accumulator(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
5152
Ok(())
5253
}
5354

55+
/// Submodule: `pynumaflow_lite.sinker`.
///
/// Delegates to `crate::sink::populate_py_module` to fill in the module `m`
/// and propagates any error it returns.
56+
#[pymodule]
57+
fn sinker(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
58+
crate::sink::populate_py_module(m)?;
59+
Ok(())
60+
}
61+
5462
/// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
5563
#[pymodule]
5664
fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
@@ -61,6 +69,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
6169
m.add_wrapped(pyo3::wrap_pymodule!(reducer))?;
6270
m.add_wrapped(pyo3::wrap_pymodule!(session_reducer))?;
6371
m.add_wrapped(pyo3::wrap_pymodule!(accumulator))?;
72+
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
6473

6574
// Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
6675
let binding = m.getattr("mapper")?;
@@ -116,5 +125,14 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
116125
.getattr("modules")?
117126
.set_item(fullname, &sub)?;
118127

128+
// Ensure it's importable as `pynumaflow_lite.sinker` as well
129+
let binding = m.getattr("sinker")?;
130+
let sub = binding.downcast::<PyModule>()?;
131+
let fullname = "pynumaflow_lite.sinker";
132+
sub.setattr("__name__", fullname)?;
133+
py.import("sys")?
134+
.getattr("modules")?
135+
.set_item(fullname, &sub)?;
136+
119137
Ok(())
120138
}

0 commit comments

Comments
 (0)