Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import concurrent.futures
import dataclasses
import logging
import os
import sys
Expand Down Expand Up @@ -33,6 +34,7 @@
import temporalio.converter
import temporalio.exceptions
import temporalio.workflow
from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner

from . import _command_aware_visitor
from ._interceptor import (
Expand Down Expand Up @@ -82,6 +84,9 @@ def __init__(
assert_local_activity_valid: Callable[[str], None],
encode_headers: bool,
) -> None:
# Debug mode is enabled if specified or if the TEMPORAL_DEBUG env var is truthy
debug_mode = debug_mode or bool(os.environ.get("TEMPORAL_DEBUG"))

self._bridge_worker = bridge_worker
self._namespace = namespace
self._task_queue = task_queue
Expand All @@ -93,7 +98,19 @@ def __init__(
)
)
self._workflow_task_executor_user_provided = workflow_task_executor is not None

# If debug mode is enabled, ensure that the debugpy (https://github.com/microsoft/debugpy)
# import is added as a passthrough
if debug_mode and isinstance(workflow_runner, SandboxedWorkflowRunner):
workflow_runner = dataclasses.replace(
workflow_runner,
restrictions=workflow_runner.restrictions.with_passthrough_modules(
"_pydevd_bundle"
),
)

self._workflow_runner = workflow_runner

self._unsandboxed_workflow_runner = unsandboxed_workflow_runner
self._data_converter = data_converter
# Build the interceptor classes and collect extern functions
Expand Down Expand Up @@ -121,11 +138,9 @@ def __init__(
self._encode_headers = encode_headers
self._throw_after_activation: Exception | None = None

# If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
# deadlock detection, otherwise set to 2 seconds
self._deadlock_timeout_seconds = (
None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2
)
# If debug mode is enabled, disable deadlock detection
# otherwise set to 2 seconds
self._deadlock_timeout_seconds = None if debug_mode else 2

# Keep track of workflows that could not be evicted
self._could_not_evict_count = 0
Expand Down
57 changes: 57 additions & 0 deletions tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import concurrent.futures
import multiprocessing
import multiprocessing.context
import os
import uuid
from collections.abc import Awaitable, Callable, Sequence
from contextlib import contextmanager
from datetime import timedelta
from typing import Any, Optional
from urllib.request import urlopen
Expand Down Expand Up @@ -59,6 +61,7 @@
WorkerTuner,
WorkflowSlotInfo,
)
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent
from tests.helpers import (
assert_eventually,
Expand Down Expand Up @@ -1264,3 +1267,57 @@ def test_fork_use_worker(
nexus_service_handlers=[],
)
self.run(mp_fork_ctx)


async def test_worker_debug_mode(client: Client):
worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds == 2
assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner)
assert (
"_pydevd_bundle"
not in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)

worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
debug_mode=True,
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds is None
assert isinstance(worker._workflow_worker._workflow_runner, SandboxedWorkflowRunner)
assert (
"_pydevd_bundle"
in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)

@contextmanager
def debug_envvar():
os.environ["TEMPORAL_DEBUG"] = "true"
try:
yield
finally:
os.environ.pop("TEMPORAL_DEBUG")

with debug_envvar():
worker = Worker(
client,
workflows=[SimpleWorkflow],
task_queue=f"task-queue-{uuid.uuid4()}",
)
assert worker._workflow_worker
assert worker._workflow_worker._deadlock_timeout_seconds is None
assert isinstance(
worker._workflow_worker._workflow_runner,
SandboxedWorkflowRunner,
)
assert (
"_pydevd_bundle"
in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
)
Loading