Contributor

@cosmic-chichu cosmic-chichu commented Jul 21, 2025

Fixes: #219

  • Implement accumulator
  • Add tests

kohlisid and others added 15 commits March 24, 2025 14:02
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
- Fixed dictionary iteration issue in stream_send_eof method that was causing 'RuntimeError: dictionary changed size during iteration'
- Fixed None watermark comparison issues in write_to_global_queue method
- Fixed None watermark/event_time conversion to protobuf timestamp
- Added proper null checks for watermark and event_time handling
- Tests now run without runtime errors and produce expected message count

The main fixes:
1. Create copy of dictionary keys before iteration in stream_send_eof
2. Add null checks before watermark comparison and protobuf conversion
3. Handle None values properly in Message watermark/event_time fields

Tests are now passing the core functionality (message processing) but EOF handling still needs work.
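
The first two fixes listed above follow common Python patterns. A minimal sketch, assuming a hypothetical `tasks` dict and hypothetical helpers `close_all` and `latest_watermark` (none of these names are taken from the PR's code):

```python
from datetime import datetime, timezone
from typing import Optional


def close_all(tasks: dict) -> list:
    """Remove every entry from `tasks` and return the removed keys in order."""
    closed = []
    # list(tasks) snapshots the keys, so popping inside the loop is safe.
    # Iterating over `tasks` directly while popping would raise
    # "RuntimeError: dictionary changed size during iteration".
    for key in list(tasks):
        tasks.pop(key)
        closed.append(key)
    return closed


def latest_watermark(
    current: Optional[datetime], new: Optional[datetime]
) -> Optional[datetime]:
    """Return the later of two watermarks, treating None as 'not set'."""
    if new is None:
        return current
    if current is None:
        return new
    # Both values are real datetimes here, so the comparison cannot fail on None.
    return max(current, new)
```

The None checks come before the comparison because comparing `None` with a `datetime` raises `TypeError` in Python 3.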

Signed-off-by: srao12 <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
- Fix missing default value in AccumulatorRequest constructor
- Add watermark parameter handling in AccumulatorRequest
- Improve test structure and remove debug test file
- Fix task_manager.py to handle EOF counting properly
- All 253 tests now pass successfully

Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
@cosmic-chichu cosmic-chichu self-assigned this Jul 21, 2025
codecov bot commented Jul 21, 2025

Codecov Report

❌ Patch coverage is 90.45346% with 40 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@7faf39b). Learn more about missing BASE report.
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
pynumaflow/accumulator/servicer/task_manager.py 80.14% 20 Missing and 8 partials ⚠️
pynumaflow/accumulator/servicer/async_servicer.py 79.48% 7 Missing and 1 partial ⚠️
pynumaflow/accumulator/async_server.py 90.90% 1 Missing and 2 partials ⚠️
pynumaflow/accumulator/_dtypes.py 99.50% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #237   +/-   ##
=======================================
  Coverage        ?   93.70%           
=======================================
  Files           ?       65           
  Lines           ?     2860           
  Branches        ?      150           
=======================================
  Hits            ?     2680           
  Misses          ?      130           
  Partials        ?       50           


Signed-off-by: srao12 <[email protected]>
Member

@vigith vigith left a comment

There are a lot of typos; please do a self-review.

srao12 added 4 commits July 20, 2025 18:24
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
Member

@vigith vigith left a comment

the proto file is stale

srao12 added 2 commits July 20, 2025 20:46
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>

if curr_task:
    await self.tasks[unified_key].iterator.put(STREAM_EOF)
    self.tasks.pop(unified_key)
Contributor

@kohlisid kohlisid Jul 21, 2025

Can you test a couple of small things for me, please?

  1. Before calling pop on the task, can you await its _consumer_future? Basically, check that it has completed the write_to_global_queue task, which sends the EOF back.
    Actually, first await the task's own future,
    then await the consumer task.
    After that we can consider the task done and pop it out.
    @cosmic-chichu @vigith

Contributor

            # Once this is done, we know that the task has written all the results
            # to the local result queue
            fut = task.future
            await fut

            # Send an EOF message to the local result queue
            # This will signal that the task has completed processing
            await task.result_queue.put(STREAM_EOF)

            # Wait for the local queue to write
            # all the results of this task to the global result queue
            con_future = task.consumer_future
            await con_future
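
The ordering suggested in this thread can be sketched end to end with plain asyncio. This is only an illustration under stated assumptions: the `Task` class, `produce`, `consume`, and `close_task` are hypothetical stand-ins, and `STREAM_EOF` here is a local sentinel rather than the SDK's constant.

```python
import asyncio

STREAM_EOF = object()  # stand-in sentinel; the SDK defines its own


class Task:
    """Hypothetical task record with the three fields named in the review."""

    def __init__(self, future, result_queue, consumer_future):
        self.future = future
        self.result_queue = result_queue
        self.consumer_future = consumer_future


async def produce(queue):
    # Stands in for the user handler writing results to the local queue.
    for i in range(3):
        await queue.put(i)


async def consume(local_queue, global_queue):
    # Forward results until the EOF sentinel arrives, then forward the EOF too.
    while True:
        item = await local_queue.get()
        await global_queue.put(item)
        if item is STREAM_EOF:
            return


async def close_task(tasks, key):
    task = tasks[key]
    await task.future                        # 1. handler has written all results
    await task.result_queue.put(STREAM_EOF)  # 2. signal end of the local stream
    await task.consumer_future               # 3. consumer has drained the local queue
    tasks.pop(key)                           # 4. only now forget the task


async def main():
    local_q, global_q = asyncio.Queue(), asyncio.Queue()
    task = Task(
        future=asyncio.create_task(produce(local_q)),
        result_queue=local_q,
        consumer_future=asyncio.create_task(consume(local_q, global_q)),
    )
    tasks = {"key": task}
    await close_task(tasks, "key")
    drained = []
    while not global_q.empty():
        drained.append(global_q.get_nowait())
    return tasks, drained
```

Popping the task only after the consumer future completes means the EOF has already reached the global queue by the time the entry disappears from `tasks`.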

Signed-off-by: srao12 <[email protected]>
srao12 added 3 commits July 24, 2025 19:00
Signed-off-by: srao12 <[email protected]>
- Add tests for OPEN, APPEND, and CLOSE window operations
- Test standard flow (OPEN→APPEND→CLOSE), append-only, and mixed operation patterns
- Add multi-key processing and error handling test scenarios

Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
@cosmic-chichu
Contributor Author

FMEA Test Report

Test Duration: 1200 seconds

Total Checks: 80

Pods Killed: 19

Python Sink Ordering Failures: 0

Go Sink Ordering Failures: 0

Result: SUCCESS

20-minute FMEA test: 10 TPS, 1 key, panic every 80 seconds.
Event time ordering maintained throughout test!

High Throughput FMEA Test Report

Test Configuration

  • Duration: 5 minutes
  • Target: 50 keys, ~300 TPS
  • Pod Kills: 7

Results Summary

5-minute high-throughput FMEA test: 300 TPS, 50 keys, no panics introduced, frequent pod kills.

Python Sink

  • Keys Analyzed: 50
  • Events Processed: 44945
  • Ordering Failures: 0



@dataclass(init=False)
class Metadata:
Contributor

The accumulator signature doesn't expect metadata, correct? Please confirm, and remove it if it's not required.
https://github.com/numaproj/numaflow-go/blob/main/pkg/accumulator/examples/streamsorter/main.go#L23

Contributor Author

removed it, not required

@kohlisid
Contributor

@cosmic-chichu Did first pass, please check once

srao12 added 3 commits July 26, 2025 19:52
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
self.assertEqual(2, r.test2)
self.assertEqual(1, r.test3)

def test_deep_copy(self):
Contributor

Do we need to test deep copy here?

yield datum

# Process the stream through our accumulator
await sorter.handler(data_stream(), output)
Contributor

This function drains the iterator and emits the data in sorted order. I'm not sure it covers much accumulator functionality, since we call the handler directly without any RPC interactions.

I guess this file can be removed in favor of actual end-to-end tests, if that is the intention here.

self.assertIsNotNone(server.servicer)

# Example 2: Accumulator with configuration parameters
class ConfigurableAccumulator(Accumulator):
Contributor

Same as above
I guess we can remove this as well

Contributor

@kohlisid kohlisid left a comment

@cosmic-chichu
Please check the comments above and remove those redundant tests as well before merging.

As a next step after this PR, let's enable the e2e tests for the accumulator on the mainline numaflow repo.

We would need to add the accumulator example path for the stream sorter here, if that's used in the e2e tests:
https://github.com/numaproj/numaflow-python/blob/main/.github/workflows/build-push.yaml#L20

srao12 added 3 commits July 30, 2025 00:47
Signed-off-by: srao12 <[email protected]>
Signed-off-by: srao12 <[email protected]>
@vigith vigith merged commit 4982414 into numaproj:main Jul 30, 2025
11 checks passed
Development

Successfully merging this pull request may close these issues.

Accumulator in Python SDK

3 participants