Skip to content

Commit d33ddd0

Browse files
authored
feat: udf reduce (#40)
Signed-off-by: jyu6 <[email protected]>
1 parent 801bc36 commit d33ddd0

File tree

21 files changed

+426
-93
lines changed

21 files changed

+426
-93
lines changed

README.md

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,53 @@ and [UDSinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/)
55

66
## Implement a User Defined Function (UDF)
77

8-
```python
98

9+
### Map
10+
11+
```python
1012
from pynumaflow.function import Messages, Message, Datum, UserDefinedFunctionServicer
1113

1214

13-
def function_handler(key: str, datum: Datum) -> Messages:
14-
"""
15-
Simple UDF that relays an incoming message.
16-
"""
15+
def my_handler(key: str, datum: Datum) -> Messages:
1716
val = datum.value
1817
_ = datum.event_time
1918
_ = datum.watermark
20-
messages = Messages(Message(key=key, value=val))
19+
messages = Messages(Message.to_vtx(key, val))
2120
return messages
2221

2322

2423
if __name__ == "__main__":
25-
grpc_server = UserDefinedFunctionServicer(function_handler)
24+
grpc_server = UserDefinedFunctionServicer(map_handler=my_handler)
2625
grpc_server.start()
2726
```
2827

29-
### Sample Image (TODO)
28+
### Reduce
29+
30+
```python
31+
from typing import Iterator
32+
from pynumaflow.function import Messages, Message, Datum, Metadata, UserDefinedFunctionServicer
33+
34+
35+
def my_handler(key: str, datums: Iterator[Datum], md: Metadata) -> Messages:
36+
interval_window = md.interval_window
37+
counter = 0
38+
for _ in datums:
39+
counter += 1
40+
msg = (
41+
f"counter:{counter} interval_window_start:{interval_window.start} "
42+
f"interval_window_end:{interval_window.end}"
43+
)
44+
return Messages(Message.to_vtx(key, str.encode(msg)))
45+
46+
47+
if __name__ == "__main__":
48+
grpc_server = UserDefinedFunctionServicer(reduce_handler=my_handler)
49+
grpc_server.start()
50+
```
51+
52+
### Sample Image
53+
A sample UDF [Dockerfile](examples/function/forward_message/Dockerfile) is provided
54+
under [examples](examples/function/forward_message).
3055

3156
## Implement a User Defined Sink (UDSink)
3257

@@ -35,16 +60,16 @@ from typing import Iterator
3560
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer
3661

3762

38-
def udsink_handler(datums: Iterator[Datum]) -> Responses:
63+
def my_handler(datums: Iterator[Datum]) -> Responses:
3964
responses = Responses()
4065
for msg in datums:
41-
print("User Defined Sink", msg)
66+
print("User Defined Sink", msg.value.decode("utf-8"))
4267
responses.append(Response.as_success(msg.id))
4368
return responses
4469

4570

4671
if __name__ == "__main__":
47-
grpc_server = UserDefinedSinkServicer(udsink_handler)
72+
grpc_server = UserDefinedSinkServicer(my_handler)
4873
grpc_server.start()
4974
```
5075

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup" \
18+
VENV_PATH="/opt/pysetup/.venv"
19+
20+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
21+
22+
RUN apt-get update \
23+
&& apt-get install --no-install-recommends -y \
24+
curl \
25+
wget \
26+
# deps for building python deps
27+
build-essential \
28+
&& apt-get install -y git \
29+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
30+
\
31+
# install dumb-init
32+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
33+
&& chmod +x /dumb-init \
34+
&& curl -sSL https://install.python-poetry.org | python3 -
35+
36+
####################################################################################################
37+
# udf: used for running the udf vertices
38+
####################################################################################################
39+
FROM builder AS udf
40+
41+
WORKDIR $PYSETUP_PATH
42+
COPY ./pyproject.toml ./
43+
RUN poetry install --no-cache --no-root && \
44+
rm -rf ~/.cache/pypoetry/
45+
46+
ADD . /app
47+
WORKDIR /app
48+
49+
RUN chmod +x entry.sh
50+
51+
ENTRYPOINT ["/dumb-init", "--"]
52+
CMD ["/app/entry.sh"]
53+
54+
EXPOSE 5000

examples/function/counter/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.PHONY: image
2+
image:
3+
docker build -t "quay.io/numaio/numaflow-python/reduce-counter:latest" .
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Counter Example
2+
3+
## Build and push the image
4+
5+
```shell
6+
make image
7+
# Pushing requires write access to quay.io/numaio.
8+
docker push quay.io/numaio/numaflow-python/reduce-counter:latest
9+
```
10+

examples/function/counter/entry.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from typing import Iterator
2+
from pynumaflow.function import Messages, Message, Datum, Metadata, UserDefinedFunctionServicer
3+
4+
5+
def reduce_handler(key: str, datums: Iterator[Datum], md: Metadata) -> Messages:
6+
interval_window = md.interval_window
7+
counter = 0
8+
for _ in datums:
9+
counter += 1
10+
msg = (
11+
f"counter:{counter} interval_window_start:{interval_window.start} "
12+
f"interval_window_end:{interval_window.end}"
13+
)
14+
return Messages(Message.to_vtx(key, str.encode(msg)))
15+
16+
17+
if __name__ == "__main__":
18+
grpc_server = UserDefinedFunctionServicer(reduce_handler=reduce_handler)
19+
grpc_server.start()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "reduce-counter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = "~0.3.0"
10+
11+
[tool.poetry.dev-dependencies]
12+
13+
[build-system]
14+
requires = ["poetry-core>=1.0.0"]
15+
build-backend = "poetry.core.masonry.api"
Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
FROM python:3.9.12-slim
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
26

37
ENV PYTHONFAULTHANDLER=1 \
48
PYTHONUNBUFFERED=1 \
59
PYTHONHASHSEED=random \
6-
PIP_NO_CACHE_DIR=off \
10+
PIP_NO_CACHE_DIR=on \
711
PIP_DISABLE_PIP_VERSION_CHECK=on \
812
PIP_DEFAULT_TIMEOUT=100 \
9-
POETRY_VERSION=1.2.0 \
13+
POETRY_VERSION=1.2.2 \
1014
POETRY_HOME="/opt/poetry" \
1115
POETRY_VIRTUALENVS_IN_PROJECT=true \
1216
POETRY_NO_INTERACTION=1 \
@@ -20,24 +24,31 @@ RUN apt-get update \
2024
curl \
2125
wget \
2226
# deps for building python deps
23-
build-essential
24-
25-
# install poetry - respects $POETRY_VERSION & $POETRY_HOME
26-
RUN curl -sSL https://install.python-poetry.org | python3 -
27+
build-essential \
28+
&& apt-get install -y git \
29+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
30+
\
31+
# install dumb-init
32+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
33+
&& chmod +x /dumb-init \
34+
&& curl -sSL https://install.python-poetry.org | python3 -
35+
36+
####################################################################################################
37+
# udf: used for running the udf vertices
38+
####################################################################################################
39+
FROM builder AS udf
2740

2841
WORKDIR $PYSETUP_PATH
29-
ADD . $PYSETUP_PATH
30-
WORKDIR $PYSETUP_PATH
31-
RUN poetry install --without dev
42+
COPY ./pyproject.toml ./poetry.lock ./
43+
RUN poetry install --without mlflowserver --no-cache --no-root && \
44+
rm -rf ~/.cache/pypoetry/
3245

3346
ADD . /app
34-
35-
# install dumb-init
36-
RUN wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64
37-
RUN chmod +x /dumb-init
38-
3947
WORKDIR /app
48+
4049
RUN chmod +x entry.sh
4150

4251
ENTRYPOINT ["/dumb-init", "--"]
4352
CMD ["/app/entry.sh"]
53+
54+
EXPOSE 5000

examples/function/flatmap/example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ def my_handler(key: str, datum: Datum) -> Messages:
1313

1414

1515
if __name__ == "__main__":
16-
grpc_server = UserDefinedFunctionServicer(my_handler)
16+
grpc_server = UserDefinedFunctionServicer(map_handler=my_handler)
1717
grpc_server.start()

examples/function/flatmap/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[tool.poetry]
2-
name = "flatmap-example-function"
2+
name = "map-flatmap"
33
version = "0.2.4"
44
description = ""
55
authors = ["Numaflow developers"]
66

77
[tool.poetry.dependencies]
8-
python = "~3.9"
8+
python = "~3.10"
99
pynumaflow = "~0.3.0"
1010

1111
[tool.poetry.dev-dependencies]

0 commit comments

Comments
 (0)