Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions examples/reduce/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ For creating a reducer UDF we can use two different approaches:
kwargs that the custom reducer class requires.
- Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
```python
from numaflow import Reducer, ReduceAsyncServer
class Example(Reducer):
from collections.abc import AsyncIterable
from pynumaflow.reducer import Reducer, ReduceAsyncServer, Datum, Message, Messages, Metadata

class Example(Reducer):
def __init__(self, counter):
self.counter = counter

async def handler(
self, keys: list[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
Expand All @@ -25,13 +27,13 @@ For creating a reducer UDF we can use two different approaches:
)
return Messages(Message(str.encode(msg), keys=keys))

if __name__ == "__main__":
# Here we are using the class instance as the reducer_instance
# which will be used to invoke the handler function.
# We are passing the init_args for the class instance.
grpc_server = ReduceAsyncServer(Example, init_args=(0,))
grpc_server.start()
```
if __name__ == "__main__":
# Here we are using the class instance as the reducer_instance
# which will be used to invoke the handler function.
# We are passing the init_args for the class instance.
grpc_server = ReduceAsyncServer(Example, init_args=(0,))
grpc_server.start()
```

- Function based reducer
For the function based reducer we need to create a function of the signature
Expand All @@ -43,23 +45,23 @@ For creating a reducer UDF we can use two different approaches:
- Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
- We must ensure that no init_args or init_kwargs are passed to the `ReduceAsyncServer` instance as they are not used for function based reducers.
```python
from numaflow import ReduceAsyncServer
async def handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
counter = 0
interval_window = md.interval_window
async for _ in datums:
counter += 1
msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(str.encode(msg), keys=keys))

if __name__ == "__main__":
from collections.abc import AsyncIterable
from pynumaflow.reducer import ReduceAsyncServer, Datum, Message, Messages, Metadata

async def handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
counter = 0
interval_window = md.interval_window
async for _ in datums:
counter += 1
msg = (
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message(str.encode(msg), keys=keys))

if __name__ == "__main__":
# Here we are using the function as the reducer_instance
# which will be used to invoke the handler function.
grpc_server = ReduceAsyncServer(handler)
grpc_server.start()
```