Skip to content

Commit 4982414

Browse files
feat: add Accumulator (#237)
Signed-off-by: Sidhant Kohli <[email protected]> Signed-off-by: srao12 <[email protected]> Co-authored-by: Sidhant Kohli <[email protected]>
1 parent 7faf39b commit 4982414

34 files changed

+2988
-21
lines changed

.github/workflows/build-push.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
"examples/reducestream/counter", "examples/reducestream/sum", "examples/sideinput/simple_sideinput",
2424
"examples/sideinput/simple_sideinput/udf", "examples/sink/async_log", "examples/sink/log",
2525
"examples/source/simple_source", "examples/sourcetransform/event_time_filter",
26-
"examples/batchmap/flatmap"
26+
"examples/batchmap/flatmap", "examples/accumulator/streamsorter"
2727
]
2828

2929
steps:

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ proto:
3232
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcetransformer -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
3333
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sideinput -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
3434
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcer -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto
35+
poetry run python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/accumulator -I=pynumaflow/proto/accumulator --python_out=pynumaflow/proto/accumulator --grpc_python_out=pynumaflow/proto/accumulator pynumaflow/proto/accumulator/*.proto
3536

3637

3738
sed -i.bak -e 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
####################################################################################################
2+
# Stage 1: Base Builder - installs core dependencies using poetry
3+
####################################################################################################
4+
FROM python:3.10-slim-bullseye AS base-builder
5+
6+
ENV PYSETUP_PATH="/opt/pysetup"
7+
WORKDIR $PYSETUP_PATH
8+
9+
# Copy only core dependency files first for better caching
10+
COPY pyproject.toml poetry.lock README.md ./
11+
COPY pynumaflow/ ./pynumaflow/
12+
RUN apt-get update && apt-get install --no-install-recommends -y \
13+
curl wget build-essential git \
14+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
15+
&& pip install poetry \
16+
&& poetry install --no-root --no-interaction
17+
18+
####################################################################################################
19+
# Stage 2: UDF Builder - adds UDF code and installs UDF-specific deps
20+
####################################################################################################
21+
FROM base-builder AS udf-builder
22+
23+
ENV EXAMPLE_PATH="/opt/pysetup/examples/accumulator/streamsorter"
24+
ENV POETRY_VIRTUALENVS_IN_PROJECT=true
25+
26+
WORKDIR $EXAMPLE_PATH
27+
COPY examples/accumulator/streamsorter/ ./
28+
RUN poetry install --no-root --no-interaction
29+
30+
####################################################################################################
31+
# Stage 3: UDF Runtime - clean container with only needed stuff
32+
####################################################################################################
33+
FROM python:3.10-slim-bullseye AS udf
34+
35+
ENV PYSETUP_PATH="/opt/pysetup"
36+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/streamsorter"
37+
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
38+
ENV PATH="$VENV_PATH/bin:$PATH"
39+
40+
RUN apt-get update && apt-get install --no-install-recommends -y wget \
41+
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
42+
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
43+
&& chmod +x /dumb-init
44+
45+
WORKDIR $PYSETUP_PATH
46+
COPY --from=udf-builder $VENV_PATH $VENV_PATH
47+
COPY --from=udf-builder $EXAMPLE_PATH $EXAMPLE_PATH
48+
49+
WORKDIR $EXAMPLE_PATH
50+
RUN chmod +x entry.sh
51+
52+
ENTRYPOINT ["/dumb-init", "--"]
53+
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]
54+
55+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/streamsorter:${TAG}
4+
DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
poetry update -vv
9+
10+
.PHONY: image-push
11+
image-push: update
12+
cd ../../../ && docker buildx build \
13+
-f ${DOCKER_FILE_PATH} \
14+
-t ${IMAGE_REGISTRY} \
15+
--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
cd ../../../ && docker build \
20+
-f ${DOCKER_FILE_PATH} \
21+
-t ${IMAGE_REGISTRY} .
22+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/streamsorter:${TAG}
4+
DOCKER_FILE_PATH = examples/accumulator/streamsorter/Dockerfile
5+
BASE_IMAGE_NAME = numaflow-python-base
6+
7+
.PHONY: base-image
8+
base-image:
9+
@echo "Building shared base image..."
10+
docker build -f Dockerfile.base -t ${BASE_IMAGE_NAME} .
11+
12+
.PHONY: update
13+
update:
14+
poetry update -vv
15+
16+
.PHONY: image-push
17+
image-push: base-image update
18+
cd ../../../ && docker buildx build \
19+
-f ${DOCKER_FILE_PATH} \
20+
-t ${IMAGE_REGISTRY} \
21+
--platform linux/amd64,linux/arm64 . --push
22+
23+
.PHONY: image
24+
image: base-image update
25+
cd ../../../ && docker build \
26+
-f ${DOCKER_FILE_PATH} \
27+
-t ${IMAGE_REGISTRY} .
28+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
29+
30+
.PHONY: image-fast
31+
image-fast: update
32+
@echo "Building with shared base image (fastest option)..."
33+
cd ../../../ && docker build \
34+
-f examples/map/even_odd/Dockerfile.shared-base \
35+
-t ${IMAGE_REGISTRY} .
36+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
37+
38+
.PHONY: clean
39+
clean:
40+
docker rmi ${BASE_IMAGE_NAME} 2>/dev/null || true
41+
docker rmi ${IMAGE_REGISTRY} 2>/dev/null || true
42+
43+
.PHONY: help
44+
help:
45+
@echo "Available targets:"
46+
@echo " base-image - Build the shared base image with pynumaflow"
47+
@echo " image - Build UDF image with optimized multi-stage build"
48+
@echo " image-fast - Build UDF image using shared base (fastest)"
49+
@echo " image-push - Build and push multi-platform image"
50+
@echo " update - Update poetry dependencies"
51+
@echo " clean - Remove built images"
52+
@echo " help - Show this help message"
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Stream Sorter
2+
3+
An example User Defined Function that sorts the incoming stream by event time.
4+
5+
### Applying the Pipeline
6+
7+
To apply the pipeline, use the following command:
8+
9+
```shell
10+
kubectl apply -f pipeline.yaml
11+
```
12+
13+
### Publish messages
14+
15+
Port-forward the HTTP endpoint, and make POST requests using curl. Remember to replace xxxx with the appropriate pod names.
16+
17+
```shell
18+
kubectl port-forward stream-sorter-http-one-0-xxxx 8444:8443
19+
20+
# Post data to the HTTP endpoint
21+
curl -kq -X POST -d "101" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 60000"
22+
curl -kq -X POST -d "102" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 61000"
23+
curl -kq -X POST -d "103" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 62000"
24+
curl -kq -X POST -d "104" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 63000"
25+
```
26+
27+
```shell
28+
kubectl port-forward stream-sorter-http-two-0-xxxx 8445:8443
29+
30+
# Post data to the HTTP endpoint
31+
curl -kq -X POST -d "105" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 70000"
32+
curl -kq -X POST -d "106" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 71000"
33+
curl -kq -X POST -d "107" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 72000"
34+
curl -kq -X POST -d "108" https://localhost:8445/vertices/http-two -H "X-Numaflow-Event-Time: 73000"
35+
```
36+
37+
### Verify the output
38+
39+
```shell
40+
kubectl logs -f stream-sorter-log-sink-0-xxxx
41+
```
42+
43+
The output should be sorted by event time.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -eux
3+
4+
python example.py
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import logging
2+
import os
3+
from collections.abc import AsyncIterable
4+
from datetime import datetime
5+
6+
from pynumaflow import setup_logging
7+
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
8+
from pynumaflow.accumulator import (
9+
Message,
10+
Datum,
11+
)
12+
from pynumaflow.shared.asynciter import NonBlockingIterator
13+
14+
_LOGGER = setup_logging(__name__)
15+
if os.getenv("PYTHONDEBUG"):
16+
_LOGGER.setLevel(logging.DEBUG)
17+
18+
19+
class StreamSorter(Accumulator):
20+
def __init__(self):
21+
_LOGGER.info("StreamSorter initialized")
22+
self.latest_wm = datetime.fromtimestamp(-1)
23+
self.sorted_buffer: list[Datum] = []
24+
25+
async def handler(
26+
self,
27+
datums: AsyncIterable[Datum],
28+
output: NonBlockingIterator,
29+
):
30+
_LOGGER.info("StreamSorter handler started")
31+
async for datum in datums:
32+
_LOGGER.info(
33+
f"Received datum with event time: {datum.event_time}, "
34+
f"Current latest watermark: {self.latest_wm}, "
35+
f"Datum watermark: {datum.watermark}"
36+
)
37+
38+
# If watermark has moved forward
39+
if datum.watermark and datum.watermark > self.latest_wm:
40+
self.latest_wm = datum.watermark
41+
await self.flush_buffer(output)
42+
43+
self.insert_sorted(datum)
44+
45+
def insert_sorted(self, datum: Datum):
46+
# Binary insert to keep sorted buffer in order
47+
left, right = 0, len(self.sorted_buffer)
48+
while left < right:
49+
mid = (left + right) // 2
50+
if self.sorted_buffer[mid].event_time > datum.event_time:
51+
right = mid
52+
else:
53+
left = mid + 1
54+
self.sorted_buffer.insert(left, datum)
55+
56+
async def flush_buffer(self, output: NonBlockingIterator):
57+
_LOGGER.info(f"Watermark updated, flushing sortedBuffer: {self.latest_wm}")
58+
i = 0
59+
for datum in self.sorted_buffer:
60+
if datum.event_time > self.latest_wm:
61+
break
62+
await output.put(Message.from_datum(datum))
63+
_LOGGER.info(f"Sent datum with event time: {datum.event_time}")
64+
i += 1
65+
# Remove flushed items
66+
self.sorted_buffer = self.sorted_buffer[i:]
67+
68+
69+
if __name__ == "__main__":
70+
grpc_server = None
71+
grpc_server = AccumulatorAsyncServer(StreamSorter)
72+
grpc_server.start()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: Pipeline
3+
metadata:
4+
name: stream-sorter
5+
spec:
6+
limits:
7+
readBatchSize: 1
8+
vertices:
9+
- name: http-one
10+
scale:
11+
min: 1
12+
max: 1
13+
source:
14+
http: {}
15+
- name: http-two
16+
scale:
17+
min: 1
18+
max: 1
19+
source:
20+
http: {}
21+
- name: py-accum
22+
udf:
23+
container:
24+
image: quay.io/numaio/numaflow-python/streamsorter:stable
25+
imagePullPolicy: Always
26+
env:
27+
- name: PYTHONDEBUG
28+
value: "true"
29+
groupBy:
30+
window:
31+
accumulator:
32+
timeout: 10s
33+
keyed: true
34+
storage:
35+
persistentVolumeClaim:
36+
volumeSize: 1Gi
37+
- name: py-sink
38+
scale:
39+
min: 1
40+
max: 1
41+
sink:
42+
log: {}
43+
edges:
44+
- from: http-one
45+
to: py-accum
46+
- from: http-two
47+
to: py-accum
48+
- from: py-accum
49+
to: py-sink
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[tool.poetry]
2+
name = "stream-sorter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
7+
[tool.poetry.dependencies]
8+
python = "~3.10"
9+
pynumaflow = { path = "../../../"}
10+
11+
[build-system]
12+
requires = ["poetry-core>=1.0.0"]
13+
build-backend = "poetry.core.masonry.api"

0 commit comments

Comments
 (0)