Skip to content

Commit 71c2b52

Browse files
authored
feat: Add User Defined Source support (#114)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent 7b90d4d commit 71c2b52

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2000
-426
lines changed

.codecov.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,4 @@ ignore:
1717
- "pynumaflow/reducer/proto/*"
1818
- "pynumaflow/sourcetransformer/proto/*"
1919
- "pynumaflow/sideinput/proto/*"
20-
- "pynumaflow/map/_udfunction_pb2.pyi"
21-
- "pynumaflow/sink/_udsink_pb2.pyi"
20+
- "pynumaflow/sourcer/proto/*"

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ proto:
3333
python3 -m grpc_tools.protoc -I=pynumaflow/reducer/proto --python_out=pynumaflow/reducer/proto --grpc_python_out=pynumaflow/reducer/proto pynumaflow/reducer/proto/*.proto
3434
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransformer/proto --python_out=pynumaflow/sourcetransformer/proto --grpc_python_out=pynumaflow/sourcetransformer/proto pynumaflow/sourcetransformer/proto/*.proto
3535
python3 -m grpc_tools.protoc -I=pynumaflow/sideinput/proto --python_out=pynumaflow/sideinput/proto --grpc_python_out=pynumaflow/sideinput/proto pynumaflow/sideinput/proto/*.proto
36+
python3 -m grpc_tools.protoc -I=pynumaflow/sourcer/proto --python_out=pynumaflow/sourcer/proto --grpc_python_out=pynumaflow/sourcer/proto pynumaflow/sourcer/proto/*.proto
3637

3738

3839
sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
####################################################################################################
2+
# builder: install needed dependencies
3+
####################################################################################################
4+
5+
FROM python:3.10-slim-bullseye AS builder
6+
7+
ENV PYTHONFAULTHANDLER=1 \
8+
PYTHONUNBUFFERED=1 \
9+
PYTHONHASHSEED=random \
10+
PIP_NO_CACHE_DIR=on \
11+
PIP_DISABLE_PIP_VERSION_CHECK=on \
12+
PIP_DEFAULT_TIMEOUT=100 \
13+
POETRY_VERSION=1.2.2 \
14+
POETRY_HOME="/opt/poetry" \
15+
POETRY_VIRTUALENVS_IN_PROJECT=true \
16+
POETRY_NO_INTERACTION=1 \
17+
PYSETUP_PATH="/opt/pysetup" \
18+
VENV_PATH="/opt/pysetup/.venv"
19+
20+
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
21+
22+
RUN apt-get update \
23+
&& apt-get install --no-install-recommends -y \
24+
curl \
25+
wget \
26+
# deps for building python deps
27+
build-essential \
28+
&& apt-get install -y git \
29+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
30+
\
31+
# install dumb-init
32+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
33+
&& chmod +x /dumb-init \
34+
&& curl -sSL https://install.python-poetry.org | python3 -
35+
36+
####################################################################################################
37+
# udf: used for running the udf vertices
38+
####################################################################################################
39+
FROM builder AS udf
40+
41+
WORKDIR $PYSETUP_PATH
42+
COPY pyproject.toml ./
43+
RUN poetry install --no-cache --no-root && \
44+
rm -rf ~/.cache/pypoetry/
45+
46+
ADD . /app
47+
WORKDIR /app
48+
49+
RUN chmod +x entry.sh
50+
51+
ENTRYPOINT ["/dumb-init", "--"]
52+
CMD ["/app/entry.sh"]
53+
54+
EXPOSE 5000
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.PHONY: image
2+
image:
3+
docker build -t "quay.io/numaio/numaflow-python/simple-source:v0.5.3" .
4+
# Github CI runner uses platform linux/amd64. If your local environment don't, the image built by command above might not work
5+
# under the CI E2E test environment.
6+
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
7+
# docker buildx build -t "quay.io/numaio/numaflow-python/simple-source:v0.5.3" --platform linux/amd64,linux/arm64 . --push
8+
# If command failed, refer to https://billglover.me/notes/build-multi-arch-docker-images/ to fix
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Example Python User Defined Source
2+
A simple example of a user-defined source. The source maintains an array of messages and implements the Read,
3+
Ack, and Pending methods.
4+
The Read(x) method returns the next x number of messages in the array.
5+
The Ack() method acknowledges the last batch of messages returned by Read().
6+
The Pending() method returns 0 to indicate that the simple source always has 0 pending messages.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from collections.abc import Iterable
2+
from datetime import datetime
3+
4+
from pynumaflow.sourcer import (
5+
ReadRequest,
6+
Message,
7+
Sourcer,
8+
AckRequest,
9+
PendingResponse,
10+
Offset,
11+
)
12+
13+
14+
class SimpleSource:
15+
"""
16+
SimpleSource is a class for User Defined Source implementation.
17+
"""
18+
19+
def __init__(self):
20+
"""
21+
to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
22+
read_idx : the offset idx till where the messages have been read
23+
"""
24+
self.to_ack_set = set()
25+
self.read_idx = 0
26+
27+
def read_handler(self, datum: ReadRequest) -> Iterable[Message]:
28+
"""
29+
read_handler is used to read the data from the source and send the data forward
30+
for each read request we process num_records and increment the read_idx to indicate that
31+
the message has been read and the same is added to the ack set
32+
"""
33+
if self.to_ack_set:
34+
return
35+
36+
for x in range(datum.num_records):
37+
yield Message(
38+
payload=str(self.read_idx).encode(),
39+
offset=Offset(offset=str(self.read_idx).encode(), partition_id="0"),
40+
event_time=datetime.now(),
41+
)
42+
self.to_ack_set.add(str(self.read_idx))
43+
self.read_idx += 1
44+
45+
def ack_handler(self, ack_request: AckRequest):
46+
"""
47+
The ack handler is used acknowledge the offsets that have been read, and remove them
48+
from the to_ack_set
49+
"""
50+
for offset in ack_request.offset:
51+
self.to_ack_set.remove(str(offset.offset, "utf-8"))
52+
53+
def pending_handler(self) -> PendingResponse:
54+
"""
55+
The simple source always returns zero to indicate there is no pending record.
56+
"""
57+
return PendingResponse(count=0)
58+
59+
60+
if __name__ == "__main__":
61+
ud_source = SimpleSource()
62+
grpc_server = Sourcer(
63+
read_handler=ud_source.read_handler,
64+
ack_handler=ud_source.ack_handler,
65+
pending_handler=ud_source.pending_handler,
66+
)
67+
grpc_server.start()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: simple-source
5+
spec:
6+
vertices:
7+
- name: in
8+
source:
9+
udsource:
10+
container:
11+
# A simple user-defined source for e2e testing
12+
image: quay.io/numaio/numaflow-python/simple-source:v0.5.3
13+
imagePullPolicy: Always
14+
limits:
15+
readBatchSize: 2
16+
- name: out
17+
sink:
18+
log: {}
19+
edges:
20+
- from: in
21+
to: out
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[tool.poetry]
2+
name = "simple-source"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = "~0.5.3"
10+
11+
12+
[tool.poetry.dev-dependencies]
13+
14+
[build-system]
15+
requires = ["poetry-core>=1.0.0"]
16+
build-backend = "poetry.core.masonry.api"

pynumaflow/_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
MULTIPROC_MAP_SOCK_PORT = 55551
99
MULTIPROC_MAP_SOCK_ADDR = "0.0.0.0"
1010
SIDE_INPUT_SOCK_PATH = "/var/run/numaflow/sideinput.sock"
11+
SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock"
1112

1213
# TODO: need to make sure the DATUM_KEY value is the same as
1314
# https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6

0 commit comments

Comments
 (0)