Skip to content

Commit 6d2924b

Browse files
Add Summary to Nexus operations (#1219)
* Add summary to nexus operations * Update to newer syntax for union where previously missed. Run formatter
1 parent 77f2346 commit 6d2924b

File tree

4 files changed

+156
-64
lines changed

4 files changed

+156
-64
lines changed

temporalio/worker/_interceptor.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,13 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
295295

296296
endpoint: str
297297
service: str
298-
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
298+
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any]
299299
input: InputT
300-
schedule_to_close_timeout: Optional[timedelta]
300+
schedule_to_close_timeout: timedelta | None
301301
cancellation_type: temporalio.workflow.NexusOperationCancellationType
302-
headers: Optional[Mapping[str, str]]
303-
output_type: Optional[Type[OutputT]] = None
302+
headers: Mapping[str, str] | None
303+
summary: str | None
304+
output_type: Type[OutputT] | None = None
304305

305306
def __post_init__(self) -> None:
306307
"""Initialize operation-specific attributes after dataclass creation."""

temporalio/worker/_workflow_instance.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,12 +1581,13 @@ async def workflow_start_nexus_operation(
15811581
self,
15821582
endpoint: str,
15831583
service: str,
1584-
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
1584+
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any],
15851585
input: Any,
1586-
output_type: Optional[Type[OutputT]],
1587-
schedule_to_close_timeout: Optional[timedelta],
1586+
output_type: Type[OutputT] | None,
1587+
schedule_to_close_timeout: timedelta | None,
15881588
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1589-
headers: Optional[Mapping[str, str]],
1589+
headers: Mapping[str, str] | None,
1590+
summary: str | None,
15901591
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15911592
# start_nexus_operation
15921593
return await self._outbound.start_nexus_operation(
@@ -1599,6 +1600,7 @@ async def workflow_start_nexus_operation(
15991600
schedule_to_close_timeout=schedule_to_close_timeout,
16001601
cancellation_type=cancellation_type,
16011602
headers=headers,
1603+
summary=summary,
16021604
)
16031605
)
16041606

@@ -3330,6 +3332,11 @@ def _apply_schedule_command(self) -> None:
33303332
for key, val in self._input.headers.items():
33313333
v.nexus_header[key] = val
33323334

3335+
if self._input.summary:
3336+
command.user_metadata.summary.CopyFrom(
3337+
self._payload_converter.to_payload(self._input.summary)
3338+
)
3339+
33333340
def _apply_cancel_command(
33343341
self,
33353342
command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,

temporalio/workflow.py

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -858,12 +858,13 @@ async def workflow_start_nexus_operation(
858858
self,
859859
endpoint: str,
860860
service: str,
861-
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
861+
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any],
862862
input: Any,
863-
output_type: Optional[Type[OutputT]],
864-
schedule_to_close_timeout: Optional[timedelta],
863+
output_type: Type[OutputT] | None,
864+
schedule_to_close_timeout: timedelta | None,
865865
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
866-
headers: Optional[Mapping[str, str]],
866+
headers: Mapping[str, str] | None,
867+
summary: str | None,
867868
) -> NexusOperationHandle[OutputT]: ...
868869

869870
@abstractmethod
@@ -5346,10 +5347,11 @@ async def start_operation(
53465347
operation: nexusrpc.Operation[InputT, OutputT],
53475348
input: InputT,
53485349
*,
5349-
output_type: Optional[Type[OutputT]] = None,
5350-
schedule_to_close_timeout: Optional[timedelta] = None,
5350+
output_type: Type[OutputT] | None = None,
5351+
schedule_to_close_timeout: timedelta | None = None,
53515352
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5352-
headers: Optional[Mapping[str, str]] = None,
5353+
headers: Mapping[str, str] | None = None,
5354+
summary: str | None = None,
53535355
) -> NexusOperationHandle[OutputT]: ...
53545356

53555357
# Overload for string operation name
@@ -5360,10 +5362,11 @@ async def start_operation(
53605362
operation: str,
53615363
input: Any,
53625364
*,
5363-
output_type: Optional[Type[OutputT]] = None,
5364-
schedule_to_close_timeout: Optional[timedelta] = None,
5365+
output_type: Type[OutputT] | None = None,
5366+
schedule_to_close_timeout: timedelta | None = None,
53655367
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5366-
headers: Optional[Mapping[str, str]] = None,
5368+
headers: Mapping[str, str] | None = None,
5369+
summary: str | None = None,
53675370
) -> NexusOperationHandle[OutputT]: ...
53685371

53695372
# Overload for workflow_run_operation methods
@@ -5377,10 +5380,11 @@ async def start_operation(
53775380
],
53785381
input: InputT,
53795382
*,
5380-
output_type: Optional[Type[OutputT]] = None,
5381-
schedule_to_close_timeout: Optional[timedelta] = None,
5383+
output_type: Type[OutputT] | None = None,
5384+
schedule_to_close_timeout: timedelta | None = None,
53825385
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5383-
headers: Optional[Mapping[str, str]] = None,
5386+
headers: Mapping[str, str] | None = None,
5387+
summary: str | None = None,
53845388
) -> NexusOperationHandle[OutputT]: ...
53855389

53865390
# Overload for sync_operation methods (async def)
@@ -5394,10 +5398,11 @@ async def start_operation(
53945398
],
53955399
input: InputT,
53965400
*,
5397-
output_type: Optional[Type[OutputT]] = None,
5398-
schedule_to_close_timeout: Optional[timedelta] = None,
5401+
output_type: Type[OutputT] | None = None,
5402+
schedule_to_close_timeout: timedelta | None = None,
53995403
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5400-
headers: Optional[Mapping[str, str]] = None,
5404+
headers: Mapping[str, str] | None = None,
5405+
summary: str | None = None,
54015406
) -> NexusOperationHandle[OutputT]: ...
54025407

54035408
# Overload for sync_operation methods (def)
@@ -5411,10 +5416,11 @@ async def start_operation(
54115416
],
54125417
input: InputT,
54135418
*,
5414-
output_type: Optional[Type[OutputT]] = None,
5415-
schedule_to_close_timeout: Optional[timedelta] = None,
5419+
output_type: Type[OutputT] | None = None,
5420+
schedule_to_close_timeout: timedelta | None = None,
54165421
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5417-
headers: Optional[Mapping[str, str]] = None,
5422+
headers: Mapping[str, str] | None = None,
5423+
summary: str | None = None,
54185424
) -> NexusOperationHandle[OutputT]: ...
54195425

54205426
# Overload for operation_handler
@@ -5427,10 +5433,11 @@ async def start_operation(
54275433
],
54285434
input: InputT,
54295435
*,
5430-
output_type: Optional[Type[OutputT]] = None,
5431-
schedule_to_close_timeout: Optional[timedelta] = None,
5436+
output_type: Type[OutputT] | None = None,
5437+
schedule_to_close_timeout: timedelta | None = None,
54325438
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5433-
headers: Optional[Mapping[str, str]] = None,
5439+
headers: Mapping[str, str] | None = None,
5440+
summary: str | None = None,
54345441
) -> NexusOperationHandle[OutputT]: ...
54355442

54365443
@abstractmethod
@@ -5439,10 +5446,11 @@ async def start_operation(
54395446
operation: Any,
54405447
input: Any,
54415448
*,
5442-
output_type: Optional[Type[OutputT]] = None,
5443-
schedule_to_close_timeout: Optional[timedelta] = None,
5449+
output_type: Type[OutputT] | None = None,
5450+
schedule_to_close_timeout: timedelta | None = None,
54445451
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5445-
headers: Optional[Mapping[str, str]] = None,
5452+
headers: Mapping[str, str] | None = None,
5453+
summary: str | None = None,
54465454
) -> Any:
54475455
"""Start a Nexus operation and return its handle.
54485456
@@ -5469,10 +5477,11 @@ async def execute_operation(
54695477
operation: nexusrpc.Operation[InputT, OutputT],
54705478
input: InputT,
54715479
*,
5472-
output_type: Optional[Type[OutputT]] = None,
5473-
schedule_to_close_timeout: Optional[timedelta] = None,
5480+
output_type: Type[OutputT] | None = None,
5481+
schedule_to_close_timeout: timedelta | None = None,
54745482
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5475-
headers: Optional[Mapping[str, str]] = None,
5483+
headers: Mapping[str, str] | None = None,
5484+
summary: str | None = None,
54765485
) -> OutputT: ...
54775486

54785487
# Overload for string operation name
@@ -5483,10 +5492,11 @@ async def execute_operation(
54835492
operation: str,
54845493
input: Any,
54855494
*,
5486-
output_type: Optional[Type[OutputT]] = None,
5487-
schedule_to_close_timeout: Optional[timedelta] = None,
5495+
output_type: Type[OutputT] | None = None,
5496+
schedule_to_close_timeout: timedelta | None = None,
54885497
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5489-
headers: Optional[Mapping[str, str]] = None,
5498+
headers: Mapping[str, str] | None = None,
5499+
summary: str | None = None,
54905500
) -> OutputT: ...
54915501

54925502
# Overload for workflow_run_operation methods
@@ -5500,10 +5510,11 @@ async def execute_operation(
55005510
],
55015511
input: InputT,
55025512
*,
5503-
output_type: Optional[Type[OutputT]] = None,
5504-
schedule_to_close_timeout: Optional[timedelta] = None,
5513+
output_type: Type[OutputT] | None = None,
5514+
schedule_to_close_timeout: timedelta | None = None,
55055515
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5506-
headers: Optional[Mapping[str, str]] = None,
5516+
headers: Mapping[str, str] | None = None,
5517+
summary: str | None = None,
55075518
) -> OutputT: ...
55085519

55095520
# TODO(nexus-preview): in practice, both these overloads match an async def sync
@@ -5520,10 +5531,11 @@ async def execute_operation(
55205531
],
55215532
input: InputT,
55225533
*,
5523-
output_type: Optional[Type[OutputT]] = None,
5524-
schedule_to_close_timeout: Optional[timedelta] = None,
5534+
output_type: Type[OutputT] | None = None,
5535+
schedule_to_close_timeout: timedelta | None = None,
55255536
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5526-
headers: Optional[Mapping[str, str]] = None,
5537+
headers: Mapping[str, str] | None = None,
5538+
summary: str | None = None,
55275539
) -> OutputT: ...
55285540

55295541
# Overload for sync_operation methods (def)
@@ -5537,10 +5549,11 @@ async def execute_operation(
55375549
],
55385550
input: InputT,
55395551
*,
5540-
output_type: Optional[Type[OutputT]] = None,
5541-
schedule_to_close_timeout: Optional[timedelta] = None,
5552+
output_type: Type[OutputT] | None = None,
5553+
schedule_to_close_timeout: timedelta | None = None,
55425554
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5543-
headers: Optional[Mapping[str, str]] = None,
5555+
headers: Mapping[str, str] | None = None,
5556+
summary: str | None = None,
55445557
) -> OutputT: ...
55455558

55465559
# Overload for operation_handler
@@ -5554,10 +5567,11 @@ async def execute_operation(
55545567
],
55555568
input: InputT,
55565569
*,
5557-
output_type: Optional[Type[OutputT]] = None,
5558-
schedule_to_close_timeout: Optional[timedelta] = None,
5570+
output_type: Type[OutputT] | None = None,
5571+
schedule_to_close_timeout: timedelta | None = None,
55595572
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5560-
headers: Optional[Mapping[str, str]] = None,
5573+
headers: Mapping[str, str] | None = None,
5574+
summary: str | None = None,
55615575
) -> OutputT: ...
55625576

55635577
@abstractmethod
@@ -5566,10 +5580,11 @@ async def execute_operation(
55665580
operation: Any,
55675581
input: Any,
55685582
*,
5569-
output_type: Optional[Type[OutputT]] = None,
5570-
schedule_to_close_timeout: Optional[timedelta] = None,
5583+
output_type: Type[OutputT] | None = None,
5584+
schedule_to_close_timeout: timedelta | None = None,
55715585
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5572-
headers: Optional[Mapping[str, str]] = None,
5586+
headers: Mapping[str, str] | None = None,
5587+
summary: str | None = None,
55735588
) -> Any:
55745589
"""Execute a Nexus operation and return its result.
55755590
@@ -5618,10 +5633,11 @@ async def start_operation(
56185633
operation: Any,
56195634
input: Any,
56205635
*,
5621-
output_type: Optional[Type] = None,
5622-
schedule_to_close_timeout: Optional[timedelta] = None,
5636+
output_type: Type[OutputT] | None = None,
5637+
schedule_to_close_timeout: timedelta | None = None,
56235638
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5624-
headers: Optional[Mapping[str, str]] = None,
5639+
headers: Mapping[str, str] | None = None,
5640+
summary: str | None = None,
56255641
) -> Any:
56265642
return (
56275643
await temporalio.workflow._Runtime.current().workflow_start_nexus_operation(
@@ -5633,6 +5649,7 @@ async def start_operation(
56335649
schedule_to_close_timeout=schedule_to_close_timeout,
56345650
cancellation_type=cancellation_type,
56355651
headers=headers,
5652+
summary=summary,
56365653
)
56375654
)
56385655

@@ -5641,10 +5658,11 @@ async def execute_operation(
56415658
operation: Any,
56425659
input: Any,
56435660
*,
5644-
output_type: Optional[Type] = None,
5645-
schedule_to_close_timeout: Optional[timedelta] = None,
5661+
output_type: Type[OutputT] | None = None,
5662+
schedule_to_close_timeout: timedelta | None = None,
56465663
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
5647-
headers: Optional[Mapping[str, str]] = None,
5664+
headers: Mapping[str, str] | None = None,
5665+
summary: str | None = None,
56485666
) -> Any:
56495667
handle = await self.start_operation(
56505668
operation,
@@ -5653,6 +5671,7 @@ async def execute_operation(
56535671
schedule_to_close_timeout=schedule_to_close_timeout,
56545672
cancellation_type=cancellation_type,
56555673
headers=headers,
5674+
summary=summary,
56565675
)
56575676
return await handle
56585677

0 commit comments

Comments
 (0)