Skip to content

Commit c5a4695

Browse files
kohlisidSidhant Kohlivigith
authored
feat: add side input grpc service (#107)
Signed-off-by: Sidhant Kohli <[email protected]> Signed-off-by: Sidhant Kohli <[email protected]> Signed-off-by: Vigith Maurice <[email protected]> Co-authored-by: Sidhant Kohli <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
1 parent 0010845 commit c5a4695

File tree

20 files changed

+759
-6
lines changed

20 files changed

+759
-6
lines changed

.codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ ignore:
1616
- "pynumaflow/mapstreamer/proto/*"
1717
- "pynumaflow/reducer/proto/*"
1818
- "pynumaflow/sourcetransformer/proto/*"
19+
- "pynumaflow/sideinput/proto/*"
1920
- "pynumaflow/map/_udfunction_pb2.pyi"
2021
- "pynumaflow/sink/_udsink_pb2.pyi"

Makefile

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ setup:
2727

2828

2929
proto:
30-
python3 -m grpc_tools.protoc -I=pynumaflow/function/proto --python_out=pynumaflow/function/proto --grpc_python_out=pynumaflow/function/proto pynumaflow/function/proto/*.proto
31-
python3 -m grpc_tools.protoc -I=pynumaflow/sink/proto --python_out=pynumaflow/sink/proto --grpc_python_out=pynumaflow/sink/proto pynumaflow/sink/proto/*.proto
32-
python3 -m grpc_tools.protoc -I=pynumaflow/map/proto --python_out=pynumaflow/map/proto --grpc_python_out=pynumaflow/map/proto pynumaflow/map/proto/*.proto
33-
python3 -m grpc_tools.protoc -I=pynumaflow/mapstream/proto --python_out=pynumaflow/mapstream/proto --grpc_python_out=pynumaflow/mapstream/proto pynumaflow/mapstream/proto/*.proto
34-
python3 -m grpc_tools.protoc -I=pynumaflow/reduce/proto --python_out=pynumaflow/reduce/proto --grpc_python_out=pynumaflow/reduce/proto pynumaflow/reduce/proto/*.proto
35-
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransform/proto --python_out=pynumaflow/sourcetransform/proto --grpc_python_out=pynumaflow/sourcetransform/proto pynumaflow/sourcetransform/proto/*.proto
30+
python3 -m grpc_tools.protoc -I=pynumaflow/sinker/proto --python_out=pynumaflow/sinker/proto --grpc_python_out=pynumaflow/sinker/proto pynumaflow/sinker/proto/*.proto
31+
python3 -m grpc_tools.protoc -I=pynumaflow/mapper/proto --python_out=pynumaflow/mapper/proto --grpc_python_out=pynumaflow/mapper/proto pynumaflow/mapper/proto/*.proto
32+
python3 -m grpc_tools.protoc -I=pynumaflow/mapstreamer/proto --python_out=pynumaflow/mapstreamer/proto --grpc_python_out=pynumaflow/mapstreamer/proto pynumaflow/mapstreamer/proto/*.proto
33+
python3 -m grpc_tools.protoc -I=pynumaflow/reducer/proto --python_out=pynumaflow/reducer/proto --grpc_python_out=pynumaflow/reducer/proto pynumaflow/reducer/proto/*.proto
34+
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransformer/proto --python_out=pynumaflow/sourcetransformer/proto --grpc_python_out=pynumaflow/sourcetransformer/proto pynumaflow/sourcetransformer/proto/*.proto
35+
python3 -m grpc_tools.protoc -I=pynumaflow/sideinput/proto --python_out=pynumaflow/sideinput/proto --grpc_python_out=pynumaflow/sideinput/proto pynumaflow/sideinput/proto/*.proto
36+
3637

3738
sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup" \
18+
VENV_PATH="/opt/pysetup/.venv"
19+
20+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
21+
22+
RUN apt-get update \
23+
&& apt-get install --no-install-recommends -y \
24+
curl \
25+
wget \
26+
# deps for building python deps
27+
build-essential \
28+
&& apt-get install -y git \
29+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
30+
\
31+
# install dumb-init
32+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
33+
&& chmod +x /dumb-init \
34+
&& curl -sSL https://install.python-poetry.org | python3 -
35+
36+
####################################################################################################
37+
# udf: used for running the udf vertices
38+
####################################################################################################
39+
FROM builder AS udf
40+
41+
WORKDIR $PYSETUP_PATH
42+
COPY pyproject.toml ./
43+
RUN poetry install --no-cache --no-root && \
44+
rm -rf ~/.cache/pypoetry/
45+
46+
ADD . /app
47+
WORKDIR /app
48+
49+
RUN chmod +x entry.sh
50+
51+
ENTRYPOINT ["/dumb-init", "--"]
52+
CMD ["/app/entry.sh"]
53+
54+
EXPOSE 5000
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.PHONY: image
2+
image:
3+
docker build -t "quay.io/numaio/numaflow-python/sideinput-example:v0.5.0" .
4+
# Github CI runner uses platform linux/amd64. If your local environment don't, the image built by command above might not work
5+
# under the CI E2E test environment.
6+
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
7+
# docker buildx build -t "quay.io/numaio/numaflow-python/sideinput-example:v0.5.0" --platform linux/amd64,linux/arm64 . --push
8+
# If command failed, refer to https://billglover.me/notes/build-multi-arch-docker-images/ to fix
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# SideInput Example
2+
3+
An example that demonstrates how to write a `sideinput` function in python
4+
5+
### SideInput
6+
```python
7+
def my_handler() -> Response:
8+
time_now = datetime.datetime.now()
9+
# val is the value to be broadcasted
10+
val = "an example:" + str(time_now)
11+
global counter
12+
counter += 1
13+
# broadcast every other time
14+
if counter % 2 == 0:
15+
# no_broadcast_message() is used to indicate that there is no broadcast
16+
return Response.no_broadcast_message()
17+
# broadcast_message() is used to indicate that there is a broadcast
18+
return Response.broadcast_message(val.encode("utf-8"))
19+
```
20+
After performing the retrieval/update for the side input value the user can choose to either broadcast the
21+
message to other side input vertices or drop the message. The side input message is not retried.
22+
23+
For each side input there will be a file with the given path and after any update to the side input value the file will
24+
be updated.
25+
26+
The directory is fixed and can be accessed through sideinput constants `SideInput.SIDE_INPUT_DIR_PATH`.
27+
The file name is the name of the side input.
28+
```python
29+
SideInput.SIDE_INPUT_DIR_PATH -> "/var/numaflow/side-inputs"
30+
sideInputFileName -> "/var/numaflow/side-inputs/sideInputName"
31+
```
32+
33+
### User Defined Function
34+
35+
The UDF vertex will watch for changes to this file and whenever there is a change it will read the file to obtain the new side input value.
36+
37+
38+
### Pipeline spec
39+
40+
In the spec we need to define the side input vertex and the UDF vertex. The UDF vertex will have the side input vertex as a side input.
41+
42+
Side input spec:
43+
```yaml
44+
spec:
45+
sideInputs:
46+
- name: vertex
47+
udf:
48+
container:
49+
....
50+
sideInputs:
51+
- myticker
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import datetime
2+
from pynumaflow.sideinput import Response, SideInput
3+
4+
counter = 0
5+
6+
7+
def my_handler() -> Response:
8+
"""
9+
This function is called every time the side input is requested.
10+
"""
11+
time_now = datetime.datetime.now()
12+
# val is the value to be broadcasted
13+
val = "an example:" + str(time_now)
14+
global counter
15+
counter += 1
16+
# broadcast every other time
17+
if counter % 2 == 0:
18+
# no_broadcast_message() is used to indicate that there is no broadcast
19+
return Response.no_broadcast_message()
20+
# broadcast_message() is used to indicate that there is a broadcast
21+
return Response.broadcast_message(val.encode("utf-8"))
22+
23+
24+
if __name__ == "__main__":
25+
grpc_server = SideInput(handler=my_handler)
26+
grpc_server.start()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: my-pipeline
5+
spec:
6+
sideInputs:
7+
- name: myticker
8+
container:
9+
image: "quay.io/numaio/numaflow-python/sideinput-example:v0.5.0"
10+
imagePullPolicy: Always
11+
trigger:
12+
schedule: "*/2 * * * *"
13+
# timezone: America/Los_Angeles
14+
vertices:
15+
- name: in
16+
source:
17+
# A self data generating source
18+
generator:
19+
rpu: 1
20+
duration: 1s
21+
- name: si-log
22+
udf:
23+
container:
24+
image: "quay.io/numaio/numaflow-go/udf-sideinput-example:v0.5.0"
25+
imagePullPolicy: Always
26+
containerTemplate:
27+
env:
28+
- name: NUMAFLOW_DEBUG
29+
value: "true" # DO NOT forget the double quotes!!!
30+
sideInputs:
31+
- myticker
32+
- name: out
33+
sink:
34+
# A simple log printing sink
35+
log: {}
36+
edges:
37+
- from: in
38+
to: si-log
39+
- from: si-log
40+
to: out
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "map-forward-message"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = "~0.5.0"
10+
11+
[tool.poetry.dev-dependencies]
12+
13+
[build-system]
14+
requires = ["poetry-core>=1.0.0"]
15+
build-backend = "poetry.core.masonry.api"

pynumaflow/_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
SINK_SOCK_PATH = "/var/run/numaflow/sink.sock"
66
MULTIPROC_MAP_SOCK_PORT = 55551
77
MULTIPROC_MAP_SOCK_ADDR = "0.0.0.0"
8+
SIDE_INPUT_SOCK_PATH = "/var/run/numaflow/sideinput.sock"
89

910
# TODO: need to make sure the DATUM_KEY value is the same as
1011
# https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6

0 commit comments

Comments
 (0)