Skip to content

Commit a650aac

Browse files
authored
fix: time unit bug/dockerfile/add a guide (#44)
Signed-off-by: jyu6 <[email protected]>
1 parent d74d063 commit a650aac

File tree

12 files changed

+175
-8
lines changed

12 files changed

+175
-8
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ clean:
77
@find . -type f -name "*.py[co]" -exec rm -rf {} +
88

99
format: clean
10-
poetry run black pynumaflow/
10+
poetry run black .
1111

1212

1313
lint: format
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup" \
18+
VENV_PATH="/opt/pysetup/.venv"
19+
20+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
21+
22+
RUN apt-get update \
23+
&& apt-get install --no-install-recommends -y \
24+
curl \
25+
wget \
26+
# deps for building python deps
27+
build-essential \
28+
&& apt-get install -y git \
29+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
30+
\
31+
# install dumb-init
32+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
33+
&& chmod +x /dumb-init \
34+
&& curl -sSL https://install.python-poetry.org | python3 -
35+
36+
####################################################################################################
37+
# udf: used for running the udf vertices
38+
####################################################################################################
39+
FROM builder AS udf
40+
41+
WORKDIR $PYSETUP_PATH
42+
COPY ./pyproject.toml ./poetry.lock ./
43+
RUN poetry install --no-cache --no-root && \
44+
rm -rf ~/.cache/pypoetry/
45+
46+
ADD . /app
47+
WORKDIR /app
48+
49+
RUN chmod +x entry.sh
50+
51+
ENTRYPOINT ["/dumb-init", "--"]
52+
CMD ["/app/entry.sh"]
53+
54+
EXPOSE 5000

examples/developer_guide/Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.PHONY: image
2+
image:
3+
docker build -t "test:latest" .

examples/developer_guide/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Developer Guide
2+
3+
This example is for numaflow-python contributors/developers. The example includes how to use your branch to build the UDF image to test your code change before submitting a PR.
4+
5+
1. Install poetry before starting your test. Make sure you have the correct python version.
6+
2. Push your code change to your branch.
7+
3. Update the `pynumaflow` in `pyproject.tomal` file with your (forked) repo url and your branch name. For example, `pynumaflow = {git = "https://github.com/chromevoid/numaflow-python", rev = "test-branch"}`
8+
4. Run `poetry update -vv` from this `developer_guide` folder. You should get a `poetry.lock` file.
9+
5. Update your `example.py` and the `image name` in `Makefile` as needed.
10+
6. Run `make image` to build your image.
11+
7. Now you have the image with your customized example and your code change to test in the numaflow pipeline. Example pipeline `pipeline-numaflow.yaml` is also provided in this `developer_guide` folder. Please check [numaflow](https://numaflow.numaproj.io/) for more details.

examples/developer_guide/entry.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Iterator
2+
from pynumaflow.function import (
3+
Messages,
4+
Message,
5+
Datum,
6+
Metadata,
7+
UserDefinedFunctionServicer,
8+
)
9+
10+
11+
def map_handler(key: str, datum: Datum) -> Messages:
12+
# forward a message
13+
val = datum.value
14+
_ = datum.event_time
15+
_ = datum.watermark
16+
messages = Messages()
17+
messages.append(Message.to_vtx(key, val))
18+
return messages
19+
20+
21+
def reduce_handler(key: str, datums: Iterator[Datum], md: Metadata) -> Messages:
22+
# count the number of events
23+
interval_window = md.interval_window
24+
counter = 0
25+
for _ in datums:
26+
counter += 1
27+
msg = (
28+
f"counter:{counter} interval_window_start:{interval_window.start} "
29+
f"interval_window_end:{interval_window.end}"
30+
)
31+
return Messages(Message.to_vtx(key, str.encode(msg)))
32+
33+
34+
if __name__ == "__main__":
35+
grpc_server = UserDefinedFunctionServicer(
36+
map_handler=map_handler, reduce_handler=reduce_handler
37+
)
38+
grpc_server.start()
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: pynumasdk
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
# A self data generating source
10+
generator:
11+
rpu: 10
12+
duration: 1s
13+
- name: cat
14+
udf:
15+
container:
16+
# compute the sum
17+
image: quay.io/numaio/numaflow-python/map-forward-message:latest
18+
- name: counter
19+
udf:
20+
container:
21+
# compute the sum
22+
image: quay.io/numaio/numaflow-python/reduce-counter:latest
23+
groupBy:
24+
window:
25+
fixed:
26+
length: 10s
27+
keyed: true
28+
- name: sink
29+
scale:
30+
min: 1
31+
sink:
32+
udsink:
33+
container:
34+
image: quay.io/numaio/numaflow-python/sink-log:latest
35+
edges:
36+
- from: in
37+
to: cat
38+
- from: cat
39+
to: counter
40+
parallelism: 1
41+
- from: counter
42+
to: sink
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "reduce-counter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = {git = "https://github.com/<YOUR_ID>/numaflow-python", rev = "<YOUR_BRANCH>"}
10+
11+
[tool.poetry.dev-dependencies]
12+
13+
[build-system]
14+
requires = ["poetry-core>=1.0.0"]
15+
build-backend = "poetry.core.masonry.api"

examples/function/flatmap/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ RUN apt-get update \
3939
FROM builder AS udf
4040

4141
WORKDIR $PYSETUP_PATH
42-
COPY ./pyproject.toml ./poetry.lock ./
43-
RUN poetry install --without mlflowserver --no-cache --no-root && \
42+
COPY ./pyproject.toml ./
43+
RUN poetry install --no-cache --no-root && \
4444
rm -rf ~/.cache/pypoetry/
4545

4646
ADD . /app

pynumaflow/function/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ def ReduceFn(
152152
f"got key: {key}, start: {start}, end: {end}."
153153
)
154154

155-
start_dt = datetime.fromtimestamp(int(start), timezone.utc)
156-
end_dt = datetime.fromtimestamp(int(end), timezone.utc)
155+
start_dt = datetime.fromtimestamp(int(start) / 1e3, timezone.utc)
156+
end_dt = datetime.fromtimestamp(int(end) / 1e3, timezone.utc)
157157
interval_window = IntervalWindow(start=start_dt, end=end_dt)
158158

159159
try:

0 commit comments

Comments
 (0)