fix(opentelemetry): trace context propagation in process-pool workers #1017
Status: Open

gregbrowndev wants to merge 7 commits into temporalio:main from gregbrowndev:fix/opentelemetry-trace-context-propagation
Commits (7, all by gregbrowndev):

- e08802e fix(opentelemetry): trace propagation in process pool activities
- 98ed492 wip: add impl and try to make test pass
- 3d2596e wip: working test case
- 6d703c0 wip: refactor test helpers into separate modules
- c9b4b13 wip: test reflection isn't broken
- a203bf7 Merge remote-tracking branch 'upstream/main' into fix/opentelemetry-t…
- acd0bb8 wip: fix failing test on Python 3.9
New file (+7 lines):

```json
{
    "python.testing.pytestArgs": [
        "tests"
    ],
    "python.testing.unittestEnabled": false,
    "python.testing.pytestEnabled": true
}
```
Empty file.
tests/contrib/opentelemetry/helpers/reflection_interceptor.py (+77 lines):
```python
import dataclasses
import logging
import typing

import temporalio.worker

logger = logging.getLogger(__name__)


@dataclasses.dataclass(frozen=True)
class InterceptedActivity:
    class_name: str
    name: typing.Optional[str]
    qualname: typing.Optional[str]
    module: typing.Optional[str]
    annotations: typing.Dict[str, typing.Any]
    docstring: typing.Optional[str]


class ReflectionInterceptor(temporalio.worker.Interceptor):
    """Interceptor to check we haven't broken reflection when wrapping the activity."""

    def __init__(self) -> None:
        self._intercepted_activities: typing.List[InterceptedActivity] = []

    def get_intercepted_activities(self) -> typing.List[InterceptedActivity]:
        """Get the list of intercepted activities."""
        return self._intercepted_activities

    def intercept_activity(
        self, next: temporalio.worker.ActivityInboundInterceptor
    ) -> temporalio.worker.ActivityInboundInterceptor:
        """Method called for intercepting an activity.

        Args:
            next: The underlying inbound interceptor this interceptor should
                delegate to.

        Returns:
            The new interceptor that will be used for the activity.
        """
        return _ReflectionActivityInboundInterceptor(next, self)


class _ReflectionActivityInboundInterceptor(
    temporalio.worker.ActivityInboundInterceptor
):
    def __init__(
        self,
        next: temporalio.worker.ActivityInboundInterceptor,
        root: ReflectionInterceptor,
    ) -> None:
        super().__init__(next)
        self.root = root

    async def execute_activity(
        self, input: temporalio.worker.ExecuteActivityInput
    ) -> typing.Any:
        """Called to invoke the activity."""
        try:
            self.root._intercepted_activities.append(
                InterceptedActivity(
                    class_name=input.fn.__class__.__name__,
                    name=getattr(input.fn, "__name__", None),
                    qualname=getattr(input.fn, "__qualname__", None),
                    module=getattr(input.fn, "__module__", None),
                    docstring=getattr(input.fn, "__doc__", None),
                    annotations=getattr(input.fn, "__annotations__", {}),
                )
            )
        except AttributeError:
            logger.exception(
                "Activity function does not have expected attributes, skipping reflection."
            )

        return await self.next.execute_activity(input)
```
New file (+156 lines):
```python
from __future__ import annotations

import multiprocessing
import multiprocessing.managers
import threading
import typing
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import opentelemetry.trace
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import (
    SpanExporter,
    SpanExportResult,
)


@dataclass(frozen=True)
class SerialisableSpan:
    """A serialisable, incomplete representation of a span for testing purposes."""

    @dataclass(frozen=True)
    class SpanContext:
        trace_id: int
        span_id: int

        @classmethod
        def from_span_context(
            cls, context: opentelemetry.trace.SpanContext
        ) -> "SerialisableSpan.SpanContext":
            return cls(
                trace_id=context.trace_id,
                span_id=context.span_id,
            )

        @classmethod
        def from_optional_span_context(
            cls, context: Optional[opentelemetry.trace.SpanContext]
        ) -> Optional["SerialisableSpan.SpanContext"]:
            if context is None:
                return None
            return cls.from_span_context(context)

    @dataclass(frozen=True)
    class Link:
        context: SerialisableSpan.SpanContext
        attributes: Dict[str, Any]

    name: str
    context: Optional[SpanContext]
    parent: Optional[SpanContext]
    attributes: Dict[str, Any]
    links: Sequence[Link]

    @classmethod
    def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan":
        return cls(
            name=span.name,
            context=cls.SpanContext.from_optional_span_context(span.context),
            parent=cls.SpanContext.from_optional_span_context(span.parent),
            attributes=dict(span.attributes or {}),
            links=tuple(
                cls.Link(
                    context=cls.SpanContext.from_span_context(link.context),
                    # Copy the link's own attributes, not the span's
                    attributes=dict(link.attributes or {}),
                )
                for link in span.links
            ),
        )


def make_span_proxy_list(
    manager: multiprocessing.managers.SyncManager,
) -> multiprocessing.managers.ListProxy[SerialisableSpan]:
    """Create a list proxy to share `SerialisableSpan` across processes."""
    return manager.list()


class _ListProxySpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that exports spans to a
    list proxy created by a multiprocessing manager.

    This class is used for testing multiprocessing setups, as we can get access
    to the finished spans from the parent process.

    In production, you would use `OTLPSpanExporter` or similar to export spans.
    Tracing is designed to be distributed: the child process can push collected
    spans directly to a collector or backend, which can reassemble the spans
    into a single trace.
    """

    def __init__(
        self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan]
    ) -> None:
        self._finished_spans = finished_spans
        self._stopped = False
        self._lock = threading.Lock()

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        if self._stopped:
            return SpanExportResult.FAILURE
        with self._lock:
            # Note: ReadableSpan is not picklable, so convert to a DTO.
            # Note: we could use `span.to_json()` but there isn't a `from_json`
            # and the serialisation isn't easily reversible, e.g. `parent` context
            # is lost, span/trace IDs are transformed into strings.
            self._finished_spans.extend(
                [SerialisableSpan.from_readable_span(span) for span in spans]
            )
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        self._stopped = True

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return True


def dump_spans(
    spans: Iterable[Union[ReadableSpan, SerialisableSpan]],
    *,
    parent_id: Optional[int] = None,
    with_attributes: bool = True,
    indent_depth: int = 0,
) -> List[str]:
    # Materialise up front so the iterable can be traversed more than once
    spans = list(spans)
    ret: List[str] = []
    for span in spans:
        if (not span.parent and parent_id is None) or (
            span.parent and span.parent.span_id == parent_id
        ):
            span_str = f"{'  ' * indent_depth}{span.name}"
            if with_attributes:
                span_str += f" (attributes: {dict(span.attributes or {})})"
            # Add links
            if span.links:
                span_links: List[str] = []
                for link in span.links:
                    for link_span in spans:
                        if (
                            link_span.context
                            and link_span.context.span_id == link.context.span_id
                        ):
                            span_links.append(link_span.name)
                span_str += f" (links: {', '.join(span_links)})"
            # Signals can duplicate in rare situations, so we make sure not to
            # re-add
            if "Signal" in span_str and span_str in ret:
                continue
            ret.append(span_str)
            ret += dump_spans(
                spans,
                parent_id=(span.context.span_id if span.context else None),
                with_attributes=with_attributes,
                indent_depth=indent_depth + 1,
            )
    return ret
```
So I understand what's happening here. Basically you have to make a picklable top-level class that can go over the process boundary, because unlike asyncio (which copies `contextvars` implicitly) and threaded activities (where we do a `copy_context` for you), there is no implicit context propagation across process boundaries.

This may not be the last time a process pool user (or even a thread pool user) wants to provide initialization code that runs inside the executor instead of outside, where the interceptor code runs. Also, there could be an option where one does this at the executor level. So here are the options as I see them:

1. Add a new field on `ExecuteActivityInput` called `initializer: Optional[Callable]` that, if set, we run inside the executor before the actual activity function.
2. Instead of the change to the `opentelemetry` module here, provide some kind of `OpenTelemetryProcessPoolExecutor` that extends `ProcessPoolExecutor` and overrides `submit` to do basically what the interceptor is doing here.
3. Accept roughly what is here (though renaming `_PicklableActivityWithTraceContext` to clarify its specific use and keep it private, and adding an opt-out option to return to today's no-propagate behavior).

I can see all three options as reasonable, but each has its own tradeoffs. I'm leaning towards 1, just so other interceptor implementers can also run code on the other side of the executor without wrapping user functions.
@dandavison or @tconley1428 - any additional thoughts?
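For readers following along, the crux is that `contextvars` (which OTel context rides on) are copied into asyncio tasks implicitly and can be copied into threads via `copy_context`, but never cross a process boundary on their own; the context has to be snapshotted into something picklable and re-attached on the worker side. A minimal stdlib-only sketch of that pattern, with hypothetical names (`PicklableFnWithContext`, `current_trace`) and a plain string standing in for a real propagator-injected trace carrier:

```python
import contextvars
from concurrent.futures import ProcessPoolExecutor

# Context local to each process; in real code this would be OTel's context
current_trace = contextvars.ContextVar("current_trace", default=None)


class PicklableFnWithContext:
    """Wrap a function together with an explicit, picklable context snapshot.

    Hypothetical stand-in for the PR's wrapper class: a plain string
    replaces the OTel carrier for illustration.
    """

    def __init__(self, fn, carrier):
        self.fn = fn
        self.carrier = carrier  # must be picklable to cross the boundary

    def __call__(self, *args, **kwargs):
        # Runs in the worker process: re-attach the context around the call
        token = current_trace.set(self.carrier)
        try:
            return self.fn(*args, **kwargs)
        finally:
            current_trace.reset(token)


def activity():
    # Worker-side function that depends on ambient context
    return current_trace.get()


if __name__ == "__main__":
    current_trace.set("00-abc-def-01")
    # Submitting `activity` directly would lose the context under the
    # spawn start method; the wrapper carries it across explicitly.
    wrapped = PicklableFnWithContext(activity, current_trace.get())
    with ProcessPoolExecutor(max_workers=1) as pool:
        print(pool.submit(wrapped).result())  # prints 00-abc-def-01
```

The same shape works whether the re-attach step is an interceptor-applied wrapper (option 3), an `initializer` field run inside the executor (option 1), or an overridden `submit` (option 2).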
After some discussion, we may not be able to confirm/adjust the design in the near term for priority reasons. But as a workaround in the meantime, you can make your own interceptor that runs after the OTel one that does this "copy span into process", at least until we can confer on this PR.
Thanks for the feedback on this @cretz. Patching the behaviour with a second interceptor for the time being would be a bit easier for users than extending `TracingInterceptor`, like I did in my own project. I'll be sure to contribute it back to the samples repo for others to use. I hope to see this fixed here when priorities allow.

Option 1 sounds like the most correct approach to me too, as it avoids manipulating the user's function. But I think both options 1 and 2 would need to apply a sort of "reverse chain of responsibility" pattern, so multiple interceptors can stack together without overriding the previous interceptor's setup/cleanup logic. The pattern in this PR is extensible in this way, but the `functools.wraps` part would need to be clearly documented, maybe in the custom interceptor guide, if accepted.
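The `functools.wraps` concern ties back to the `ReflectionInterceptor` checks above: wrapping the user's activity without it would make `__name__`, `__qualname__`, `__doc__`, and `__annotations__` report the wrapper's metadata instead of the activity's. A small stdlib sketch (`with_trace_context` and `greet` are hypothetical names, not part of this PR):

```python
import functools


def with_trace_context(fn):
    """Wrap a function while preserving its reflection metadata."""

    @functools.wraps(fn)  # copies __name__, __qualname__, __doc__, __annotations__
    def wrapper(*args, **kwargs):
        # ... re-attach the propagated trace context here ...
        return fn(*args, **kwargs)

    return wrapper


@with_trace_context
def greet(name: str) -> str:
    """Say hello."""
    return f"Hello, {name}!"


print(greet.__name__)  # greet, not wrapper
print(greet.__doc__)   # Say hello.
print(greet("Temporal"))  # Hello, Temporal!
```

Without the `@functools.wraps(fn)` line, `greet.__name__` would report `wrapper` and the docstring and annotations would be lost, which is exactly what the reflection test guards against.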