[BEAM-36736] Add state sampling for timer processing in the Python SDK #36737
Conversation
Summary of Changes
Hello @ktalluri456, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the Apache Beam Python SDK by introducing a new metric to track the duration of timer processing. This improvement provides critical visibility into the performance of streaming pipelines, particularly those with significant timer backlogs, enabling more effective autoscaling decisions by runners.
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
R: @tvalentyn
Codecov Report: ❌ Patch coverage is
Additional details and impacted files:
@@ Coverage Diff @@
## master #36737 +/- ##
=============================================
+ Coverage 40.21% 56.91% +16.69%
Complexity 3394 3394
=============================================
Files 1222 1222
Lines 186913 187146 +233
Branches 3545 3545
=============================================
+ Hits 75174 106511 +31337
+ Misses 108367 77263 -31104
Partials 3372 3372
    self.name_context, 'process', metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
    self.name_context, 'finish', metrics_container=self.metrics_container)
if self.state_sampler:
for my understanding, when do we have the case where state_sampler is None / not available?
Trying to understand what prompted adding a Noop state sampler with this particular change.
While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.
Adding the Noop state ensures that the code is safe to execute in all contexts, both for the new timer metric and for the existing start, process, and finish states.
i guess i still don't understand why this condition was not necessary before but became necessary now.
I believe my change exposed a bug in the Operation class where it would crash during initialization if the state_sampler was None. I believe this happens in batch pipelines, which is why tests like autocomplete_test.py were failing in presubmit checks.
Adding the if self.state_sampler: check and using a NoOpScopedState in the else block makes the code more robust and ensures it works correctly in both streaming and batch contexts; see the sketch below.
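A minimal sketch of that guard, using the NoOpScopedState name from this thread; the make_scoped_state helper below is hypothetical, for illustration only:

class NoOpScopedState:
  """No-op stand-in used when no state sampler is available (e.g. batch)."""
  def __enter__(self):
    return self

  def __exit__(self, *exc_info):
    return False

  def sampled_msecs_int(self):
    # Nothing was sampled, so report zero elapsed milliseconds.
    return 0

def make_scoped_state(state_sampler, name_context, state_name,
                      metrics_container=None):
  # Hypothetical helper: return a real scoped state when a sampler exists,
  # otherwise the no-op fallback, so Operation init is safe in batch mode.
  if state_sampler:
    return state_sampler.scoped_state(
        name_context, state_name, metrics_container=metrics_container)
  return NoOpScopedState()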
I have tried reverting this branch and running the autocomplete test; it didn't fail.
Let me try running the test suite without this branch. I'd like to understand why this is necessary.
trying out in #36794, let's see what is failing.
While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.
I am not seeing these errors - am I missing something?
I do notice the following errors:
=================================== FAILURES ===================================
___________ StateSamplerTest.test_process_timers_metric_is_recorded ____________
[gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python

self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_process_timers_metric_is_recorded>

    @retry(reraise=True, stop=stop_after_attempt(3))
    def test_process_timers_metric_is_recorded(self):
      """
      Tests that the 'process-timers-msecs' metric is correctly recorded
      when a state sampler is active.
      """
      # Set up a real state sampler and counter factory.
      counter_factory = CounterFactory()
      sampler = statesampler.StateSampler(
          'test_stage', counter_factory, sampling_period_ms=1)

      state_duration_ms = 100
      margin_of_error = 0.25

      # Run a workload inside the 'process-timers' scoped state.
      sampler.start()
      with sampler.scoped_state('test_step', 'process-timers'):
        time.sleep(state_duration_ms / 1000.0)
      sampler.stop()
      sampler.commit_counters()

      if not statesampler.FAST_SAMPLER:
        return

      # Verify that the counter was created with the correct name and value.
      expected_counter_name = CounterName(
          'process-timers-msecs', step_name='test_step', stage_name='test_stage')

      # Find the specific counter we are looking for.
      found_counter = None
      for counter in counter_factory.get_counters():
        if counter.name == expected_counter_name:
          found_counter = counter
          break

      self.assertIsNotNone(
          found_counter,
          f"The expected counter '{expected_counter_name}' was not created.")

      # Check that its value is approximately correct.
      actual_value = found_counter.value()
      expected_value = state_duration_ms
      self.assertGreater(
          actual_value,
          expected_value * (1.0 - margin_of_error),
          "The timer metric was lower than expected.")
>     self.assertLess(
          actual_value,
          expected_value * (1.0 + margin_of_error),
          "The timer metric was higher than expected.")
E     AssertionError: 189 not less than 125.0 : The timer metric was higher than expected.

apache_beam/runners/worker/statesampler_test.py:216: AssertionError
_____________________ StateSamplerTest.test_timer_sampler ______________________
[gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python

self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_timer_sampler>

    @retry(reraise=True, stop=stop_after_attempt(3))
    def test_timer_sampler(self):
      # Set up state sampler.
      counter_factory = CounterFactory()
      sampler = statesampler.StateSampler(
          'timer', counter_factory, sampling_period_ms=1)

      # Duration of the timer processing.
      state_duration_ms = 100
      margin_of_error = 0.25

      sampler.start()
      with sampler.scoped_state('step1', 'process-timers'):
        time.sleep(state_duration_ms / 1000)
      sampler.stop()
      sampler.commit_counters()

      if not statesampler.FAST_SAMPLER:
        # The slow sampler does not implement sampling, so we won't test it.
        return

      # Test that sampled state timings are close to their expected values.
      c = CounterName(
          'process-timers-msecs', step_name='step1', stage_name='timer')
      expected_counter_values = {
          c: state_duration_ms,
      }
      for counter in counter_factory.get_counters():
        self.assertIn(counter.name, expected_counter_values)
        expected_value = expected_counter_values[counter.name]
        actual_value = counter.value()
        deviation = float(abs(actual_value - expected_value)) / expected_value
        _LOGGER.info('Sampling deviation from expectation: %f', deviation)
        self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
>       self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
E       AssertionError: 240 not less than 125.0

apache_beam/runners/worker/statesampler_test.py:168: AssertionError
------------------------------ Captured log call -------------------------------
INFO apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.340000
INFO apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.480000
INFO apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.400000
This suggests that the test_process_timers_metric_is_recorded test is flaky. PTAL
    self.name_context,
    'process-timers',
    metrics_container=self.metrics_container,
    suffix="-millis")
are other counters / in other SDKs reported with millis precision? if not, wondering if we should stick with msec for consistency.
I mainly used the -millis suffix based on the feedback from the design doc. The goal was to prevent potential parsing conflicts with the existing -msecs profile view and align with the naming convention used for this metric in other SDKs.
Based on the doc, there was a misalignment; this should be reverted back to -msecs.
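For reference, with the -msecs convention the sampler's counter name matches what the tests above assert; the step and stage names here are illustrative:

from apache_beam.utils.counters import CounterName

# The 'process-timers' scoped state reports its time under a '-msecs'
# counter, consistent with the existing start/process/finish counters.
expected = CounterName(
    'process-timers-msecs', step_name='step1', stage_name='timer')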
# debug mode).
self.assertLess(overhead_us, 20.0)

@retry(reraise=True, stop=stop_after_attempt(3))
does this test exercise any of the codepaths that you've added?
The test should cover the code paths because I test the new process-timers-millis scoped state.
I think we exercise the same codepaths when we run
with sampler.scoped_state('step1', 'statea'):
in the earlier test. 'process-timers' is just a string name to pass to the sampler. i don't think this test adds any new test coverage.
Understood, I have added a few test cases that should test the code paths that I created.
ok, will take a look. let's remove this though since it is redundant in any case.
Ack, removed the test case.
self.user_state_context = user_state_context
self.tagged_receivers = None  # type: Optional[_TaggedReceivers]
# A mapping of timer tags to the input "PCollections" they come in on.
# Force clean rebuild
Let's not forget to remove this. Note: you can close and reopen PRs to rerun a test suite.
Ack, thanks for pointing this out!
Fixes #36736
What is the purpose of this change?
This pull request introduces state sampling for timer processing within the Python SDK. This adds a new metric, process-timers-msecs, which allows runners to more accurately track the time spent on timer processing. This is particularly valuable for streaming pipelines with high timer backlogs, as it provides a key signal for autoscaling algorithms to make more informed decisions. The key changes include:
The process_timer method in operations.py now wraps the timer processing logic in a scoped_state to measure the execution time; a minimal sketch follows.
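A minimal sketch of the mechanism, mirroring the StateSampler usage from the test logs above; fire_eligible_timers is a hypothetical stand-in for the actual timer-delivery work in operations.py:

import time

from apache_beam.runners.worker import statesampler
from apache_beam.utils.counters import CounterFactory

def fire_eligible_timers():
  # Hypothetical stand-in for delivering fired timers to the DoFn.
  time.sleep(0.05)

counter_factory = CounterFactory()
sampler = statesampler.StateSampler(
    'stage1', counter_factory, sampling_period_ms=1)

sampler.start()
# Timer delivery runs inside a dedicated scoped state, so the sampler
# attributes the elapsed time to a 'process-timers-msecs' counter
# (with the fast Cython sampler; the slow fallback does not sample).
with sampler.scoped_state('step1', 'process-timers'):
  fire_eligible_timers()
sampler.stop()
sampler.commit_counters()

for counter in counter_factory.get_counters():
  print(counter.name, counter.value())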