@@ -38,10 +38,14 @@ async def handler(
3838 # If watermark has moved forward
3939 if datum .watermark and datum .watermark > self .latest_wm :
4040 self .latest_wm = datum .watermark
41+ _LOGGER .info (f"Watermark updated: { self .latest_wm } " )
4142 await self .flush_buffer (output )
4243
4344 self .insert_sorted (datum )
4445
46+ _LOGGER .info ("Timeout reached" )
47+ await self .flush_buffer (output , flush_all = True )
48+
4549 def insert_sorted (self , datum : Datum ):
4650 # Binary insert to keep sorted buffer in order
4751 left , right = 0 , len (self .sorted_buffer )
@@ -53,11 +57,14 @@ def insert_sorted(self, datum: Datum):
5357 left = mid + 1
5458 self .sorted_buffer .insert (left , datum )
5559
56- async def flush_buffer (self , output : NonBlockingIterator ):
57- _LOGGER .info (f"Watermark updated, flushing sortedBuffer: { self .latest_wm } " )
60+ async def flush_buffer (self , output : NonBlockingIterator , flush_all : bool = False ):
61+ if flush_all :
62+ _LOGGER .info ("Flushing entire sortedBuffer" )
63+ else :
64+ _LOGGER .info (f"Flushing sortedBuffer above watermark: { self .latest_wm } " )
5865 i = 0
5966 for datum in self .sorted_buffer :
60- if datum .event_time > self .latest_wm :
67+ if datum .event_time > self .latest_wm and not flush_all :
6168 break
6269 await output .put (Message .from_datum (datum ))
6370 _LOGGER .info (f"Sent datum with event time: { datum .event_time } " )
0 commit comments