Skip to content

Commit a0f4c63

Browse files
committed
Refactor activation processing
1 parent 167476a commit a0f4c63

File tree

1 file changed

+22
-30
lines changed

1 file changed

+22
-30
lines changed

temporalio/worker/_workflow.py

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -255,49 +255,43 @@ async def _handle_activation(
255255
)
256256
completion.successful.SetInParent()
257257
try:
258-
# Decode the activation if there's a codec and not cache remove job
259-
260258
if LOG_PROTOS:
261259
logger.debug("Received workflow activation:\n%s", act)
262260

263-
# If the workflow is not running yet, create it
264261
workflow = self._running_workflows.get(act.run_id)
265-
266-
if data_converter.payload_codec:
267-
await temporalio.bridge.worker.decode_activation(
268-
act,
269-
data_converter.payload_codec,
270-
decode_headers=self._encode_headers,
271-
)
272262
if not workflow:
273-
# Must have a initialize job to create instance
274263
if not init_job:
275264
raise RuntimeError(
276265
"Missing initialize workflow, workflow could have unexpectedly been removed from cache"
277266
)
278-
data_converter = self._data_converter._with_context(
279-
temporalio.converter.WorkflowSerializationContext(
280-
namespace=self._namespace,
281-
workflow_id=init_job.workflow_id,
267+
workflow_id = init_job.workflow_id
268+
else:
269+
workflow_id = workflow.workflow_id
270+
if init_job:
271+
# Should never happen
272+
logger.warning(
273+
"Cache already exists for activation with initialize job"
282274
)
283-
)
284-
workflow = _RunningWorkflow(
285-
self._create_workflow_instance(act, init_job),
286-
init_job.workflow_id,
287-
)
288-
self._running_workflows[act.run_id] = workflow
289-
elif init_job:
290-
# This should never happen
291-
logger.warning(
292-
"Cache already exists for activation with initialize job"
293-
)
294275

295276
data_converter = self._data_converter._with_context(
296277
temporalio.converter.WorkflowSerializationContext(
297278
namespace=self._namespace,
298-
workflow_id=workflow.workflow_id,
279+
workflow_id=workflow_id,
299280
)
300281
)
282+
if data_converter.payload_codec:
283+
await temporalio.bridge.worker.decode_activation(
284+
act,
285+
data_converter.payload_codec,
286+
decode_headers=self._encode_headers,
287+
)
288+
if not workflow:
289+
assert init_job
290+
workflow = _RunningWorkflow(
291+
self._create_workflow_instance(act, init_job),
292+
workflow_id,
293+
)
294+
self._running_workflows[act.run_id] = workflow
301295

302296
# Run activation in separate thread so we can check if it's
303297
# deadlocked
@@ -337,7 +331,6 @@ async def _handle_activation(
337331
"Failed handling activation on workflow with run ID %s", act.run_id
338332
)
339333

340-
# Set completion failure
341334
completion.failed.failure.SetInParent()
342335
try:
343336
data_converter.failure_converter.to_failure(
@@ -354,10 +347,9 @@ async def _handle_activation(
354347
f"Failed converting activation exception: {inner_err}"
355348
)
356349

357-
# Always set the run ID on the completion
358350
completion.run_id = act.run_id
359351

360-
# Encode the completion if there's a codec and not cache remove job
352+
# Encode completion
361353
if data_converter.payload_codec:
362354
try:
363355
await temporalio.bridge.worker.encode_completion(

0 commit comments

Comments
 (0)