
Conversation

dandavison (Contributor) commented Sep 12, 2025

Fixes #796

  • New with_context API for creating context-aware data converters
  • Make serialization context available for all serde / codec operations
  • Best-effort test suite: it's not feasible to provide complete coverage for this change
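
The actual API surface isn't shown in this thread; as a rough sketch of the pattern under discussion (all names here are illustrative, not the real temporalio API), a context-aware converter returns a new converter bound to a serialization context rather than mutating itself:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SerializationContext:
    """Illustrative context describing where (de)serialization happens."""
    namespace: str
    workflow_id: str


class DataConverter:
    """Toy converter that can be bound to a serialization context."""

    def __init__(self, context: Optional[SerializationContext] = None) -> None:
        self.context = context

    def with_context(self, context: SerializationContext) -> "DataConverter":
        # Return a NEW converter bound to the context; the original is unchanged.
        return DataConverter(context)

    def to_payload(self, value: object) -> dict:
        # A real converter would also run payload codecs with self.context available.
        wf_id = self.context.workflow_id if self.context else None
        return {"value": value, "workflow_id": wf_id}


converter = DataConverter()
bound = converter.with_context(
    SerializationContext(namespace="default", workflow_id="wf-1")
)
payload = bound.to_payload("hello")
```

The immutability matters for the review discussion below: the base converter on the client stays context-free, and each call site derives a bound copy.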

@dandavison force-pushed the dan-9986-serialization-context branch from 56db57e to 3e53f35 on September 12, 2025 at 22:22
@dandavison changed the title from "Dan 9986 serialization context" to "Serialization context" on Sep 12, 2025
@dandavison force-pushed the dan-9986-serialization-context branch 11 times, most recently from 8e0e193 to 4a0b661, on September 15, 2025 at 09:15
@dandavison marked this pull request as ready for review on September 15, 2025 at 09:17
@dandavison requested a review from a team as a code owner on September 15, 2025 at 09:17
@dandavison force-pushed the dan-9986-serialization-context branch from 4a0b661 to 6000b0d on September 15, 2025 at 10:44
timeout=input.rpc_timeout,
)

def _async_activity_data_converter(

Contributor:

This one seems odd. Is this for the case when the client comes back to complete an activity out of band and doesn't give us much of the information?

workflow = _RunningWorkflow(
self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)

Contributor:

This seems potentially problematic: act is now passed to create-workflow-instance prior to decoding. Maybe not a problem, but is there a reason to change it?

Contributor (author):

It doesn't currently need to be decoded at this stage but I think you're right that this can be done less intrusively: we can get the workflow ID from init_job.workflow_id. I'll make that change.

Member:

It does technically need to be decoded at this stage, because info needs the decoded memo and headers.

WorkflowExecution._from_raw_info(v, self._client.data_converter)
WorkflowExecution._from_raw_info(
v,
self._client.data_converter._with_context(

Member:

Do we want to create a new context-specific converter for each page here? No strong opinion.

) -> None:
"""Create workflow handle."""
self._client = client
self._data_converter = client.data_converter._with_context(

Member:

This class is often created by people that don't care about it (e.g. they start a workflow and don't care about the handle). Are there concerns about creating a context-specific data converter in all cases even if it's never used? I wonder if we should build the converter each call when they make the call, same as top-level client calls.
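
The suggestion above — build the bound converter lazily rather than in the handle's constructor — could be sketched like this (all names hypothetical; `functools.cached_property` is one way to defer the work to first use):

```python
from functools import cached_property


class FakeConverter:
    """Stand-in converter that counts constructions, to show laziness."""
    instances = 0

    def __init__(self, context=None):
        FakeConverter.instances += 1
        self.context = context

    def with_context(self, context):
        return FakeConverter(context)


class WorkflowHandle:
    """Handle that only builds its context-bound converter on first use."""

    def __init__(self, client_converter, workflow_id):
        self._client_converter = client_converter
        self._workflow_id = workflow_id

    @cached_property
    def _data_converter(self):
        # Built lazily: callers who start a workflow and discard the
        # handle never pay for a context-specific converter.
        return self._client_converter.with_context(
            {"workflow_id": self._workflow_id}
        )


base = FakeConverter()
handle = WorkflowHandle(base, "wf-1")
count_before = FakeConverter.instances  # handle created; no bound converter yet
_ = handle._data_converter             # first access builds it
count_after = FakeConverter.instances  # later accesses reuse the cached one
```

An alternative, also raised above, is to not cache at all and build the bound converter inside each client call, matching the top-level client behavior.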

return self._client.data_converter._with_context(
ActivitySerializationContext(
namespace=self._client.namespace,
workflow_id=(

Member:

What we did here in Java and .NET was have this async activity client/handle implement WithSerializationContext, since we don't always have the workflow ID. That way, users who know some of this information can do a "with context" to get a context-specific async activity client, choosing which fields they are OK leaving empty. The task-token approach is by far the most common (though async activity completion is not that common in general), so I think we need to put this in front of users and let them set the context themselves.
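
A sketch of that suggestion (illustrative names only; not the temporalio API): the handle carries no context by default, and a user who happens to know the workflow details can derive a context-bound handle, leaving unknown fields empty:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ActivitySerializationContext:
    """Illustrative context; workflow_id is often unknown for task-token completion."""
    namespace: str
    activity_type: str
    workflow_id: Optional[str] = None


class AsyncActivityHandle:
    """Handle addressed by task token; any context must come from the user."""

    def __init__(self, task_token: bytes,
                 context: Optional[ActivitySerializationContext] = None):
        self.task_token = task_token
        self.context = context

    def with_context(self, context: ActivitySerializationContext) -> "AsyncActivityHandle":
        # Derive a new handle bound to whatever fields the user could supply.
        return AsyncActivityHandle(self.task_token, context)


handle = AsyncActivityHandle(b"token-123")
ctx_handle = handle.with_context(
    ActivitySerializationContext(namespace="default", activity_type="send_email")
)
```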

Comment on lines +126 to +127
namespace: str
workflow_id: str

Member:

In .NET and Java we had a common interface for both the workflow and activity serialization contexts to show that they both have these two fields. No problem not doing that here; just noting it in case you want to.

during serialization and deserialization.
"""

def with_context(self, context: Optional[SerializationContext]) -> Self:

Member:

At least in the other SDKs, I don't think there is ever expected to be a situation where this is called with None: with_context always assumes a context is present, so developers don't have to code around its absence.

Contributor (author):

Removed Optional here. In a previous version of the PR I was passing None to Nexus contexts but not any longer.

Comment on lines +255 to +265
data_converter = self._data_converter
if activity.info:
context = temporalio.converter.ActivitySerializationContext(
namespace=activity.info.workflow_namespace,
workflow_id=activity.info.workflow_id,
workflow_type=activity.info.workflow_type,
activity_type=activity.info.activity_type,
activity_task_queue=self._task_queue,
is_local=activity.info.is_local,
)
data_converter = data_converter._with_context(context)

Member:

Is there a way to get this off the running activity instead of recreating every heartbeat? I know we store the payload converter on the activity context which is being accessed here, maybe we should store the data converter instead and just return its payload converter from activity.payload_converter()? I am unsure if this affects how multiprocessing and pickling work.
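
The concern above — rebuilding a context-bound converter on every heartbeat — could be avoided by binding once when the activity starts and reusing the result, roughly like this (illustrative names; whether this interacts with multiprocessing/pickling is the open question in the comment):

```python
class Converter:
    """Stand-in converter that counts how many times one is constructed."""
    builds = 0

    def __init__(self, context=None):
        Converter.builds += 1
        self.context = context

    def with_context(self, context):
        return Converter(context)


class RunningActivity:
    """Binds the converter once at start, then reuses it for every heartbeat."""

    def __init__(self, base_converter, info):
        self.info = info
        # Bind once here, instead of inside the heartbeat path.
        self.data_converter = base_converter.with_context(
            {"activity_type": info["activity_type"]}
        )

    def heartbeat(self, details):
        # No new converter per heartbeat; the bound one is reused.
        return self.data_converter.context["activity_type"], details


base = Converter()
act = RunningActivity(base, {"activity_type": "upload"})
builds_after_start = Converter.builds
act.heartbeat("50%")
act.heartbeat("90%")
builds_after_beats = Converter.builds  # unchanged by heartbeats
```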

workflow = _RunningWorkflow(
self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)

Member:

I am concerned with the refactoring that act (and its init_job) has not run through the codec by this point where it had before. I think the logic needs to stay using the codec before workflow instance creation code, but you need to extract the workflow ID from the init_job or the running workflow.


Contributor (author):

Agreed, we're doing that now.

act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
init: temporalio.bridge.proto.workflow_activation.InitializeWorkflow,
) -> WorkflowInstance:
) -> tuple[WorkflowInstance, WorkflowInstanceDetails]:

Member:

Not sure we need to change the entire return type here just to get the workflow ID. The caller can extract it from the init job, and we don't have to touch this code at all.

Contributor (author):

Correct; the changes to this function have been reverted per the discussion above.

Comment on lines 213 to 214
self._payload_converter_class = det.payload_converter_class
self._failure_converter_class = det.failure_converter_class

Member:

Not sure we need to store these. The "with context" can be called on the already-created converters; we should not re-instantiate converters more than once per instance, IMO.

Contributor (author):

Discussed offline and resolved in recent commits

Comment on lines 2066 to 2067
payload_converter = self._payload_converter_class()
failure_converter = self._failure_converter_class()

Member:

Mentioned above, but I don't believe we need to reinstantiate converters multiple times in this instance, just call the "with context" on the already existing ones.
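
The distinction the reviewer is drawing could be sketched like this (hypothetical converter; the point is that re-instantiating the class discards per-instance configuration, while deriving via "with context" preserves it):

```python
class PayloadConverter:
    """Toy converter where construction carries user customization."""

    def __init__(self, *, encoders=None, context=None):
        # Imagine configuring encoders is expensive and/or user-customized.
        self.encoders = encoders if encoders is not None else ["default-json"]
        self.context = context

    def with_context(self, context):
        # Derive from this instance so its customization is preserved.
        return PayloadConverter(encoders=self.encoders, context=context)


custom = PayloadConverter(encoders=["proto", "json"])

# Anti-pattern: re-instantiating the class loses the custom encoder setup.
fresh = type(custom)()

# Preferred: derive a context-bound copy from the configured instance.
bound = custom.with_context({"workflow_id": "wf-1"})
```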

Contributor (author):

Discussed offline and resolved in recent commits

@dandavison force-pushed the dan-9986-serialization-context branch from 6000b0d to a0f4c63 on September 16, 2025 at 03:12
@dandavison force-pushed the dan-9986-serialization-context branch from adf8bc3 to d3311e6 on September 19, 2025 at 12:17
cursor[bot]: (comment marked as outdated)

)
workflow_id = init_job.workflow_id
else:
workflow_id = workflow.workflow_id

cursor[bot]:

Bug: Workflow ID Missing in Running Workflow Instances

Accessing workflow.workflow_id on existing _RunningWorkflow instances can cause an AttributeError. This occurs because instances created with the previous constructor lack the workflow_id attribute, which is now expected during workflow activation.
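
One way to address a bug of this shape (a sketch, not the PR's actual fix) is to make workflow_id a required constructor argument on the running-workflow wrapper, so every instance has it by the time later activations arrive:

```python
class _RunningWorkflow:
    """Sketch: store workflow_id at construction so later access cannot fail."""

    def __init__(self, instance, workflow_id):
        self.instance = instance
        self.workflow_id = workflow_id


def workflow_id_for_activation(workflow, init_job):
    # The first activation carries an init job with the ID;
    # subsequent activations reuse the ID stored on the wrapper.
    if init_job is not None:
        return init_job["workflow_id"]
    return workflow.workflow_id


wf = _RunningWorkflow(instance=object(), workflow_id="wf-1")
first = workflow_id_for_activation(wf, {"workflow_id": "wf-1"})
later = workflow_id_for_activation(wf, None)
```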


@dandavison dandavison marked this pull request as draft September 22, 2025 17:48
Successfully merging this pull request may close these issues.

[Feature Request] Serialization context for codecs and converters
3 participants