Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/worker/operations.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ cdef class DoOperation(Operation):
cdef dict timer_specs
cdef public object input_info
cdef object fn
cdef readonly object scoped_timer_processing_state


cdef class SdfProcessSizedElements(DoOperation):
Expand Down
52 changes: 37 additions & 15 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from apache_beam.runners.worker import opcounters
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import sideinputs
from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.data_sampler import DataSampler
from apache_beam.transforms import sideinputs as apache_sideinputs
from apache_beam.transforms import combiners
Expand Down Expand Up @@ -444,12 +445,19 @@ def __init__(
self.metrics_container = MetricsContainer(self.name_context.metrics_name())

self.state_sampler = state_sampler
self.scoped_start_state = self.state_sampler.scoped_state(
self.name_context, 'start', metrics_container=self.metrics_container)
self.scoped_process_state = self.state_sampler.scoped_state(
self.name_context, 'process', metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
self.name_context, 'finish', metrics_container=self.metrics_container)
if self.state_sampler:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding, when do we have the case where state_sampler is None / not available?
Trying to understand what prompted to add Noop state sampler with this particular change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.

Adding the Noop state ensures that the code is safe to execute in all contexts, both for the new timer metric and or the existing start, process, and finish states.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess i still don't understand why this condition was not necessary before but became necessary now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe my change exposed a bug in the Operation class where it would crash during initialization if the state_sampler was None. This happens in batch pipelines I believe, which is why tests like the autocomplete_test.py were failing in presubmit checks.

Adding the if self.state_sampler: check and using a NoOpScopedState in the else block makes the code more robust and ensures it works correctly in both streaming and batch contexts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried reverting this branch and running the autocomplete test. it didn't fail.

Let me try running the test suite without this branch. I'd like to understand why this is necessary.

Copy link
Contributor

@tvalentyn tvalentyn Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying out in #36794 , let's see what is failing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.

I am not seeing these errors - am I missing something?

I do notice the following errors:

2025-11-13T00:34:53.8931580Z =================================== FAILURES ===================================
2025-11-13T00:34:53.8932180Z ___________ StateSamplerTest.test_process_timers_metric_is_recorded ____________
2025-11-13T00:34:53.8933040Z [gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8933480Z 
2025-11-13T00:34:53.8933840Z self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_process_timers_metric_is_recorded>
2025-11-13T00:34:53.8934320Z 
2025-11-13T00:34:53.8934560Z     @retry(reraise=True, stop=stop_after_attempt(3))
2025-11-13T00:34:53.8934960Z     def test_process_timers_metric_is_recorded(self):
2025-11-13T00:34:53.8935330Z       """
2025-11-13T00:34:53.8936770Z       Tests that the 'process-timers-msecs' metric is correctly recorded
2025-11-13T00:34:53.8937260Z       when a state sampler is active.
2025-11-13T00:34:53.8937700Z       """
2025-11-13T00:34:53.8938000Z       # Set up a real state sampler and counter factory.
2025-11-13T00:34:53.8938440Z       counter_factory = CounterFactory()
2025-11-13T00:34:53.8938970Z       sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8939420Z           'test_stage', counter_factory, sampling_period_ms=1)
2025-11-13T00:34:53.8939770Z     
2025-11-13T00:34:53.8941420Z       state_duration_ms = 100
2025-11-13T00:34:53.8942440Z       margin_of_error = 0.25
2025-11-13T00:34:53.8943340Z     
2025-11-13T00:34:53.8944230Z       # Run a workload inside the 'process-timers' scoped state.
2025-11-13T00:34:53.8945180Z       sampler.start()
2025-11-13T00:34:53.8946500Z       with sampler.scoped_state('test_step', 'process-timers'):
2025-11-13T00:34:53.8947520Z         time.sleep(state_duration_ms / 1000.0)
2025-11-13T00:34:53.8948220Z       sampler.stop()
2025-11-13T00:34:53.8949110Z       sampler.commit_counters()
2025-11-13T00:34:53.8949860Z     
2025-11-13T00:34:53.8950580Z       if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.8950920Z         return
2025-11-13T00:34:53.8951840Z     
2025-11-13T00:34:53.8952350Z       # Verify that the counter was created with the correct name and value.
2025-11-13T00:34:53.8952790Z       expected_counter_name = CounterName(
2025-11-13T00:34:53.8953250Z           'process-timers-msecs', step_name='test_step', stage_name='test_stage')
2025-11-13T00:34:53.8953720Z     
2025-11-13T00:34:53.8954090Z       # Find the specific counter we are looking for.
2025-11-13T00:34:53.8954450Z       found_counter = None
2025-11-13T00:34:53.8954800Z       for counter in counter_factory.get_counters():
2025-11-13T00:34:53.8955480Z         if counter.name == expected_counter_name:
2025-11-13T00:34:53.8955830Z           found_counter = counter
2025-11-13T00:34:53.8956180Z           break
2025-11-13T00:34:53.8956550Z     
2025-11-13T00:34:53.8956860Z       self.assertIsNotNone(
2025-11-13T00:34:53.8957230Z           found_counter,
2025-11-13T00:34:53.8957580Z           f"The expected counter '{expected_counter_name}' was not created.")
2025-11-13T00:34:53.8957970Z     
2025-11-13T00:34:53.8959130Z       # Check that its value is approximately correct.
2025-11-13T00:34:53.8959570Z       actual_value = found_counter.value()
2025-11-13T00:34:53.8959980Z       expected_value = state_duration_ms
2025-11-13T00:34:53.8960370Z       self.assertGreater(
2025-11-13T00:34:53.8960660Z           actual_value,
2025-11-13T00:34:53.8960990Z           expected_value * (1.0 - margin_of_error),
2025-11-13T00:34:53.8961450Z           "The timer metric was lower than expected.")
2025-11-13T00:34:53.8961800Z >     self.assertLess(
2025-11-13T00:34:53.8962150Z           actual_value,
2025-11-13T00:34:53.8962460Z           expected_value * (1.0 + margin_of_error),
2025-11-13T00:34:53.8962870Z           "The timer metric was higher than expected.")
2025-11-13T00:34:53.8963770Z E     AssertionError: 189 not less than 125.0 : The timer metric was higher than expected.
2025-11-13T00:34:53.8964090Z 
2025-11-13T00:34:53.8964380Z apache_beam/runners/worker/statesampler_test.py:216: AssertionError
2025-11-13T00:34:53.8964950Z _____________________ StateSamplerTest.test_timer_sampler ______________________
2025-11-13T00:34:53.8965520Z [gw3] darwin -- Python 3.11.9 /Users/runner/work/beam/beam/sdks/python/target/.tox/py311-macos/bin/python
2025-11-13T00:34:53.8965930Z 
2025-11-13T00:34:53.8966240Z self = <apache_beam.runners.worker.statesampler_test.StateSamplerTest testMethod=test_timer_sampler>
2025-11-13T00:34:53.8966710Z 
2025-11-13T00:34:53.8966980Z     @retry(reraise=True, stop=stop_after_attempt(3))
2025-11-13T00:34:53.8967370Z     def test_timer_sampler(self):
2025-11-13T00:34:53.8967730Z       # Set up state sampler.
2025-11-13T00:34:53.8968130Z       counter_factory = CounterFactory()
2025-11-13T00:34:53.8968520Z       sampler = statesampler.StateSampler(
2025-11-13T00:34:53.8969930Z           'timer', counter_factory, sampling_period_ms=1)
2025-11-13T00:34:53.8970590Z     
2025-11-13T00:34:53.8970840Z       # Duration of the timer processing.
2025-11-13T00:34:53.9067190Z       state_duration_ms = 100
2025-11-13T00:34:53.9067910Z       margin_of_error = 0.25
2025-11-13T00:34:53.9068290Z     
2025-11-13T00:34:53.9074160Z       sampler.start()
2025-11-13T00:34:53.9143540Z       with sampler.scoped_state('step1', 'process-timers'):
2025-11-13T00:34:53.9146170Z         time.sleep(state_duration_ms / 1000)
2025-11-13T00:34:53.9146930Z       sampler.stop()
2025-11-13T00:34:53.9147550Z       sampler.commit_counters()
2025-11-13T00:34:53.9149250Z     
2025-11-13T00:34:53.9150470Z       if not statesampler.FAST_SAMPLER:
2025-11-13T00:34:53.9151410Z         # The slow sampler does not implement sampling, so we won't test it.
2025-11-13T00:34:53.9153370Z         return
2025-11-13T00:34:53.9153980Z     
2025-11-13T00:34:53.9155420Z       # Test that sampled state timings are close to their expected values.
2025-11-13T00:34:53.9156630Z       c = CounterName(
2025-11-13T00:34:53.9158070Z           'process-timers-msecs', step_name='step1', stage_name='timer')
2025-11-13T00:34:53.9160900Z       expected_counter_values = {
2025-11-13T00:34:53.9161540Z           c: state_duration_ms,
2025-11-13T00:34:53.9161870Z       }
2025-11-13T00:34:53.9198870Z       for counter in counter_factory.get_counters():
2025-11-13T00:34:53.9199820Z         self.assertIn(counter.name, expected_counter_values)
2025-11-13T00:34:53.9201380Z         expected_value = expected_counter_values[counter.name]
2025-11-13T00:34:53.9201880Z         actual_value = counter.value()
2025-11-13T00:34:53.9202310Z         deviation = float(abs(actual_value - expected_value)) / expected_value
2025-11-13T00:34:53.9203330Z         _LOGGER.info('Sampling deviation from expectation: %f', deviation)
2025-11-13T00:34:53.9203910Z         self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
2025-11-13T00:34:53.9204430Z >       self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
2025-11-13T00:34:53.9204860Z E       AssertionError: 240 not less than 125.0
2025-11-13T00:34:53.9205140Z 
2025-11-13T00:34:53.9205430Z apache_beam/runners/worker/statesampler_test.py:168: AssertionError
2025-11-13T00:34:53.9205890Z ------------------------------ Captured log call -------------------------------
2025-11-13T00:34:53.9206560Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.340000
2025-11-13T00:34:53.9207610Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.480000
2025-11-13T00:34:53.9208450Z INFO     apache_beam.runners.worker.statesampler_test:statesampler_test.py:166 Sampling deviation from expectation: 1.40000

This suggests that the test_process_timers_metric_is_recorded test is flaky. PTAL

self.scoped_start_state = self.state_sampler.scoped_state(
self.name_context, 'start', metrics_container=self.metrics_container)
self.scoped_process_state = self.state_sampler.scoped_state(
self.name_context,
'process',
metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
self.name_context, 'finish', metrics_container=self.metrics_container)
else:
self.scoped_start_state = statesampler.NOOP_SCOPED_STATE
self.scoped_process_state = statesampler.NOOP_SCOPED_STATE
self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
self.receivers = [] # type: List[ConsumerSet]
Expand Down Expand Up @@ -808,8 +816,15 @@ def __init__(
self.user_state_context = user_state_context
self.tagged_receivers = None # type: Optional[_TaggedReceivers]
# A mapping of timer tags to the input "PCollections" they come in on.
# Force clean rebuild
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not forget to remove this. Note: you can close and reopen PRs to rerun a test suite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, thanks for pointing this out!

self.input_info = None # type: Optional[OpInputInfo]

self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE
if self.state_sampler:
self.scoped_timer_processing_state = self.state_sampler.scoped_state(
self.name_context,
'process-timers',
metrics_container=self.metrics_container,
suffix="-millis")
Copy link
Contributor

@tvalentyn tvalentyn Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are other counters / in other sdks reported with millis precision? if not wondering if we should be sticking with msec for constitency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly used the -millis suffix based on the feedback from the design doc. The goal was to prevent potential parsing conflicts with the existing -msecs profile view and align with the naming convention used for this metric in other SDKs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on doc, there was a misalignment, should be reverted back to -msecs.

# See fn_data in dataflow_runner.py
# TODO: Store all the items from spec?
self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn))
Expand Down Expand Up @@ -971,14 +986,21 @@ def add_timer_info(self, timer_family_id, timer_info):
self.user_state_context.add_timer_info(timer_family_id, timer_info)

def process_timer(self, tag, timer_data):
timer_spec = self.timer_specs[tag]
self.dofn_runner.process_user_timer(
timer_spec,
timer_data.user_key,
timer_data.windows[0],
timer_data.fire_timestamp,
timer_data.paneinfo,
timer_data.dynamic_timer_tag)
def process_timer_logic():
timer_spec = self.timer_specs[tag]
self.dofn_runner.process_user_timer(
timer_spec,
timer_data.user_key,
timer_data.windows[0],
timer_data.fire_timestamp,
timer_data.paneinfo,
timer_data.dynamic_timer_tag)

if self.scoped_timer_processing_state:
with self.scoped_timer_processing_state:
process_timer_logic()
else:
process_timer_logic()

def finish(self):
# type: () -> None
Expand Down
20 changes: 17 additions & 3 deletions sdks/python/apache_beam/runners/worker/statesampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ def scoped_state(
name_context: Union[str, 'common.NameContext'],
state_name: str,
io_target=None,
metrics_container: Optional['MetricsContainer'] = None
) -> statesampler_impl.ScopedState:
metrics_container: Optional['MetricsContainer'] = None,
suffix: str = '-msecs') -> statesampler_impl.ScopedState:
"""Returns a ScopedState object associated to a Step and a State.

Args:
Expand All @@ -152,7 +152,7 @@ def scoped_state(
name_context = common.NameContext(name_context)

counter_name = CounterName(
state_name + '-msecs',
state_name + suffix,
stage_name=self._prefix,
step_name=name_context.metrics_name(),
io_target=io_target)
Expand All @@ -170,3 +170,17 @@ def commit_counters(self) -> None:
for state in self._states_by_name.values():
state_msecs = int(1e-6 * state.nsecs)
state.counter.update(state_msecs - state.counter.value())


class NoOpScopedState:
def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def sampled_msecs_int(self):
return 0


NOOP_SCOPED_STATE = NoOpScopedState()
35 changes: 35 additions & 0 deletions sdks/python/apache_beam/runners/worker/statesampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,41 @@ def test_sampler_transition_overhead(self):
# debug mode).
self.assertLess(overhead_us, 20.0)

@retry(reraise=True, stop=stop_after_attempt(3))
Copy link
Contributor

@tvalentyn tvalentyn Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this test exercise any of the codepaths that you've added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test should cover the code paths because I test the new process-timers-millis scoped state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we exercise the same codepaths when we run

    with sampler.scoped_state('step1', 'statea'):

in the earlier test. 'process-timers' is just a string name to pass to the sampler. i don't think this test adds any new test coverage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, I have added a few test cases that should test the code paths that I created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will take a look. let's remove this though since it is redundant in any case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, removed the test case.

def test_timer_sampler(self):
# Set up state sampler.
counter_factory = CounterFactory()
sampler = statesampler.StateSampler(
'timer', counter_factory, sampling_period_ms=1)

# Duration of the timer processing.
state_duration_ms = 100
margin_of_error = 0.25

sampler.start()
with sampler.scoped_state('step1', 'process-timers', suffix='-millis'):
time.sleep(state_duration_ms / 1000)
sampler.stop()
sampler.commit_counters()

if not statesampler.FAST_SAMPLER:
# The slow sampler does not implement sampling, so we won't test it.
return

# Test that sampled state timings are close to their expected values.
expected_counter_values = {
CounterName(
'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms,
}
for counter in counter_factory.get_counters():
self.assertIn(counter.name, expected_counter_values)
expected_value = expected_counter_values[counter.name]
actual_value = counter.value()
deviation = float(abs(actual_value - expected_value)) / expected_value
_LOGGER.info('Sampling deviation from expectation: %f', deviation)
self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading