Skip to content

Commit 298cdd9

Browse files
committed
PR feedback, fix order of args, remove heartbeat=0 check
1 parent 3255ffd commit 298cdd9

File tree

7 files changed

+65
-13
lines changed

7 files changed

+65
-13
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,7 @@ impl WorkerRef {
675675
}
676676

677677
fn replace_client(&self, client: &client::ClientRef) -> PyResult<()> {
678+
enter_sync!(self.runtime);
678679
self.worker
679680
.as_ref()
680681
.expect("missing worker")

temporalio/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,10 @@ def __init__(
242242
service_client=service_client,
243243
namespace=namespace,
244244
data_converter=data_converter,
245+
plugins=plugins,
245246
interceptors=interceptors,
246247
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
247248
header_codec_behavior=header_codec_behavior,
248-
plugins=plugins,
249249
)
250250

251251
for plugin in plugins:
@@ -1560,12 +1560,12 @@ class ClientConfig(TypedDict, total=False):
15601560
service_client: Required[temporalio.service.ServiceClient]
15611561
namespace: Required[str]
15621562
data_converter: Required[temporalio.converter.DataConverter]
1563+
plugins: Required[Sequence[Plugin]]
15631564
interceptors: Required[Sequence[Interceptor]]
15641565
default_workflow_query_reject_condition: Required[
15651566
Optional[temporalio.common.QueryRejectCondition]
15661567
]
15671568
header_codec_behavior: Required[HeaderCodecBehavior]
1568-
plugins: Required[Sequence[Plugin]]
15691569

15701570

15711571
class WorkflowHistoryEventFilterType(IntEnum):

temporalio/runtime.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def __init__(
131131
telemetry: Telemetry configuration when not supplying
132132
``runtime_options``.
133133
worker_heartbeat_interval: Interval for worker heartbeats. ``None``
134-
disables heartbeating.
134+
disables heartbeating. Interval must be between 1s and 60s.
135135
136136
Raises:
137137
ValueError: If both ```runtime_options`` is a negative value.
@@ -142,10 +142,6 @@ def __init__(
142142
if worker_heartbeat_interval <= timedelta(0):
143143
raise ValueError("worker_heartbeat_interval must be positive")
144144
heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000)
145-
if heartbeat_millis == 0:
146-
heartbeat_millis = 1
147-
148-
self._heartbeat_millis = heartbeat_millis
149145

150146
runtime_options = temporalio.bridge.runtime.RuntimeOptions(
151147
telemetry=telemetry._to_bridge_config(),

temporalio/worker/_replayer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,12 @@ def on_eviction_hook(
273273
),
274274
),
275275
nonsticky_to_sticky_poll_ratio=1,
276+
no_remote_activities=True,
276277
task_types=temporalio.bridge.worker.WorkerTaskTypes(
277-
True, False, False, False
278+
enable_workflows=True,
279+
enable_local_activities=False,
280+
enable_remote_activities=False,
281+
enable_nexus=False,
278282
),
279283
sticky_queue_schedule_to_start_timeout_millis=1000,
280284
max_heartbeat_throttle_interval_millis=1000,

temporalio/worker/_worker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,8 @@ def check_activity(activity):
583583
or not config["activities"],
584584
task_types=temporalio.bridge.worker.WorkerTaskTypes(
585585
enable_workflows=self._workflow_worker is not None,
586-
enable_local_activities=self._activity_worker is not None,
586+
enable_local_activities=self._activity_worker is not None
587+
and self._workflow_worker is not None,
587588
enable_remote_activities=self._activity_worker is not None
588589
and not config["no_remote_activities"],
589590
enable_nexus=self._nexus_worker is not None,
@@ -885,6 +886,7 @@ class WorkerConfig(TypedDict, total=False):
885886
nexus_task_executor: Optional[concurrent.futures.Executor]
886887
workflow_runner: WorkflowRunner
887888
unsandboxed_workflow_runner: WorkflowRunner
889+
plugins: Sequence[Plugin]
888890
interceptors: Sequence[Interceptor]
889891
build_id: Optional[str]
890892
identity: Optional[str]
@@ -915,7 +917,6 @@ class WorkerConfig(TypedDict, total=False):
915917
workflow_task_poller_behavior: PollerBehavior
916918
activity_task_poller_behavior: PollerBehavior
917919
nexus_task_poller_behavior: PollerBehavior
918-
plugins: Sequence[Plugin]
919920

920921

921922
def _warn_if_activity_executor_max_workers_is_inconsistent(

tests/contrib/openai_agents/test_openai.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2615,8 +2615,7 @@ async def test_split_workers(client: Client):
26152615

26162616
# Workflow worker
26172617
async with new_worker(
2618-
workflow_client,
2619-
HelloWorldAgent,
2618+
workflow_client, HelloWorldAgent, no_remote_activities=True
26202619
) as worker:
26212620
activity_plugin = openai_agents.OpenAIAgentsPlugin(
26222621
model_params=ModelActivityParameters(

tests/worker/test_workflow.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5210,7 +5210,58 @@ async def run(self) -> None:
52105210
await asyncio.sleep(0.1)
52115211

52125212

5213-
async def test_workflow_replace_worker_client(client: Client):
5213+
async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment):
5214+
if env.supports_time_skipping:
5215+
pytest.skip("Only testing against two real servers")
5216+
# We are going to start a second ephemeral server and then replace the
5217+
# client. So we will start a no-cache ticking workflow with the current
5218+
# client and confirm it has accomplished at least one task. Then we will
5219+
# start another on the other client, and confirm it gets started too. Then
5220+
# we will terminate both. We have to use a ticking workflow with only one
5221+
# poller to force a quick re-poll to recognize our client change quickly (as
5222+
# opposed to just waiting the minute for poll timeout).
5223+
async with await WorkflowEnvironment.start_local(
5224+
runtime=client.service_client.config.runtime,
5225+
dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION,
5226+
) as other_env:
5227+
# Start both workflows on different servers
5228+
task_queue = f"tq-{uuid.uuid4()}"
5229+
handle1 = await client.start_workflow(
5230+
TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
5231+
)
5232+
handle2 = await other_env.client.start_workflow(
5233+
TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
5234+
)
5235+
5236+
async def any_task_completed(handle: WorkflowHandle) -> bool:
5237+
async for e in handle.fetch_history_events():
5238+
if e.HasField("workflow_task_completed_event_attributes"):
5239+
return True
5240+
return False
5241+
5242+
# Now start the worker on the first env
5243+
async with Worker(
5244+
client,
5245+
task_queue=task_queue,
5246+
workflows=[TickingWorkflow],
5247+
max_cached_workflows=0,
5248+
max_concurrent_workflow_task_polls=1,
5249+
) as worker:
5250+
# Confirm the first ticking workflow has completed a task but not
5251+
# the second
5252+
await assert_eq_eventually(True, lambda: any_task_completed(handle1))
5253+
assert not await any_task_completed(handle2)
5254+
# Now replace the client, which should be used fairly quickly
5255+
# because we should have timer-done poll completions every 100ms
5256+
worker.client = other_env.client
5257+
# Now confirm the other workflow has started
5258+
await assert_eq_eventually(True, lambda: any_task_completed(handle2))
5259+
# Terminate both
5260+
await handle1.terminate()
5261+
await handle2.terminate()
5262+
5263+
5264+
async def test_workflow_replace_worker_client_diff_runtimes_fail(client: Client):
52145265
other_runtime = Runtime(telemetry=TelemetryConfig())
52155266
other_client = await Client.connect(
52165267
client.service_client.config.target_host,

0 commit comments

Comments
 (0)