@@ -7,11 +7,13 @@ For creating a reducer UDF we can use two different approaches:
77 kwargs that the custom reducer class requires.
88 - Finally we need to call the ` start ` method on the ` ReduceAsyncServer ` instance to start the reducer server.
99 ``` python
10- from numaflow import Reducer, ReduceAsyncServer
11- class Example (Reducer ):
10+ from collections.abc import AsyncIterable
11+ from pynumaflow.reducer import Reducer, ReduceAsyncServer, Datum, Message, Messages, Metadata
12+
13+ class Example (Reducer ):
1214 def __init__ (self , counter ):
1315 self .counter = counter
14-
16+
1517 async def handler (
1618 self , keys : list[str ], datums : AsyncIterable[Datum], md : Metadata
1719 ) -> Messages:
@@ -25,13 +27,13 @@ For creating a reducer UDF we can use two different approaches:
2527 )
2628 return Messages(Message(str .encode(msg), keys = keys))
2729
28- if __name__ == " __main__" :
29- # Here we are using the class instance as the reducer_instance
30- # which will be used to invoke the handler function.
31- # We are passing the init_args for the class instance.
32- grpc_server = ReduceAsyncServer(Example, init_args = (0 ,))
33- grpc_server.start()
34- ```
30+ if __name__ == " __main__" :
31+ # Here we are using the class instance as the reducer_instance
32+ # which will be used to invoke the handler function.
33+ # We are passing the init_args for the class instance.
34+ grpc_server = ReduceAsyncServer(Example, init_args = (0 ,))
35+ grpc_server.start()
36+ ```
3537
3638- Function based reducer
3739 For the function based reducer we need to create a function of the signature
@@ -43,23 +45,23 @@ For creating a reducer UDF we can use two different approaches:
4345 - Finally we need to call the `start` method on the `ReduceAsyncServer` instance to start the reducer server.
4446 - We must ensure that no init_args or init_kwargs are passed to the `ReduceAsyncServer` instance as they are not used for function based reducers.
4547 ```python
46- from numaflow import ReduceAsyncServer
47- async def handler(keys: list[str ], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
48- counter = 0
49- interval_window = md.interval_window
50- async for _ in datums:
51- counter + = 1
52- msg = (
53- f " counter: { counter} interval_window_start: { interval_window.start} "
54- f " interval_window_end: { interval_window.end} "
55- )
56- return Messages(Message(str .encode(msg), keys = keys))
57-
58- if __name__ == " __main__" :
48+ from collections.abc import AsyncIterable
49+ from pynumaflow.reducer import ReduceAsyncServer, Datum, Message, Messages, Metadata
50+
51+ async def handler(keys: list[str ], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
52+ counter = 0
53+ interval_window = md.interval_window
54+ async for _ in datums:
55+ counter + = 1
56+ msg = (
57+ f " counter: { counter} interval_window_start: { interval_window.start} "
58+ f " interval_window_end: { interval_window.end} "
59+ )
60+ return Messages(Message(str .encode(msg), keys = keys))
61+
62+ if __name__ == " __main__" :
5963 # Here we are using the function as the reducer_instance
6064 # which will be used to invoke the handler function.
6165 grpc_server = ReduceAsyncServer(handler)
6266 grpc_server.start()
6367 ```
64-
65-
0 commit comments