Serialization context #1102
Conversation
Force-pushed: 56db57e → 3e53f35, 8e0e193 → 4a0b661, 4a0b661 → 6000b0d
timeout=input.rpc_timeout,
)

def _async_activity_data_converter(
This one seems odd. Is this for the case where the client comes back to complete an activity out of band and doesn't give us much of the information?
temporalio/worker/_workflow.py
Outdated
workflow = _RunningWorkflow(
    self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)
This seems potentially problematic. The `act` given to create the workflow instance is now provided prior to decoding. Maybe not a problem, but is there a reason to change it?
It doesn't currently need to be decoded at this stage, but I think you're right that this can be done less intrusively: we can get the workflow ID from `init_job.workflow_id`. I'll make that change.
It does technically need to be decoded at this stage, because `info` needs the decoded memo and headers.
WorkflowExecution._from_raw_info(v, self._client.data_converter)
WorkflowExecution._from_raw_info(
    v,
    self._client.data_converter._with_context(
Do we want to create a new context-specific converter for each page here? No strong opinion.
) -> None:
    """Create workflow handle."""
    self._client = client
    self._data_converter = client.data_converter._with_context(
This class is often created by people who don't care about it (e.g. they start a workflow and don't care about the handle). Are there concerns about creating a context-specific data converter in all cases even if it's never used? I wonder if we should instead build the converter on each call when they make it, same as top-level client calls.
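One way to avoid paying that cost up front, as a sketch only (the `DataConverter`, `WorkflowSerializationContext`, and handle shapes here are simplified stand-ins, not the PR's actual code), is to build the context-bound converter lazily on first use via `functools.cached_property`:

```python
from dataclasses import dataclass, replace
from functools import cached_property
from typing import Optional


@dataclass(frozen=True)
class WorkflowSerializationContext:
    namespace: str
    workflow_id: str


@dataclass(frozen=True)
class DataConverter:
    # Simplified stand-in; the real converter also carries payload/failure
    # converters and a codec.
    context: Optional[WorkflowSerializationContext] = None

    def _with_context(self, context: WorkflowSerializationContext) -> "DataConverter":
        return replace(self, context=context)


class WorkflowHandle:
    def __init__(
        self, client_converter: DataConverter, namespace: str, workflow_id: str
    ) -> None:
        self._client_converter = client_converter
        self._namespace = namespace
        self._workflow_id = workflow_id

    @cached_property
    def _data_converter(self) -> DataConverter:
        # Built only on first access, so handles that are never used for
        # result/signal/query calls never construct a context-bound converter.
        return self._client_converter._with_context(
            WorkflowSerializationContext(
                namespace=self._namespace, workflow_id=self._workflow_id
            )
        )


handle = WorkflowHandle(DataConverter(), "default", "wf-1")
assert handle._data_converter.context.workflow_id == "wf-1"
```

`cached_property` also means repeated calls on the same handle reuse one converter, which sits between "always build at construction" and "rebuild on every call".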
return self._client.data_converter._with_context(
    ActivitySerializationContext(
        namespace=self._client.namespace,
        workflow_id=(
What we did here in Java and .NET was to have this async activity client/handle implement `WithSerializationContext`, since we don't always have the workflow ID. This way, users who know some of this information can do a "with context" to get a context-specific async activity client, and they can choose which fields they are OK leaving empty. The task-token approach is by far the most common (though in general async activity completion is not that common), so I think we may need to just put this in front of users and let them set the context.
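The Java/.NET pattern described here might look roughly like the following in Python. This is hypothetical: `AsyncActivityHandle` and its fields are simplified stand-ins, not the SDK's actual classes.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ActivitySerializationContext:
    namespace: str
    activity_type: str
    # Often unknown when completing via task token alone.
    workflow_id: Optional[str] = None


@dataclass(frozen=True)
class AsyncActivityHandle:
    task_token: bytes
    context: Optional[ActivitySerializationContext] = None

    def with_context(
        self, context: ActivitySerializationContext
    ) -> "AsyncActivityHandle":
        # Returns a new handle whose completions/heartbeats would use a
        # converter bound to the given context; the original is unchanged.
        return replace(self, context=context)


handle = AsyncActivityHandle(task_token=b"...")
bound = handle.with_context(
    ActivitySerializationContext(namespace="default", activity_type="my_activity")
)
assert bound.context is not None and handle.context is None
```

The key design point is that the user, not the SDK, decides which context fields to supply when only the task token is known.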
namespace: str
workflow_id: str
In .NET and Java we had a common interface for both the workflow and activity serialization contexts to show they both have these two fields. No problem not doing that here, just noting it in case you wanted to.
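A minimal sketch of that shared-interface idea, using the field names that appear in this PR's diffs (the shared base class itself is the .NET/Java pattern being suggested, not necessarily what this PR implements):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SerializationContext:
    # Fields common to both workflow and activity contexts.
    namespace: str
    workflow_id: str


@dataclass(frozen=True)
class WorkflowSerializationContext(SerializationContext):
    pass


@dataclass(frozen=True)
class ActivitySerializationContext(SerializationContext):
    workflow_type: str
    activity_type: str
    activity_task_queue: str
    is_local: bool


ctx = ActivitySerializationContext(
    namespace="default",
    workflow_id="wf-1",
    workflow_type="MyWorkflow",
    activity_type="my_activity",
    activity_task_queue="tq",
    is_local=False,
)
assert isinstance(ctx, SerializationContext)
```

Code that only needs the namespace and workflow ID can then accept the base type and work with either context.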
temporalio/converter.py
Outdated
during serialization and deserialization.
"""

def with_context(self, context: Optional[SerializationContext]) -> Self:
At least in other SDKs, I am not sure there is ever expected to be a situation where this is called with `None`. "With context" always assumes it will be given a context, so developers don't have to code around the absence of one.
Removed `Optional` here. In a previous version of the PR I was passing `None` to Nexus contexts, but not any longer.
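The resulting non-`Optional` contract could be sketched as follows (simplified stand-ins; the real converter carries more state, and the PR's signature returns `Self`, replaced here with a string annotation to keep the sketch runnable on older Pythons):

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SerializationContext:
    namespace: str
    workflow_id: str


@dataclass(frozen=True)
class PayloadConverter:
    context: Optional[SerializationContext] = None

    def with_context(self, context: SerializationContext) -> "PayloadConverter":
        # A context is required: callers always have one by the time they
        # reach here, so the converter never handles "context removed".
        return replace(self, context=context)


converter = PayloadConverter()
bound = converter.with_context(SerializationContext("default", "wf-1"))
assert bound.context is not None and converter.context is None
```

Keeping the parameter non-optional pushes the "no context yet" state to the unbound converter itself rather than into every `with_context` implementation.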
data_converter = self._data_converter
if activity.info:
    context = temporalio.converter.ActivitySerializationContext(
        namespace=activity.info.workflow_namespace,
        workflow_id=activity.info.workflow_id,
        workflow_type=activity.info.workflow_type,
        activity_type=activity.info.activity_type,
        activity_task_queue=self._task_queue,
        is_local=activity.info.is_local,
    )
    data_converter = data_converter._with_context(context)
Is there a way to get this off the running activity instead of recreating it on every heartbeat? I know we store the payload converter on the activity context that is being accessed here; maybe we should store the data converter instead and just return its payload converter from `activity.payload_converter()`? I am unsure whether this affects how multiprocessing and pickling work.
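A sketch of the suggested caching, with stand-in types only (whether this interacts badly with multiprocessing/pickling is exactly the open question raised here): bind the converter once per running activity and reuse it on each heartbeat.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ActivitySerializationContext:
    namespace: str
    workflow_id: str
    activity_type: str


@dataclass(frozen=True)
class DataConverter:
    context: Optional[ActivitySerializationContext] = None

    def _with_context(
        self, context: ActivitySerializationContext
    ) -> "DataConverter":
        return replace(self, context=context)


class RunningActivity:
    """Stand-in for the worker's per-activity state."""

    def __init__(
        self, base: DataConverter, context: ActivitySerializationContext
    ) -> None:
        # Bind the converter once when the activity starts...
        self.data_converter = base._with_context(context)

    def heartbeat(self, *details: object) -> DataConverter:
        # ...and reuse it on every heartbeat instead of rebuilding the
        # context and converter each time.
        return self.data_converter


act = RunningActivity(
    DataConverter(),
    ActivitySerializationContext("default", "wf-1", "my_activity"),
)
assert act.heartbeat() is act.heartbeat()
```

`activity.payload_converter()` could then simply return the payload converter off this cached data converter, as suggested.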
temporalio/worker/_workflow.py
Outdated
workflow = _RunningWorkflow(
    self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)
I am concerned that with this refactoring, `act` (and its `init_job`) has not run through the codec by this point, where it had before. I think the logic needs to keep running the codec before the workflow-instance-creation code, but you can extract the workflow ID from the `init_job` or the running workflow.
Agreed, we're doing that now.
temporalio/worker/_workflow.py
Outdated
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
init: temporalio.bridge.proto.workflow_activation.InitializeWorkflow,
) -> WorkflowInstance:
) -> tuple[WorkflowInstance, WorkflowInstanceDetails]:
Not sure we need to change the entire return type here just to get the workflow ID. The caller can just extract it out of the init job, and we don't have to mutate this code at all.
Correct, the changes to this function have been reverted in connection with the discussion above.
self._payload_converter_class = det.payload_converter_class
self._failure_converter_class = det.failure_converter_class
Not sure we need to store these. "With context" can be called on the already-created converters; we should not re-instantiate converters more than once per instance, IMO.
Discussed offline and resolved in recent commits
payload_converter = self._payload_converter_class()
failure_converter = self._failure_converter_class()
As mentioned above, I don't believe we need to re-instantiate converters multiple times in this instance; just call "with context" on the already-existing ones.
Discussed offline and resolved in recent commits
Force-pushed: 6000b0d → a0f4c63, adf8bc3 → d3311e6
)
workflow_id = init_job.workflow_id
else:
    workflow_id = workflow.workflow_id
Bug: Workflow ID Missing in Running Workflow Instances

Accessing `workflow.workflow_id` on existing `_RunningWorkflow` instances can cause an `AttributeError`. This occurs because instances created with the previous constructor lack the `workflow_id` attribute, which is now expected during workflow activation.
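A hypothetical shape of the fix for the report above (names are illustrative, not the PR's final code): have `_RunningWorkflow` accept and store the workflow ID at construction, so every instance carries it by the time activation code reads it.

```python
class _RunningWorkflow:
    def __init__(self, instance: object, workflow_id: str) -> None:
        # Storing the ID at construction guarantees the attribute exists on
        # every instance, rather than being set only on some code paths.
        self.instance = instance
        self.workflow_id = workflow_id


wf = _RunningWorkflow(object(), "wf-1")
assert wf.workflow_id == "wf-1"
```

Making the field a required constructor argument means any call site that forgets it fails immediately at creation time instead of later with an `AttributeError` during activation.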
Fixes #796
`with_context` API for creating context-aware data converters