Skip to content

Commit 167476a

Browse files
committed
Partial revert
1 parent a6b3d4d commit 167476a

File tree

1 file changed

+22
-11
lines changed

1 file changed

+22
-11
lines changed

temporalio/worker/_workflow.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -255,19 +255,36 @@ async def _handle_activation(
255255
)
256256
completion.successful.SetInParent()
257257
try:
258+
# Decode the activation if there's a codec and not cache remove job
259+
258260
if LOG_PROTOS:
259261
logger.debug("Received workflow activation:\n%s", act)
260262

261263
# If the workflow is not running yet, create it
262264
workflow = self._running_workflows.get(act.run_id)
265+
266+
if data_converter.payload_codec:
267+
await temporalio.bridge.worker.decode_activation(
268+
act,
269+
data_converter.payload_codec,
270+
decode_headers=self._encode_headers,
271+
)
263272
if not workflow:
264273
# Must have a initialize job to create instance
265274
if not init_job:
266275
raise RuntimeError(
267276
"Missing initialize workflow, workflow could have unexpectedly been removed from cache"
268277
)
269-
workflow_instance, det = self._create_workflow_instance(act, init_job)
270-
workflow = _RunningWorkflow(workflow_instance, det.info.workflow_id)
278+
data_converter = self._data_converter._with_context(
279+
temporalio.converter.WorkflowSerializationContext(
280+
namespace=self._namespace,
281+
workflow_id=init_job.workflow_id,
282+
)
283+
)
284+
workflow = _RunningWorkflow(
285+
self._create_workflow_instance(act, init_job),
286+
init_job.workflow_id,
287+
)
271288
self._running_workflows[act.run_id] = workflow
272289
elif init_job:
273290
# This should never happen
@@ -281,12 +298,6 @@ async def _handle_activation(
281298
workflow_id=workflow.workflow_id,
282299
)
283300
)
284-
if data_converter.payload_codec:
285-
await temporalio.bridge.worker.decode_activation(
286-
act,
287-
data_converter.payload_codec,
288-
decode_headers=self._encode_headers,
289-
)
290301

291302
# Run activation in separate thread so we can check if it's
292303
# deadlocked
@@ -495,7 +506,7 @@ def _create_workflow_instance(
495506
self,
496507
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
497508
init: temporalio.bridge.proto.workflow_activation.InitializeWorkflow,
498-
) -> tuple[WorkflowInstance, WorkflowInstanceDetails]:
509+
) -> WorkflowInstance:
499510
# Get the definition
500511
defn = self._workflows.get(init.workflow_type, self._dynamic_workflow)
501512
if not defn:
@@ -575,9 +586,9 @@ def _create_workflow_instance(
575586
last_failure=last_failure,
576587
)
577588
if defn.sandboxed:
578-
return self._workflow_runner.create_instance(det), det
589+
return self._workflow_runner.create_instance(det)
579590
else:
580-
return self._unsandboxed_workflow_runner.create_instance(det), det
591+
return self._unsandboxed_workflow_runner.create_instance(det)
581592

582593
def nondeterminism_as_workflow_fail(self) -> bool:
583594
return any(

0 commit comments

Comments
 (0)