-
Notifications
You must be signed in to change notification settings - Fork 4.4k
[BEAM-36736] Add state sampling for timer processing in the Python SDK #36737
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
62643db
c5cb784
1ce3c64
c80fa85
188a767
9b5a91c
f4b44db
ed86d99
a146463
87b50b9
8e1dbb3
bf55c2d
d41f76b
f85191f
d063094
02cf48c
2def297
21deb92
94cb59a
ec7fb2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,7 @@ | |
| from apache_beam.runners.worker import opcounters | ||
| from apache_beam.runners.worker import operation_specs | ||
| from apache_beam.runners.worker import sideinputs | ||
| from apache_beam.runners.worker import statesampler | ||
| from apache_beam.runners.worker.data_sampler import DataSampler | ||
| from apache_beam.transforms import sideinputs as apache_sideinputs | ||
| from apache_beam.transforms import combiners | ||
|
|
@@ -444,12 +445,19 @@ def __init__( | |
| self.metrics_container = MetricsContainer(self.name_context.metrics_name()) | ||
|
|
||
| self.state_sampler = state_sampler | ||
| self.scoped_start_state = self.state_sampler.scoped_state( | ||
| self.name_context, 'start', metrics_container=self.metrics_container) | ||
| self.scoped_process_state = self.state_sampler.scoped_state( | ||
| self.name_context, 'process', metrics_container=self.metrics_container) | ||
| self.scoped_finish_state = self.state_sampler.scoped_state( | ||
| self.name_context, 'finish', metrics_container=self.metrics_container) | ||
| if self.state_sampler: | ||
| self.scoped_start_state = self.state_sampler.scoped_state( | ||
| self.name_context, 'start', metrics_container=self.metrics_container) | ||
| self.scoped_process_state = self.state_sampler.scoped_state( | ||
| self.name_context, | ||
| 'process', | ||
| metrics_container=self.metrics_container) | ||
| self.scoped_finish_state = self.state_sampler.scoped_state( | ||
| self.name_context, 'finish', metrics_container=self.metrics_container) | ||
| else: | ||
| self.scoped_start_state = statesampler.NOOP_SCOPED_STATE | ||
| self.scoped_process_state = statesampler.NOOP_SCOPED_STATE | ||
| self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE | ||
| # TODO(ccy): the '-abort' state can be added when the abort is supported in | ||
| # Operations. | ||
| self.receivers = [] # type: List[ConsumerSet] | ||
|
|
@@ -808,8 +816,15 @@ def __init__( | |
| self.user_state_context = user_state_context | ||
| self.tagged_receivers = None # type: Optional[_TaggedReceivers] | ||
| # A mapping of timer tags to the input "PCollections" they come in on. | ||
| # Force clean rebuild | ||
|
||
| self.input_info = None # type: Optional[OpInputInfo] | ||
|
|
||
| self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE | ||
| if self.state_sampler: | ||
| self.scoped_timer_processing_state = self.state_sampler.scoped_state( | ||
| self.name_context, | ||
| 'process-timers', | ||
| metrics_container=self.metrics_container, | ||
| suffix="-millis") | ||
|
||
| # See fn_data in dataflow_runner.py | ||
| # TODO: Store all the items from spec? | ||
| self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn)) | ||
|
|
@@ -971,14 +986,21 @@ def add_timer_info(self, timer_family_id, timer_info): | |
| self.user_state_context.add_timer_info(timer_family_id, timer_info) | ||
|
|
||
| def process_timer(self, tag, timer_data): | ||
| timer_spec = self.timer_specs[tag] | ||
| self.dofn_runner.process_user_timer( | ||
| timer_spec, | ||
| timer_data.user_key, | ||
| timer_data.windows[0], | ||
| timer_data.fire_timestamp, | ||
| timer_data.paneinfo, | ||
| timer_data.dynamic_timer_tag) | ||
| def process_timer_logic(): | ||
| timer_spec = self.timer_specs[tag] | ||
| self.dofn_runner.process_user_timer( | ||
| timer_spec, | ||
| timer_data.user_key, | ||
| timer_data.windows[0], | ||
| timer_data.fire_timestamp, | ||
| timer_data.paneinfo, | ||
| timer_data.dynamic_timer_tag) | ||
|
|
||
| if self.scoped_timer_processing_state: | ||
| with self.scoped_timer_processing_state: | ||
| process_timer_logic() | ||
| else: | ||
| process_timer_logic() | ||
|
|
||
| def finish(self): | ||
| # type: () -> None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -127,6 +127,41 @@ def test_sampler_transition_overhead(self): | |
| # debug mode). | ||
| self.assertLess(overhead_us, 20.0) | ||
|
|
||
| @retry(reraise=True, stop=stop_after_attempt(3)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this test exercise any of the codepaths that you've added?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test should cover the code paths because I test the new process-timers-millis scoped state.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we exercise the same codepaths when we run in the earlier test. 'process-timers' is just a string name to pass to the sampler. i don't think this test adds any new test coverage.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood, I have added a few test cases that should test the code paths that I created.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, will take a look. let's remove this though since it is redundant in any case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack, removed the test case. |
||
| def test_timer_sampler(self): | ||
| # Set up state sampler. | ||
| counter_factory = CounterFactory() | ||
| sampler = statesampler.StateSampler( | ||
| 'timer', counter_factory, sampling_period_ms=1) | ||
|
|
||
| # Duration of the timer processing. | ||
| state_duration_ms = 100 | ||
| margin_of_error = 0.25 | ||
|
|
||
| sampler.start() | ||
| with sampler.scoped_state('step1', 'process-timers', suffix='-millis'): | ||
| time.sleep(state_duration_ms / 1000) | ||
| sampler.stop() | ||
| sampler.commit_counters() | ||
|
|
||
| if not statesampler.FAST_SAMPLER: | ||
| # The slow sampler does not implement sampling, so we won't test it. | ||
| return | ||
|
|
||
| # Test that sampled state timings are close to their expected values. | ||
| expected_counter_values = { | ||
| CounterName( | ||
| 'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms, | ||
| } | ||
| for counter in counter_factory.get_counters(): | ||
| self.assertIn(counter.name, expected_counter_values) | ||
| expected_value = expected_counter_values[counter.name] | ||
| actual_value = counter.value() | ||
| deviation = float(abs(actual_value - expected_value)) / expected_value | ||
| _LOGGER.info('Sampling deviation from expectation: %f', deviation) | ||
| self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error)) | ||
| self.assertLess(actual_value, expected_value * (1.0 + margin_of_error)) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| logging.getLogger().setLevel(logging.INFO) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for my understanding, when do we have the case where state_sampler is None / not available?
Trying to understand what prompted adding the no-op state sampler in this particular change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I was debugging CI failures, I noticed that the state_sampler is None when operations are run in a non-streaming context. The autocomplete_test was an example of this.
Adding the Noop state ensures that the code is safe to execute in all contexts, both for the new timer metric and for the existing start, process, and finish states.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess i still don't understand why this condition was not necessary before but became necessary now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe my change exposed a bug in the Operation class where it would crash during initialization if the state_sampler was None. This happens in batch pipelines I believe, which is why tests like the autocomplete_test.py were failing in presubmit checks.
Adding the if self.state_sampler: check and using a NoOpScopedState in the else block makes the code more robust and ensures it works correctly in both streaming and batch contexts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have tried reverting this branch and running the autocomplete test. It didn't fail.
Let me try running the test suite without this branch. I'd like to understand why this is necessary.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying this out in #36794; let's see what is failing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not seeing these errors - am I missing something?
I do notice the following errors:
This suggests that the
test_process_timers_metric_is_recorded test is flaky. PTAL