Skip to content
31 changes: 30 additions & 1 deletion sentry_sdk/integrations/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sentry_sdk
from sentry_sdk.integrations import Integration
from sentry_sdk.scope import use_isolation_scope, use_scope
from sentry_sdk.utils import event_from_exception

from typing import TYPE_CHECKING

Expand All @@ -17,6 +18,10 @@
class ConcurrentIntegration(Integration):
identifier = "concurrent"

def __init__(self, record_exceptions_on_futures=True):
# type: (bool) -> None
self.record_exceptions_on_futures = record_exceptions_on_futures

@staticmethod
def setup_once():
# type: () -> None
Expand Down Expand Up @@ -45,6 +50,30 @@ def wrapped_fn(*args, **kwargs):
with use_scope(current_scope):
return fn(*args, **kwargs)

return func(self, wrapped_fn, *args, **kwargs)
future = func(self, wrapped_fn, *args, **kwargs)

def report_exceptions(future):
# type: (Future[Any]) -> None
exception = future.exception()
integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration)

if (
exception is None
or integration is None
or not integration.record_exceptions_on_futures
):
return

event, hint = event_from_exception(
exception,
client_options=sentry_sdk.get_client().options,
mechanism={"type": "concurrent", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Exception Callback Fails Context and Cancellation Handling

The report_exceptions callback doesn't use the isolation_scope and current_scope forked at submission time, so captured exception events lack proper context. Also, future.exception() can raise CancelledError if the future was cancelled, causing the callback to fail.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems valid at first sight, I'll see if I can reproduce first-hand before changing


if integration.record_exceptions_on_futures:
future.add_done_callback(report_exceptions)

return future

return sentry_submit
152 changes: 152 additions & 0 deletions tests/integrations/concurrent/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,149 @@
from concurrent import futures
from concurrent.futures import Future, ThreadPoolExecutor

import pytest

import sentry_sdk

from sentry_sdk.integrations.concurrent import ConcurrentIntegration
from sentry_sdk.integrations.dedupe import DedupeIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
from sentry_sdk.integrations.threading import ThreadingIntegration

original_submit = ThreadPoolExecutor.submit
original_set_exception = Future.set_exception


@pytest.mark.parametrize("record_exceptions_on_futures", (True, False))
def test_handles_exceptions(sentry_init, capture_events, record_exceptions_on_futures):
sentry_init(
default_integrations=False,
integrations=[
ConcurrentIntegration(
record_exceptions_on_futures=record_exceptions_on_futures
)
],
)
events = capture_events()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
with pytest.raises(ZeroDivisionError):
future.result()

if record_exceptions_on_futures:
(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]
else:
assert not events


# ThreadPoolExecutor uses threading, but catches exceptions before the Sentry threading integration
@pytest.mark.parametrize(
"potentially_conflicting_integrations",
[
[ThreadingIntegration(propagate_scope=True)],
[ThreadingIntegration(propagate_scope=False)],
[],
],
)
def test_threading_enabled_no_duplicate(
sentry_init, capture_events, potentially_conflicting_integrations
):
sentry_init(
default_integrations=False,
integrations=[
ConcurrentIntegration(),
]
+ potentially_conflicting_integrations,
)
events = capture_events()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
with pytest.raises(ZeroDivisionError):
future.result()

(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]


def test_concurrent_deduplicates(
sentry_init, capture_events, capture_record_lost_event_calls
):
sentry_init(
default_integrations=False,
integrations=[
ExcepthookIntegration(),
DedupeIntegration(),
ConcurrentIntegration(),
],
)
events = capture_events()
record_lost_event_calls = capture_record_lost_event_calls()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
try:
future.result()
except Exception:
sentry_sdk.capture_exception()

(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"

(lost_event_call,) = record_lost_event_calls
assert lost_event_call == ("event_processor", "error", None, 1)


def test_propagates_tag(sentry_init, capture_events):
sentry_init(
default_integrations=False,
integrations=[ConcurrentIntegration()],
)
events = capture_events()

def stage1():
sentry_sdk.get_isolation_scope().set_tag("stage1", "true")
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(stage2)
with pytest.raises(ZeroDivisionError):
future.result()

def stage2():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(stage1)
future.result()

(event,) = events

(exception,) = event["exception"]["values"]

assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]

assert event["tags"]["stage1"] == "true"


def test_propagates_threadpool_scope(sentry_init, capture_events):
sentry_init(
default_integrations=False,
Expand Down Expand Up @@ -65,6 +200,23 @@ def double(number):
assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"]


def test_double_patching(sentry_init, capture_events):
sentry_init(integrations=[ConcurrentIntegration()])
events = capture_events()

def run():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
for _ in range(10):
executor.submit(run)

assert len(events) == 10
for event in events:
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"


def test_scope_data_not_leaked_in_executor(sentry_init):
sentry_init(
integrations=[ConcurrentIntegration()],
Expand Down