
Conversation


@ktalluri456 ktalluri456 commented Nov 5, 2025

Fixes #36736

What is the purpose of this change?

This pull request introduces state sampling for timer processing in the Python SDK. It adds a new metric, process-timers-msecs, which allows runners to more accurately track the time spent on timer processing. This is particularly valuable for streaming pipelines with high timer backlogs, where it provides a key signal for autoscaling algorithms to make more informed decisions.

The key changes include:

  • The process_timer method in operations.py now wraps the timer processing logic in a scoped_state to measure the execution time.
  • New unit tests have been added to reflect the integration with the DoOperation class and the population of the new metric.
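
The shape of the change can be sketched with a minimal stand-in for the sampler (illustrative only; `ScopedState` here is a simplified model of the pattern, not Beam's actual `scoped_state` API):

```python
import time
from collections import defaultdict


class ScopedState:
    """Simplified model of a sampler's scoped state: times a `with` block
    and accumulates the elapsed milliseconds into a named counter."""
    def __init__(self, counters, counter_name):
        self.counters = counters
        self.counter_name = counter_name

    def __enter__(self):
        self._start = time.monotonic()
        return self

    def __exit__(self, *exc_info):
        elapsed_ms = (time.monotonic() - self._start) * 1000.0
        self.counters[self.counter_name] += elapsed_ms
        return False


counters = defaultdict(float)


def process_timer(timer_data):
    # Stand-in for the real timer-firing work in DoOperation.process_timer.
    time.sleep(0.01)


# The PR wraps the timer processing logic in a scoped state so time spent
# firing timers is attributed to a 'process-timers-msecs' counter.
with ScopedState(counters, 'process-timers-msecs'):
    process_timer(None)

assert counters['process-timers-msecs'] > 0
```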

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (see Fixes #... above).
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@gemini-code-assist
Contributor

Summary of Changes

Hello @ktalluri456, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Apache Beam Python SDK by introducing a new metric to track the duration of timer processing. This improvement provides critical visibility into the performance of streaming pipelines, particularly those with significant timer backlogs, enabling more effective autoscaling decisions by runners.

Highlights

  • New Metric for Timer Processing: Introduced process-timers-millis to measure the time spent on timer processing in the Python SDK, providing crucial data for autoscaling.
  • Flexible State Sampling: The scoped_state method in statesampler.py now accepts a suffix argument, allowing for more flexible and descriptive metric names.
  • Timer Processing Instrumentation: The process_timer method in operations.py is now wrapped with a scoped_state to accurately capture and report its execution time.
  • Unit Test for Timer Sampling: A new unit test, test_timer_sampler, has been added to statesampler_test.py to verify the correct functionality of the new timer processing metric.
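
The effect of the new suffix argument can be illustrated in isolation (a sketch of the naming behavior only; the function name and separator here are hypothetical, not the actual `statesampler.py` signature):

```python
def scoped_state_counter_name(step_name, state_name, suffix='-msecs'):
    """Sketch of how a suffix argument shapes a generated counter name.
    The default mirrors the existing '<state>-msecs' convention."""
    return f'{step_name}/{state_name}{suffix}'


# Default keeps the established '-msecs' convention.
name_default = scoped_state_counter_name('step1', 'process-timers')
assert name_default == 'step1/process-timers-msecs'

# A caller can override the suffix, e.g. for a '-millis' variant.
name_millis = scoped_state_counter_name(
    'step1', 'process-timers', suffix='-millis')
assert name_millis == 'step1/process-timers-millis'
```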

@ktalluri456
Author

R: @tvalentyn

@github-actions
Contributor

github-actions bot commented Nov 5, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@ktalluri456 ktalluri456 force-pushed the streaming-timer-metric branch from 6205dd7 to 62643db Compare November 5, 2025 22:16
@codecov

codecov bot commented Nov 6, 2025

Codecov Report

❌ Patch coverage is 94.11765% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 56.91%. Comparing base (6dedf8f) to head (1ce3c64).
⚠️ Report is 115 commits behind head on master.

Files with missing lines Patch % Lines
...ks/python/apache_beam/runners/worker/operations.py 94.11% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #36737       +/-   ##
=============================================
+ Coverage     40.21%   56.91%   +16.69%     
  Complexity     3394     3394               
=============================================
  Files          1222     1222               
  Lines        186913   187146      +233     
  Branches       3545     3545               
=============================================
+ Hits          75174   106511    +31337     
+ Misses       108367    77263    -31104     
  Partials       3372     3372               
Flag Coverage Δ
python 80.84% <94.11%> (+40.32%) ⬆️


@ktalluri456 ktalluri456 force-pushed the streaming-timer-metric branch from 1053031 to c80fa85 Compare November 6, 2025 04:16
self.name_context, 'process', metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
self.name_context, 'finish', metrics_container=self.metrics_container)
if self.state_sampler:
Contributor

for my understanding, when do we have the case where state_sampler is None / not available?
Trying to understand what prompted to add Noop state sampler with this particular change.

Author

While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.

Adding the Noop state ensures that the code is safe to execute in all contexts, both for the new timer metric and for the existing start, process, and finish states.

Contributor

i guess i still don't understand why this condition was not necessary before but became necessary now.

Author

I believe my change exposed a bug in the Operation class: it would crash during initialization if the state_sampler was None. This happens in batch pipelines, which is why tests like autocomplete_test.py were failing in presubmit checks.

Adding the if self.state_sampler: check and using a NoOpScopedState in the else block makes the code more robust and ensures it works correctly in both streaming and batch contexts.
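
The guard described above can be sketched with minimal stand-in classes (illustrative only; the class and attribute names here are hypothetical, not the real operations.py code):

```python
class NoOpScopedState:
    """Fallback scoped state: entering and exiting does nothing, so
    instrumented code is safe to run when no sampler is present."""
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False


class TimerOperation:
    """Minimal model of the guarded initialization described above."""
    def __init__(self, state_sampler=None):
        if state_sampler:
            # In the real code this would be something like
            # state_sampler.scoped_state(name_context, 'process-timers', ...).
            self.scoped_timer_state = state_sampler.scoped_state(
                'step', 'process-timers')
        else:
            self.scoped_timer_state = NoOpScopedState()
        self.fired = 0

    def process_timer(self, timer_data):
        with self.scoped_timer_state:
            self.fired += 1  # stand-in for firing the user timer


# Batch-style context with no sampler: must not crash.
op = TimerOperation(state_sampler=None)
op.process_timer(None)
assert op.fired == 1
```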

Contributor

I have tried reverting this branch and running the autocomplete test. it didn't fail.

Let me try running the test suite without this branch. I'd like to understand why this is necessary.

Contributor

@tvalentyn tvalentyn Nov 11, 2025

trying out in #36794 , let's see what is failing.

Contributor

While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.

I am not seeing these errors - am I missing something?

I do notice the following errors:

2025-11-13T00:34:53.8931580Z =================================== FAILURES ===================================
2025-11-13T00:34:53.8932180Z ___________ StateSamplerTest.test_process_timers_metric_is_recorded ____________
2025-11-13T00:34:53.8933040Z [gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8933480Z 
2025-11-13T00:34:53.8933840Z self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_process_timers_metric_is_recorded>
2025-11-13T00:34:53.8934320Z 
2025-11-13T00:34:53.8934560Z     @retry(reraise=True, stop=stop_after_attempt(3))
2025-11-13T00:34:53.8934960Z     def test_process_timers_metric_is_recorded(self):
2025-11-13T00:34:53.8935330Z       """
2025-11-13T00:34:53.8936770Z       Tests that the 'process-timers-msecs' metric is correctly recorded
2025-11-13T00:34:53.8937260Z       when a state sampler is active.
2025-11-13T00:34:53.8937700Z       """
2025-11-13T00:34:53.8938000Z       # Set up a real state sampler and counter factory.
2025-11-13T00:34:53.8938440Z       counter_factory = CounterFactory()
2025-11-13T00:34:53.8938970Z       sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8939420Z           'test_stage', counter_factory, sampling_period_ms=1)
2025-11-13T00:34:53.8939770Z     
2025-11-13T00:34:53.8941420Z       state_duration_ms = 100
2025-11-13T00:34:53.8942440Z       margin_of_error = 0.25
2025-11-13T00:34:53.8943340Z     
2025-11-13T00:34:53.8944230Z       # Run a workload inside the 'process-timers' scoped state.
2025-11-13T00:34:53.8945180Z       sampler.start()
2025-11-13T00:34:53.8946500Z       with sampler.scoped_state('test_step', 'process-timers'):
2025-11-13T00:34:53.8947520Z         time.sleep(state_duration_ms / 1000.0)
2025-11-13T00:34:53.8948220Z       sampler.stop()
2025-11-13T00:34:53.8949110Z       sampler.commit_counters()
2025-11-13T00:34:53.8949860Z     
2025-11-13T00:34:53.8950580Z       if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.8950920Z         return
2025-11-13T00:34:53.8951840Z     
2025-11-13T00:34:53.8952350Z       # Verify that the counter was created with the correct name and value.
2025-11-13T00:34:53.8952790Z       expected_counter_name = CounterName(
2025-11-13T00:34:53.8953250Z           'process-timers-msecs', step_name='test_step', stage_name='test_stage')
2025-11-13T00:34:53.8953720Z     
2025-11-13T00:34:53.8954090Z       # Find the specific counter we are looking for.
2025-11-13T00:34:53.8954450Z       found_counter = None
2025-11-13T00:34:53.8954800Z       for counter in counter_factory.get_counters():
2025-11-13T00:34:53.8955480Z         if counter.name == expected_counter_name:
2025-11-13T00:34:53.8955830Z           found_counter = counter
2025-11-13T00:34:53.8956180Z           break
2025-11-13T00:34:53.8956550Z     
2025-11-13T00:34:53.8956860Z       self.assertIsNotNone(
2025-11-13T00:34:53.8957230Z           found_counter,
2025-11-13T00:34:53.8957580Z           f"The expected counter '{expected_counter_name}' was not created.")
2025-11-13T00:34:53.8957970Z     
2025-11-13T00:34:53.8959130Z       # Check that its value is approximately correct.
2025-11-13T00:34:53.8959570Z       actual_value = found_counter.value()
2025-11-13T00:34:53.8959980Z       expected_value = state_duration_ms
2025-11-13T00:34:53.8960370Z       self.assertGreater(
2025-11-13T00:34:53.8960660Z           actual_value,
2025-11-13T00:34:53.8960990Z           expected_value * (1.0 - margin_of_error),
2025-11-13T00:34:53.8961450Z           "The timer metric was lower than expected.")
2025-11-13T00:34:53.8961800Z >     self.assertLess(
2025-11-13T00:34:53.8962150Z           actual_value,
2025-11-13T00:34:53.8962460Z           expected_value * (1.0 + margin_of_error),
2025-11-13T00:34:53.8962870Z           "The timer metric was higher than expected.")
2025-11-13T00:34:53.8963770Z E     AssertionError: 189 not less than 125.0 : The timer metric was higher than expected.
2025-11-13T00:34:53.8964090Z 
2025-11-13T00:34:53.8964380Z apache_beam/runners/worker/statesampler_test.py:216: AssertionError
2025-11-13T00:34:53.8964950Z _____________________ StateSamplerTest.test_timer_sampler ______________________
2025-11-13T00:34:53.8965520Z [gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8965930Z 
2025-11-13T00:34:53.8966240Z self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_timer_sampler>
2025-11-13T00:34:53.8966710Z 
2025-11-13T00:34:53.8966980Z     @retry(reraise=True, stop=stop_after_attempt(3))
2025-11-13T00:34:53.8967370Z     def test_timer_sampler(self):
2025-11-13T00:34:53.8967730Z       # Set up state sampler.
2025-11-13T00:34:53.8968130Z       counter_factory = CounterFactory()
2025-11-13T00:34:53.8968520Z       sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8969930Z           'timer', counter_factory, sampling_period_ms=1)
2025-11-13T00:34:53.8970590Z     
2025-11-13T00:34:53.8970840Z       # Duration of the timer processing.
2025-11-13T00:34:53.9067190Z       state_duration_ms = 100
2025-11-13T00:34:53.9067910Z       margin_of_error = 0.25
2025-11-13T00:34:53.9068290Z     
2025-11-13T00:34:53.9074160Z       sampler.start()
2025-11-13T00:34:53.9143540Z       with sampler.scoped_state('step1', 'process-timers'):
2025-11-13T00:34:53.9146170Z         time.sleep(state_duration_ms / 1000)
2025-11-13T00:34:53.9146930Z       sampler.stop()
2025-11-13T00:34:53.9147550Z       sampler.commit_counters()
2025-11-13T00:34:53.9149250Z     
2025-11-13T00:34:53.9150470Z       if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.9151410Z         # The slow sampler does not implement sampling, so we won't test it.
2025-11-13T00:34:53.9153370Z         return
2025-11-13T00:34:53.9153980Z     
2025-11-13T00:34:53.9155420Z       # Test that sampled state timings are close to their expected values.
2025-11-13T00:34:53.9156630Z       c = CounterName(
2025-11-13T00:34:53.9158070Z           'process-timers-msecs', step_name='step1', stage_name='timer')
2025-11-13T00:34:53.9160900Z       expected_counter_values = {
2025-11-13T00:34:53.9161540Z           c: state_duration_ms,
2025-11-13T00:34:53.9161870Z       }
2025-11-13T00:34:53.9198870Z       for counter in counter_factory.get_counters():
2025-11-13T00:34:53.9199820Z         self.assertIn(counter.name, expected_counter_values)
2025-11-13T00:34:53.9201380Z         expected_value = expected_counter_values[counter.name]
2025-11-13T00:34:53.9201880Z         actual_value = counter.value()
2025-11-13T00:34:53.9202310Z         deviation = float(abs(actual_value - expected_value)) / expected_value
2025-11-13T00:34:53.9203330Z         _LOGGER.info('Sampling deviation from expectation: %f', deviation)
2025-11-13T00:34:53.9203910Z         self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
2025-11-13T00:34:53.9204430Z >       self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
2025-11-13T00:34:53.9204860Z E       AssertionError: 240 not less than 125.0
2025-11-13T00:34:53.9205140Z 
2025-11-13T00:34:53.9205430Z apache_beam/runners/worker/statesampler_test.py:168: AssertionError
2025-11-13T00:34:53.9205890Z ------------------------------ Captured log call -------------------------------
2025-11-13T00:34:53.9206560Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.340000
2025-11-13T00:34:53.9207610Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.480000
2025-11-13T00:34:53.9208450Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.400000

This suggests that the test_process_timers_metric_is_recorded test is flaky. PTAL
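
One common way to de-flake wall-clock assertions like the failing assertLess above is to keep only the lower bound: a sleep on a loaded CI machine can overshoot arbitrarily but cannot finish early. This is a sketch of that idea, not necessarily the fix adopted in this PR:

```python
import time


def timed_workload_ms(duration_ms):
    """Run a sleep-based workload and report its wall-clock duration."""
    start = time.monotonic()
    time.sleep(duration_ms / 1000.0)
    return (time.monotonic() - start) * 1000.0


state_duration_ms = 100
margin_of_error = 0.25
actual_ms = timed_workload_ms(state_duration_ms)

# The lower bound is safe to assert: the workload cannot complete early.
assert actual_ms > state_duration_ms * (1.0 - margin_of_error)
# An upper bound like `actual_ms < 125.0` is what failed in the CI runs
# above (189 and 240 ms observed), since scheduling delay is unbounded.
```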

self.name_context,
'process-timers',
metrics_container=self.metrics_container,
suffix="-millis")
Contributor

@tvalentyn tvalentyn Nov 6, 2025

are other counters / counters in other sdks reported with millis precision? if not, wondering if we should stick with msec for consistency.

Author

I mainly used the -millis suffix based on the feedback from the design doc. The goal was to prevent potential parsing conflicts with the existing -msecs profile view and align with the naming convention used for this metric in other SDKs.

Author

Based on the doc, there was a misalignment; this should be reverted back to -msecs.

# debug mode).
self.assertLess(overhead_us, 20.0)

@retry(reraise=True, stop=stop_after_attempt(3))
Contributor

@tvalentyn tvalentyn Nov 6, 2025

does this test exercise any of the codepaths that you've added?

Author

The test should cover the new code paths, since it exercises the new process-timers-millis scoped state.

Contributor

I think we exercise the same codepaths when we run

    with sampler.scoped_state('step1', 'statea'):

in the earlier test. 'process-timers' is just a string name to pass to the sampler. i don't think this test adds any new test coverage.

Author

Understood, I have added a few test cases that should test the code paths that I created.

Contributor

ok, will take a look. let's remove this though since it is redundant in any case.

Author

Ack, removed the test case.

self.user_state_context = user_state_context
self.tagged_receivers = None # type: Optional[_TaggedReceivers]
# A mapping of timer tags to the input "PCollections" they come in on.
# Force clean rebuild
Contributor

Let's not forget to remove this. Note: you can close and reopen PRs to rerun a test suite.

Author

Ack, thanks for pointing this out!


Development

Successfully merging this pull request may close these issues.

[Feature Request]: Add state sampling for timer processing in the Python SDK
