feat: add Accumulator #237
Conversation
Signed-off-by: Sidhant Kohli <[email protected]>
- Fixed dictionary iteration issue in stream_send_eof method that was causing 'RuntimeError: dictionary changed size during iteration'
- Fixed None watermark comparison issues in write_to_global_queue method
- Fixed None watermark/event_time conversion to protobuf timestamp
- Added proper null checks for watermark and event_time handling
- Tests now run without runtime errors and produce expected message count

The main fixes:
1. Create copy of dictionary keys before iteration in stream_send_eof
2. Add null checks before watermark comparison and protobuf conversion
3. Handle None values properly in Message watermark/event_time fields

Tests are now passing the core functionality (message processing) but EOF handling still needs work.
Signed-off-by: srao12 <[email protected]>
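The first two fixes in this commit message (snapshotting the dictionary keys before iterating, and guarding comparisons against a None watermark) can be sketched roughly as below. This is an illustration only, not the code from the PR: `send_eof_to_all`, `is_newer`, the plain dict of queues, and the `"EOF"` sentinel are hypothetical stand-ins.

```python
import asyncio

STREAM_EOF = "EOF"  # hypothetical sentinel, stands in for the SDK's EOF marker


async def send_eof_to_all(tasks: dict) -> list:
    """Send EOF to every task queue, then drop the task from the dict."""
    closed = []
    # list(tasks) snapshots the keys, so popping inside the loop is safe.
    # Iterating over `tasks` directly while popping would raise
    # "RuntimeError: dictionary changed size during iteration".
    for key in list(tasks):
        await tasks[key].put(STREAM_EOF)
        tasks.pop(key)
        closed.append(key)
    return closed


def is_newer(candidate, latest) -> bool:
    """None-safe watermark comparison (assumed semantics: None never wins)."""
    if candidate is None:
        return False
    if latest is None:
        return True
    return candidate > latest
```

Whether a missing watermark should be ignored, as assumed here, or treated some other way is a design decision for the SDK; the point is only that the None check has to happen before the `>` comparison.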
Signed-off-by: Sidhant Kohli <[email protected]>
- Fix missing default value in AccumulatorRequest constructor
- Add watermark parameter handling in AccumulatorRequest
- Improve test structure and remove debug test file
- Fix task_manager.py to handle EOF counting properly
- All 253 tests now pass successfully

Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #237 +/- ##
=======================================
Coverage ? 93.70%
=======================================
Files ? 65
Lines ? 2860
Branches ? 150
=======================================
Hits ? 2680
Misses ? 130
Partials ? 50
Signed-off-by: srao12 <[email protected]>
vigith left a comment:
There are a lot of typos. Could you please do a self-review?
Signed-off-by: srao12 <[email protected]>
vigith left a comment:
The proto file is stale.
Signed-off-by: srao12 <[email protected]>
    if curr_task:
        await self.tasks[unified_key].iterator.put(STREAM_EOF)
        self.tasks.pop(unified_key)
Can you test a couple of small things for me, please?
- Before calling pop on the task, can you await the _consumer_future task for it? Basically, check that it has completed the write_to_global queue task, which sends the EOF back.

Actually, the order would be:
- First, wait for the future of the task.
- Then await the consumer task.
- After that, we can consider this task done and pop it out.

@cosmic-chichu @vigith
# Once this is done, we know that the task has written all the results
# to the local result queue
fut = task.future
await fut
# Send an EOF message to the local result queue
# This will signal that the task has completed processing
await task.result_queue.put(STREAM_EOF)
# Wait for the local queue to write
# all the results of this task to the global result queue
con_future = task.consumer_future
await con_future
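
The ordering suggested in this thread (await the task future, send EOF to the local queue, await the consumer, and only then pop the task) can be tried out in isolation with a small asyncio sketch. Everything here is a hypothetical stand-in: the `Task` dataclass, `produce`, `consume`, `close_task`, and the `"EOF"` sentinel are illustrative names, not the SDK's actual classes.

```python
import asyncio
from dataclasses import dataclass

STREAM_EOF = "EOF"  # hypothetical sentinel


@dataclass
class Task:
    # Hypothetical container; field names follow the snippet above.
    future: asyncio.Task
    result_queue: asyncio.Queue
    consumer_future: asyncio.Task


async def produce(result_queue):
    # Stand-in for the user handler writing results to the local queue.
    for i in range(3):
        await result_queue.put(i)


async def consume(result_queue, global_queue):
    # Stand-in for the consumer forwarding local results to the global queue.
    while True:
        item = await result_queue.get()
        await global_queue.put(item)
        if item == STREAM_EOF:
            return


async def close_task(tasks, key):
    task = tasks[key]
    await task.future                        # 1. handler has written all results
    await task.result_queue.put(STREAM_EOF)  # 2. mark the end of local results
    await task.consumer_future               # 3. consumer has forwarded everything
    tasks.pop(key)                           # 4. only now forget the task


async def main():
    global_queue = asyncio.Queue()
    local = asyncio.Queue()
    tasks = {
        "k1": Task(
            future=asyncio.create_task(produce(local)),
            result_queue=local,
            consumer_future=asyncio.create_task(consume(local, global_queue)),
        )
    }
    await close_task(tasks, "k1")
    drained = []
    while not global_queue.empty():
        drained.append(global_queue.get_nowait())
    return drained, tasks
```

Running `asyncio.run(main())` returns the forwarded items and the now-empty task dict. Popping before step 3 would drop the only reference to a consumer that may still be forwarding results, which is the situation the review comment is trying to rule out.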
Signed-off-by: srao12 <[email protected]>
- Add tests for OPEN, APPEND, and CLOSE window operations
- Test standard flow (OPEN→APPEND→CLOSE), append-only, and mixed operation patterns
- Add multi-key processing and error handling test scenarios

Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
FMEA Test Report
Test Duration: 1200 seconds
Total Checks: 80
Pods Killed: 19
Python Sink Ordering Failures: 0
Go Sink Ordering Failures: 0
Result: SUCCESS
20 min FMEA test: 10 tps, 1 key, panic every 80 seconds.

High Throughput FMEA Test Report
Test Configuration
Results Summary
5 min high throughput FMEA test: 300 tps, 50 keys, no panics introduced, frequent pod kills.
Python Sink
pynumaflow/accumulator/_dtypes.py (Outdated)
    @dataclass(init=False)
    class Metadata:
The accumulator signature doesn't expect metadata, correct? Please confirm and remove it if it is not required.
https://github.com/numaproj/numaflow-go/blob/main/pkg/accumulator/examples/streamsorter/main.go#L23
Removed it, not required.
@cosmic-chichu I did a first pass, please check once.
Signed-off-by: srao12 <[email protected]>
tests/accumulator/test_datatypes.py (Outdated)
        self.assertEqual(2, r.test2)
        self.assertEqual(1, r.test3)

    def test_deep_copy(self):
Do we need to test deep copy here?
tests/accumulator/test_use_cases.py (Outdated)
            yield datum

        # Process the stream through our accumulator
        await sorter.handler(data_stream(), output)
This function drains the iterator and emits the items in sorted order. I'm not sure it covers much of the accumulator functionality, since we are calling the handler directly without any RPC interactions.
I guess this file can be removed in favor of actual end-to-end tests, if that is the intention here.
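
To make the concern concrete, here is roughly what a direct-call test of this kind exercises. The names `Datum`, `sort_stream`, and the `asyncio.Queue` output are hypothetical stand-ins, not the SDK's types; the sketch only shows that such a test checks the sorting logic and never touches the gRPC servicer or window handling.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Datum:
    # Hypothetical stand-in for the SDK's datum type.
    value: str
    event_time: int


async def sort_stream(datums, output: asyncio.Queue) -> None:
    # Drain the whole async iterator, then emit in event-time order.
    buffered = [d async for d in datums]
    for d in sorted(buffered, key=lambda d: d.event_time):
        await output.put(d)


async def main():
    async def data_stream():
        for value, event_time in [("c", 3), ("a", 1), ("b", 2)]:
            yield Datum(value, event_time)

    output = asyncio.Queue()
    # Direct call: no server, no request stream, no window operations.
    await sort_stream(data_stream(), output)
    return [output.get_nowait().value for _ in range(output.qsize())]
```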
tests/accumulator/test_use_cases.py (Outdated)
        self.assertIsNotNone(server.servicer)

    # Example 2: Accumulator with configuration parameters
    class ConfigurableAccumulator(Accumulator):
Same as above
I guess we can remove this as well
@cosmic-chichu
Please check the comments above and remove those redundant tests as well before merging.
As a next step after this PR, let's enable the e2e tests for the accumulator in the mainline numaflow repo.
We would need to add the accumulator example path here for the stream sorter, if that's used in the e2e:
https://github.com/numaproj/numaflow-python/blob/main/.github/workflows/build-push.yaml#L20
Signed-off-by: srao12 <[email protected]>
Fixes: #219