Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
50 changes: 50 additions & 0 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from __future__ import annotations

import concurrent.futures
import dataclasses
import functools
import inspect
from contextlib import contextmanager
from dataclasses import dataclass
from typing import (
Expand Down Expand Up @@ -292,9 +296,55 @@ async def execute_activity(
},
kind=opentelemetry.trace.SpanKind.SERVER,
):
# Propagate trace_context into synchronous activities running in
# ProcessPoolExecutor
is_async = inspect.iscoroutinefunction(
input.fn
) or inspect.iscoroutinefunction(
input.fn.__call__ # type: ignore
)
is_threadpool_executor = isinstance(
input.executor, concurrent.futures.ThreadPoolExecutor
)
if not (is_async or is_threadpool_executor):
carrier: _CarrierDict = {}
default_text_map_propagator.inject(carrier)
input.fn = ActivityFnWithTraceContext(input.fn, carrier)

return await super().execute_activity(input)


@dataclasses.dataclass
class ActivityFnWithTraceContext:
Copy link
Member

@cretz cretz Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I understand what's happening here. Basically you have to make a picklable top-level class that can go over the process boundary, because unlike asyncio (which copies contextvars implicitly) and threaded (which we do a copy_context for you), there is no implicit context propagation across process boundaries.

This may not be the last time a process pool user (or even thread pool user), wants to provide initialization code for inside the executor instead of outside where the interceptor code runs. Also, there could be an option where one does this at the executor level. So here are the options as I see it:

  1. Provide an additional, optional field on ExecuteActivityInput called initializer: Optional[Callable] that if set we run inside the executor before the actual activity function
  2. Inside the opentelemetry module here, provide some kind of OpenTelemetryProcessPoolExecutor that extends ProcessPoolExecutor and overrides submit to do basically what the interceptor is doing here
  3. Do as this PR does (with slight changes, such as renaming this class to _PicklableActivityWithTraceContext and making it private to clarify its specific use, and adding an opt-out option to return to today's no-propagate behavior)

I can see all three options as reasonable but having their own tradeoffs. I'm leaning towards 1 just so other interceptor implementers can also run code on the other side of the executor without wrapping user functions.

@dandavison or @tconley1428 - any additional thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some discussion, we may not be able to confirm/adjust the design in the near term for priority reasons. But as a workaround in the meantime, you can make your own interceptor that runs after the OTel one that does this "copy span into process", at least until we can confer on this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback on this @cretz. Patching the behaviour with a second interceptor for the time being would be a bit easier for users than extending TracingInterceptor, like I did in my own project. I'll be sure to contribute it back to the samples repo for others to use. I hope to see this fixed here when priorities allow.

Option 1 sounds like the most correct approach to me too, as it avoids manipulating the user's function. But I think both options 1 and 2 would need to apply a sort of "reverse chain of responsibility" pattern, so multiple interceptors can stack together without overriding the previous interceptor's setup/cleanup logic. The pattern in this PR is extensible in this way, but the functools.wraps part would need to be clearly documented, maybe in the custom interceptor guide, if accepted.

"""Wraps an activity function to inject trace context from a carrier.

This wrapper is intended for sync activities executed in a process pool executor
to ensure tracing features like child spans, trace events, and log-correlation
works properly in the user's activity implementation.
"""

fn: Callable[..., Any]
carrier: _CarrierDict

def __post_init__(self):
    """Copy the wrapped function's metadata onto this wrapper instance.

    Keeps attributes such as ``__module__``, ``__name__``, ``__qualname__``
    and ``__doc__`` visible to downstream interceptors that reflect on the
    activity callable (e.g. the ``SentryInterceptor`` in the Python
    samples).
    """
    # ``functools.wraps`` returns a decorator; applying it to ``self``
    # copies the wrapped callable's metadata onto this instance.
    decorate = functools.wraps(self.fn)
    decorate(self)

def __call__(self, *args: Any, **kwargs: Any):  # noqa: D102
    # Re-establish the propagated span context inside this process before
    # running the activity, then restore the previous context afterwards.
    restored = default_text_map_propagator.extract(self.carrier)
    attach_token = opentelemetry.context.attach(restored)
    try:
        return self.fn(*args, **kwargs)
    finally:
        opentelemetry.context.detach(attach_token)


class _InputWithHeaders(Protocol):
headers: Mapping[str, temporalio.api.common.v1.Payload]

Expand Down
Empty file.
77 changes: 77 additions & 0 deletions tests/contrib/opentelemetry/helpers/reflection_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import dataclasses
import logging
import typing

import temporalio.worker

logger = logging.getLogger(__name__)


@dataclasses.dataclass(frozen=True)
class InterceptedActivity:
    """Snapshot of the reflection metadata observed on an activity callable.

    Captured by ``ReflectionInterceptor`` so tests can verify that wrapping
    an activity function preserves introspection attributes such as
    ``__name__`` and ``__module__``.
    """

    class_name: str  # ``input.fn.__class__.__name__`` of the callable
    name: typing.Optional[str]  # ``fn.__name__`` if present, else None
    qualname: typing.Optional[str]  # ``fn.__qualname__`` if present, else None
    module: typing.Optional[str]  # ``fn.__module__`` if present, else None
    annotations: typing.Dict[str, typing.Any]  # ``fn.__annotations__`` (or {})
    docstring: typing.Optional[str]  # ``fn.__doc__`` if present, else None


class ReflectionInterceptor(temporalio.worker.Interceptor):
    """Interceptor to check we haven't broken reflection when wrapping the activity."""

    def __init__(self) -> None:
        # Populated by the inbound interceptor as activities execute.
        self._intercepted_activities: list[InterceptedActivity] = []

    def get_intercepted_activities(self) -> typing.List[InterceptedActivity]:
        """Get the list of intercepted activities."""
        return self._intercepted_activities

    def intercept_activity(
        self, next: temporalio.worker.ActivityInboundInterceptor
    ) -> temporalio.worker.ActivityInboundInterceptor:
        """Wrap the activity chain so reflection metadata is captured.

        Args:
            next: The underlying inbound interceptor this interceptor should
                delegate to.

        Returns:
            The new interceptor that will be used for the activity.
        """
        return _ReflectionActivityInboundInterceptor(next, self)


class _ReflectionActivityInboundInterceptor(
    temporalio.worker.ActivityInboundInterceptor
):
    def __init__(
        self,
        next: temporalio.worker.ActivityInboundInterceptor,
        root: ReflectionInterceptor,
    ) -> None:
        super().__init__(next)
        # Root interceptor that accumulates the reflection snapshots.
        self.root = root

    async def execute_activity(
        self, input: temporalio.worker.ExecuteActivityInput
    ) -> typing.Any:
        """Record the activity callable's reflection metadata, then delegate."""
        fn = input.fn
        try:
            snapshot = InterceptedActivity(
                class_name=fn.__class__.__name__,
                name=getattr(fn, "__name__", None),
                qualname=getattr(fn, "__qualname__", None),
                module=getattr(fn, "__module__", None),
                docstring=getattr(fn, "__doc__", None),
                annotations=getattr(fn, "__annotations__", {}),
            )
        except AttributeError:
            # Best-effort: a callable missing expected attributes is logged,
            # not fatal — the activity itself still runs below.
            logger.exception(
                "Activity function does not have expected attributes, skipping reflection."
            )
        else:
            self.root._intercepted_activities.append(snapshot)

        return await self.next.execute_activity(input)
156 changes: 156 additions & 0 deletions tests/contrib/opentelemetry/helpers/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from __future__ import annotations

import multiprocessing
import multiprocessing.managers
import threading
import typing
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import opentelemetry.trace
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import (
SpanExporter,
SpanExportResult,
)


@dataclass(frozen=True)
class SerialisableSpan:
    """A serialisable, incomplete representation of a span for testing purposes."""

    @dataclass(frozen=True)
    class SpanContext:
        """Picklable subset of an OpenTelemetry span context."""

        trace_id: int
        span_id: int

        @classmethod
        def from_span_context(
            cls, context: opentelemetry.trace.SpanContext
        ) -> "SerialisableSpan.SpanContext":
            """Copy the trace/span IDs out of an SDK span context."""
            return cls(
                trace_id=context.trace_id,
                span_id=context.span_id,
            )

        @classmethod
        def from_optional_span_context(
            cls, context: Optional[opentelemetry.trace.SpanContext]
        ) -> Optional["SerialisableSpan.SpanContext"]:
            """Like :meth:`from_span_context`, but passes ``None`` through."""
            if context is None:
                return None
            return cls.from_span_context(context)

    @dataclass(frozen=True)
    class Link:
        """Picklable subset of a span link."""

        context: SerialisableSpan.SpanContext
        attributes: Dict[str, Any]

    name: str
    context: Optional[SpanContext]
    parent: Optional[SpanContext]
    attributes: Dict[str, Any]
    links: Sequence[Link]

    @classmethod
    def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan":
        """Build a picklable snapshot of *span* (name, contexts, attributes, links)."""
        return cls(
            name=span.name,
            context=cls.SpanContext.from_optional_span_context(span.context),
            parent=cls.SpanContext.from_optional_span_context(span.parent),
            attributes=dict(span.attributes or {}),
            links=tuple(
                cls.Link(
                    context=cls.SpanContext.from_span_context(link.context),
                    # Bug fix: copy each link's OWN attributes; the original
                    # code copied ``span.attributes`` into every link.
                    attributes=dict(link.attributes or {}),
                )
                for link in span.links
            ),
        )


def make_span_proxy_list(
    manager: multiprocessing.managers.SyncManager,
) -> multiprocessing.managers.ListProxy[SerialisableSpan]:
    """Return a manager-backed list proxy for sharing `SerialisableSpan` objects across processes."""
    # Delegates to the manager so the returned proxy is owned by its server
    # process and therefore visible from child processes.
    return manager.list()


class _ListProxySpanExporter(SpanExporter):
    """:class:`SpanExporter` that appends finished spans to a shared list proxy.

    The list proxy comes from a multiprocessing manager, so the parent test
    process can observe spans finished inside worker processes.

    In production, you would use `OTLPSpanExporter` or similar to export spans.
    Tracing is designed to be distributed, the child process can push collected
    spans directly to a collector or backend, which can reassemble the spans
    into a single trace.
    """

    def __init__(
        self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan]
    ) -> None:
        self._finished_spans = finished_spans
        self._stopped = False
        # Serialises concurrent export() calls from the SDK.
        self._lock = threading.Lock()

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        if self._stopped:
            return SpanExportResult.FAILURE
        # Note: ReadableSpan is not picklable, so convert to a DTO.
        # Note: we could use `span.to_json()` but there isn't a `from_json`
        # and the serialisation isn't easily reversible, e.g. `parent` context
        # is lost, span/trace IDs are transformed into strings.
        serialised = [SerialisableSpan.from_readable_span(span) for span in spans]
        with self._lock:
            self._finished_spans.extend(serialised)
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        self._stopped = True

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return True


def dump_spans(
    spans: Iterable[Union[ReadableSpan, SerialisableSpan]],
    *,
    parent_id: Optional[int] = None,
    with_attributes: bool = True,
    indent_depth: int = 0,
) -> List[str]:
    """Render a span tree as an indented, depth-first list of strings.

    Args:
        spans: All spans to render. Any iterable is accepted; it is
            materialised once because both link resolution and the recursive
            walk need to traverse the collection multiple times.
        parent_id: Only spans whose parent span ID equals this are emitted at
            this level; ``None`` selects root spans.
        with_attributes: Include each span's attributes in its line.
        indent_depth: Current nesting level (used internally by recursion).

    Returns:
        One formatted line per span, children indented under their parent.
    """
    # Bug fix: materialise up-front. The previous implementation iterated
    # `spans` several times (inner link-resolution loop and recursive calls),
    # which silently produced wrong results for one-shot iterators/generators.
    span_list = list(spans)
    ret: List[str] = []
    for span in span_list:
        if (not span.parent and parent_id is None) or (
            span.parent and span.parent.span_id == parent_id
        ):
            span_str = f"{' ' * indent_depth}{span.name}"
            if with_attributes:
                span_str += f" (attributes: {dict(span.attributes or {})})"
            # Resolve each link to the name of the span it points at
            if span.links:
                span_links: List[str] = []
                for link in span.links:
                    for link_span in span_list:
                        if (
                            link_span.context
                            and link_span.context.span_id == link.context.span_id
                        ):
                            span_links.append(link_span.name)
                span_str += f" (links: {', '.join(span_links)})"
            # Signals can duplicate in rare situations, so we make sure not to
            # re-add
            if "Signal" in span_str and span_str in ret:
                continue
            ret.append(span_str)
            ret += dump_spans(
                span_list,
                parent_id=(span.context.span_id if span.context else None),
                with_attributes=with_attributes,
                indent_depth=indent_depth + 1,
            )
    return ret
Loading
Loading