-
Notifications
You must be signed in to change notification settings - Fork 126
Description
Hello,
What are you really trying to do?
I want to see proper OTEL metadata (spans and traces) propagated from the client invocation site all the way through workflows and activities, so I can visualize the full path in something like Datalust Seq.
Describe the bug
Propagation works correctly for most workflow client operations, but fails for `execute_update_with_start_workflow` (and likely also for `start_update_with_start_workflow`).
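From inspecting the SDK codebase, it looks like `start_update_with_start_workflow` was not added to `_TracingClientOutboundInterceptor` after this PR. As a result, update-with-start calls fall through to the default (non-tracing) implementation, and no trace context is attached to them.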
When I add it manually (see `_PatchedTracingClientOutboundInterceptor` in the reproduction below), propagation works as expected.
Expected: OTEL trace and span context from the client are propagated through the workflow update handler and into an activity.
Actual: Trace and span context are missing inside the activity.
Minimal Reproduction
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "temporalio[opentelemetry]>=1.16.0",
# ]
# ///
import asyncio
from typing import Any
from datetime import timedelta
import temporalio
import opentelemetry
from temporalio import activity, workflow
from temporalio.client import WithStartWorkflowOperation
from temporalio.worker import Worker
from temporalio.contrib.opentelemetry import TracingInterceptor, _TracingClientOutboundInterceptor
from temporalio.common import WorkflowIDConflictPolicy
from temporalio.testing import WorkflowEnvironment
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# ---------- OpenTelemetry setup ----------
# Install an SDK tracer provider as the process-wide OpenTelemetry provider.
provider = TracerProvider()
trace.set_tracer_provider(provider)
# Hand every finished span to a console exporter.
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
# Tracer used by main() to open the client-side "invocation-site-span".
tracer = trace.get_tracer(__name__)
@activity.defn
async def get_trace_id() -> int:
    """Return the trace ID of the OpenTelemetry span current inside this activity.

    main() compares this value with the trace ID of its client-side span to
    decide whether trace context was propagated end to end.
    """
    current_span = trace.get_current_span()
    span_context = current_span.get_span_context()
    return span_context.trace_id
# ---------- Workflow ----------
@workflow.defn
class DemoWorkflow:
    """Workflow that stays open until its single update has been handled.

    ``run`` blocks on an event. The ``read_trace_id_from_activity`` update
    executes ``get_trace_id`` as an activity, returns its result to the
    caller, and then releases ``run`` so the workflow can complete.
    """

    @workflow.init
    def __init__(self) -> None:
        # Set by the update handler once it has finished (success or failure).
        self._done = asyncio.Event()

    @workflow.run
    async def run(self) -> None:
        """Wait until the update handler signals that it is done."""
        await self._done.wait()

    @workflow.update
    async def read_trace_id_from_activity(self) -> int:
        """Run ``get_trace_id`` as an activity and return the trace ID it saw."""
        try:
            observed_trace_id: int = await workflow.execute_activity(
                get_trace_id,
                start_to_close_timeout=timedelta(seconds=5),
            )
        finally:
            # Release run() whether or not the activity succeeded.
            self._done.set()
        return observed_trace_id
class _PatchedTracingClientOutboundInterceptor(_TracingClientOutboundInterceptor):
    """Client-side tracing interceptor that also traces update-with-start.

    Proposed fix for the missing ``start_update_with_start_workflow`` hook in
    ``_TracingClientOutboundInterceptor``. In this reproduction,
    ``execute_update_with_start_workflow`` is fixed by overriding only this
    hook, so it covers both client entry points.
    """

    async def start_update_with_start_workflow(
        self, input: temporalio.client.StartWorkflowUpdateWithStartInput
    ) -> temporalio.client.WorkflowUpdateHandle[Any]:
        """Open a CLIENT span around update-with-start and propagate its context.

        The span's carrier is injected into the start-workflow input's headers
        (via ``input=``). It is then copied onto the update input's headers,
        so both halves of the request carry the client trace context.
        """
        attributes: dict[str, str] = {
            "temporalWorkflowID": input.start_workflow_input.id,
        }
        # NOTE(review): assumes the update input exposes an optional
        # ``update_id`` field; confirm against the installed temporalio.
        update_id = input.update_workflow_input.update_id
        if update_id is not None:
            attributes["temporalUpdateID"] = update_id

        with self.root._start_as_current_span(
            f"StartUpdateWithStartWorkflow:{input.start_workflow_input.workflow}",
            attributes=attributes,
            input=input.start_workflow_input,
            kind=trace.SpanKind.CLIENT,
        ):
            # Only the start-workflow input was handed to the span helper, so
            # only its headers carry the trace data. Copy that header onto the
            # update input as well, so the workflow-side update handling sees
            # the same client context.
            # NOTE(review): relies on the helper storing the carrier under
            # ``self.root.header_key``; confirm against the SDK source.
            trace_header = input.start_workflow_input.headers.get(
                self.root.header_key
            )
            if trace_header is not None:
                input.update_workflow_input.headers = {
                    **input.update_workflow_input.headers,
                    self.root.header_key: trace_header,
                }
            return await super().start_update_with_start_workflow(input)
class _PatchedTracingInterceptor(TracingInterceptor):
    """``TracingInterceptor`` whose client side uses the patched outbound interceptor."""

    def intercept_client(
        self, next: temporalio.client.OutboundInterceptor
    ) -> temporalio.client.OutboundInterceptor:
        """Wrap *next* in ``_PatchedTracingClientOutboundInterceptor`` rooted at ``self``."""
        patched = _PatchedTracingClientOutboundInterceptor(next, self)
        return patched
# ---------- Main ----------
async def main() -> None:
    """Reproduce the trace-propagation gap for update-with-start.

    Starts a local Temporal dev server with the tracing interceptor installed
    and runs a worker for ``DemoWorkflow``. It then calls
    ``execute_update_with_start_workflow`` inside a client-side span and
    checks that the activity observed the same trace ID as the client.

    Raises:
        AssertionError: if the activity's trace ID differs from the client's.
    """
    async with await WorkflowEnvironment.start_local(
        # Toggle between these two lines to compare the patched interceptor
        # with the stock TracingInterceptor shipped by the SDK.
        interceptors=[_PatchedTracingInterceptor()],
        # interceptors=[TracingInterceptor()],
    ) as env:
        queue_name = "some-queue"
        async with Worker(
            env.client,
            task_queue=queue_name,
            workflows=[DemoWorkflow],
            activities=[get_trace_id],
        ):
            with tracer.start_as_current_span("invocation-site-span") as span:
                invocation_trace_id = span.get_span_context().trace_id
                activity_trace_id = await env.client.execute_update_with_start_workflow(
                    DemoWorkflow.read_trace_id_from_activity,
                    start_workflow_operation=WithStartWorkflowOperation(
                        DemoWorkflow.run,
                        id="otel-update-workflow-id-2",
                        task_queue=queue_name,
                        id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
                    ),
                )
                # Explicit check instead of ``assert``: it still runs under
                # ``python -O`` and reports both IDs when propagation breaks.
                if activity_trace_id != invocation_trace_id:
                    raise AssertionError(
                        "trace context was not propagated to the activity: "
                        f"client trace_id={invocation_trace_id:032x}, "
                        f"activity trace_id={activity_trace_id:032x}"
                    )
if __name__ == "__main__":
    # Run the reproduction on a fresh event loop when executed as a script.
    asyncio.run(main())
Additional context
I would appreciate it if someone more familiar with the codebase could sanity-check this assessment and my fix. If both are correct, I would be glad to open a PR with the fix.