From 7e581e17e4829d6ca6c958e5f35eb1dd7f89d7c0 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 14:42:55 +0200 Subject: [PATCH 1/3] feat: Add flag to capture exceptions on futures in concurrent integration --- sentry_sdk/integrations/concurrent.py | 31 +++- .../concurrent/test_concurrent.py | 152 ++++++++++++++++++ 2 files changed, 181 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index a0b76d1d32..b5ea813d02 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -1,10 +1,11 @@ from functools import wraps -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future import sentry_sdk from sentry_sdk.integrations import Integration from sentry_sdk.scope import use_isolation_scope, use_scope +from sentry_sdk.utils import event_from_exception from typing import TYPE_CHECKING @@ -41,6 +42,32 @@ def wrapped_fn(*args, **kwargs): with use_scope(current_scope): return fn(*args, **kwargs) - return old_submit(self, wrapped_fn, *args, **kwargs) + future = old_submit(self, wrapped_fn, *args, **kwargs) + + def report_exceptions(future: Future): + # type: (Future) -> None + exception = future.exception() + integration = sentry_sdk.get_client().get_integration( + ConcurrentIntegration + ) + + if ( + exception is None + or integration is None + or not integration.record_exceptions_on_futures + ): + return + + event, hint = event_from_exception( + exception, + client_options=sentry_sdk.get_client().options, + mechanism={"type": "concurrent", "handled": False}, + ) + sentry_sdk.capture_event(event, hint=hint) + + if integration.record_exceptions_on_futures: + future.add_done_callback(report_exceptions) + + return future ThreadPoolExecutor.submit = sentry_submit diff --git a/tests/integrations/concurrent/test_concurrent.py b/tests/integrations/concurrent/test_concurrent.py index 74a0d0bcbc..99a6d6072a 100644 --- a/tests/integrations/concurrent/test_concurrent.py +++ b/tests/integrations/concurrent/test_concurrent.py @@ -2,14 +2,149 @@ from concurrent import futures from concurrent.futures import Future, ThreadPoolExecutor +import pytest + import sentry_sdk from sentry_sdk.integrations.concurrent import ConcurrentIntegration +from sentry_sdk.integrations.dedupe import DedupeIntegration +from sentry_sdk.integrations.excepthook import ExcepthookIntegration +from sentry_sdk.integrations.threading import ThreadingIntegration original_submit = ThreadPoolExecutor.submit original_set_exception = Future.set_exception +@pytest.mark.parametrize("record_exceptions_on_futures", (True, False)) +def test_handles_exceptions(sentry_init, capture_events, record_exceptions_on_futures): + sentry_init( + default_integrations=False, + integrations=[ + ConcurrentIntegration( + record_exceptions_on_futures=record_exceptions_on_futures + ) + ], + ) + events = capture_events() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + with pytest.raises(ZeroDivisionError): + future.result() + + if record_exceptions_on_futures: + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + else: + assert not events + + +# ThreadPoolExecutor uses threading, but catches exceptions before the Sentry threading integration +@pytest.mark.parametrize( + "potentially_conflicting_integrations", + [ + [ThreadingIntegration(propagate_scope=True)], + [ThreadingIntegration(propagate_scope=False)], + [], + ], +) +def test_threading_enabled_no_duplicate( + sentry_init, capture_events, potentially_conflicting_integrations +): + sentry_init( + default_integrations=False, + integrations=[ + ConcurrentIntegration(), + ] + + potentially_conflicting_integrations, + ) + events = capture_events() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + with pytest.raises(ZeroDivisionError): + future.result() + + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + + +def test_concurrent_deduplicates( + sentry_init, capture_events, capture_record_lost_event_calls +): + sentry_init( + default_integrations=False, + integrations=[ + ExcepthookIntegration(), + DedupeIntegration(), + ConcurrentIntegration(record_exceptions_on_futures=True), + ], + ) + events = capture_events() + record_lost_event_calls = capture_record_lost_event_calls() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + try: + future.result() + except Exception: + sentry_sdk.capture_exception() + + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + + (lost_event_call,) = record_lost_event_calls + assert lost_event_call == ("event_processor", "error", None, 1) + + +def test_propagates_tag(sentry_init, capture_events): + sentry_init( + default_integrations=False, + integrations=[ConcurrentIntegration()], + ) + events = capture_events() + + def stage1(): + sentry_sdk.get_isolation_scope().set_tag("stage1", "true") + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(stage2) + with pytest.raises(ZeroDivisionError): + future.result() + + def stage2(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(stage1) + future.result() + + (event,) = events + + (exception,) = event["exception"]["values"] + + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + + assert event["tags"]["stage1"] == "true" + + def test_propagates_threadpool_scope(sentry_init, capture_events): sentry_init( default_integrations=False, @@ -65,6 +200,23 @@ def double(number): assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] +def test_double_patching(sentry_init, capture_events): + sentry_init(integrations=[ConcurrentIntegration()]) + events = capture_events() + + def run(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for _ in range(10): + executor.submit(run) + + assert len(events) == 10 + for event in events: + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + + def test_scope_data_not_leaked_in_executor(sentry_init): sentry_init( integrations=[ConcurrentIntegration()], From 37032c50091707a06d14714531d4c411d9f372ce Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 15:22:05 +0200 Subject: [PATCH 2/3] Fix typing --- sentry_sdk/integrations/concurrent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index a870393395..9a848909b5 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -52,8 +52,8 @@ def wrapped_fn(*args, **kwargs): future = func(self, wrapped_fn, *args, **kwargs) - def report_exceptions(future: Future): - # type: (Future) -> None + def report_exceptions(future): + # type: (Future[Any]) -> None exception = future.exception() integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) From 7be052cf38b7ff8a144fbb72184d4ed52731795d Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 15:54:21 +0200 Subject: [PATCH 3/3] Style in tests --- tests/integrations/concurrent/test_concurrent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/concurrent/test_concurrent.py b/tests/integrations/concurrent/test_concurrent.py index 99a6d6072a..21f2594c5c 100644 --- a/tests/integrations/concurrent/test_concurrent.py +++ b/tests/integrations/concurrent/test_concurrent.py @@ -89,7 +89,7 @@ def test_concurrent_deduplicates( integrations=[ ExcepthookIntegration(), DedupeIntegration(), - ConcurrentIntegration(record_exceptions_on_futures=True), + ConcurrentIntegration(), ], ) events = capture_events()