diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 09c429c30..36044fb26 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -4,6 +4,7 @@ import asyncio import concurrent.futures +import dataclasses import logging import os import sys @@ -33,6 +34,7 @@ import temporalio.converter import temporalio.exceptions import temporalio.workflow +from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner from . import _command_aware_visitor from ._interceptor import ( @@ -82,6 +84,9 @@ def __init__( assert_local_activity_valid: Callable[[str], None], encode_headers: bool, ) -> None: + # Debug mode is enabled if specified or if the TEMPORAL_DEBUG env var is truthy + debug_mode = debug_mode or bool(os.environ.get("TEMPORAL_DEBUG")) + self._bridge_worker = bridge_worker self._namespace = namespace self._task_queue = task_queue @@ -93,7 +98,19 @@ def __init__( ) ) self._workflow_task_executor_user_provided = workflow_task_executor is not None + + # If debug mode is enabled, ensure that the debugpy (https://github.com/microsoft/debugpy) + # import is added as a passthrough + if debug_mode and isinstance(workflow_runner, SandboxedWorkflowRunner): + workflow_runner = dataclasses.replace( + workflow_runner, + restrictions=workflow_runner.restrictions.with_passthrough_modules( + "_pydevd_bundle" + ), + ) + self._workflow_runner = workflow_runner + self._unsandboxed_workflow_runner = unsandboxed_workflow_runner self._data_converter = data_converter # Build the interceptor classes and collect extern functions @@ -121,11 +138,9 @@ def __init__( self._encode_headers = encode_headers self._throw_after_activation: Exception | None = None - # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable - # deadlock detection, otherwise set to 2 seconds - self._deadlock_timeout_seconds = ( - None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2 - ) + # If debug mode is enabled, disable deadlock detection + # otherwise set to 2 seconds + self._deadlock_timeout_seconds = None if debug_mode else 2 # Keep track of workflows that could not be evicted self._could_not_evict_count = 0 diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 9779bc633..4b3a921a5 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -4,8 +4,10 @@ import concurrent.futures import multiprocessing import multiprocessing.context +import os import uuid from collections.abc import Awaitable, Callable, Sequence +from contextlib import contextmanager from datetime import timedelta from typing import Any, Optional from urllib.request import urlopen @@ -59,6 +61,7 @@ WorkerTuner, WorkflowSlotInfo, ) +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent from tests.helpers import ( assert_eventually, @@ -1264,3 +1267,57 @@ def test_fork_use_worker( nexus_service_handlers=[], ) self.run(mp_fork_ctx) + + +async def test_worker_debug_mode(client: Client): + worker = Worker( + client, + workflows=[SimpleWorkflow], + task_queue=f"task-queue-{uuid.uuid4()}", + ) + assert worker._workflow_worker + assert worker._workflow_worker._deadlock_timeout_seconds == 2 + assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner) + assert ( + "_pydevd_bundle" + not in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules + ) + + worker = Worker( + client, + workflows=[SimpleWorkflow], + task_queue=f"task-queue-{uuid.uuid4()}", + debug_mode=True, + ) + assert worker._workflow_worker + assert worker._workflow_worker._deadlock_timeout_seconds is None + assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner) + assert ( + "_pydevd_bundle" + in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules + ) + + @contextmanager + def debug_envvar(): + os.environ["TEMPORAL_DEBUG"] = "true" + try: + yield + finally: + os.environ.pop("TEMPORAL_DEBUG") + + with debug_envvar(): + worker = Worker( + client, + workflows=[SimpleWorkflow], + task_queue=f"task-queue-{uuid.uuid4()}", + ) + assert worker._workflow_worker + assert worker._workflow_worker._deadlock_timeout_seconds is None + assert isinstance( + worker._workflow_worker._workflow_runner, + SandboxedWorkflowRunner, + ) + assert ( + "_pydevd_bundle" + in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules + )