Skip to content

Commit fbb745a

Browse files
committed
Add docstrings
1 parent 32a26ee commit fbb745a

File tree

2 files changed

+112
-92
lines changed

2 files changed

+112
-92
lines changed

temporalio/contrib/opentelemetry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ def workflow_interceptor_class(
142142
def intercept_nexus_operation(
143143
self, next: temporalio.worker.NexusOperationInboundInterceptor
144144
) -> temporalio.worker.NexusOperationInboundInterceptor:
145+
"""Implementation of
146+
:py:meth:`temporalio.worker.Interceptor.intercept_nexus_operation`.
147+
"""
145148
return _TracingNexusOperationInboundInterceptor(next, self)
146149

147150
def _context_to_headers(

temporalio/worker/_interceptor.py

Lines changed: 109 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -85,98 +85,6 @@ def intercept_nexus_operation(
8585
return next
8686

8787

88-
@dataclass
89-
class NexusOperationStartInput:
90-
ctx: nexusrpc.handler.StartOperationContext
91-
input: Any
92-
93-
94-
@dataclass
95-
class NexusOperationCancelInput:
96-
ctx: nexusrpc.handler.CancelOperationContext
97-
token: str
98-
99-
100-
class NexusOperationInboundInterceptor:
101-
def __init__(self, next: NexusOperationInboundInterceptor) -> None:
102-
self.next = next
103-
104-
async def start_operation(
105-
self, input: NexusOperationStartInput
106-
) -> (
107-
nexusrpc.handler.StartOperationResultSync[Any]
108-
| nexusrpc.handler.StartOperationResultAsync
109-
):
110-
return await self.next.start_operation(input)
111-
112-
async def cancel_operation(self, input: NexusOperationCancelInput) -> None:
113-
return await self.next.cancel_operation(input)
114-
115-
116-
class _NexusOperationHandlerForInterceptor(
117-
nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any]
118-
):
119-
def __init__(self, next_interceptor: NexusOperationInboundInterceptor):
120-
self._next_interceptor = next_interceptor
121-
122-
async def start(
123-
self, ctx: nexusrpc.handler.StartOperationContext, input: Any
124-
) -> (
125-
nexusrpc.handler.StartOperationResultSync[Any]
126-
| nexusrpc.handler.StartOperationResultAsync
127-
):
128-
return await self._next_interceptor.start_operation(
129-
NexusOperationStartInput(ctx, input)
130-
)
131-
132-
async def cancel(
133-
self, ctx: nexusrpc.handler.CancelOperationContext, token: str
134-
) -> None:
135-
return await self._next_interceptor.cancel_operation(
136-
NexusOperationCancelInput(ctx, token)
137-
)
138-
139-
140-
class _NexusOperationInboundInterceptorImpl(NexusOperationInboundInterceptor):
141-
def __init__(
142-
self,
143-
handler: nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any],
144-
):
145-
self._handler = handler
146-
147-
async def start_operation(
148-
self, input: NexusOperationStartInput
149-
) -> (
150-
nexusrpc.handler.StartOperationResultSync[Any]
151-
| nexusrpc.handler.StartOperationResultAsync
152-
):
153-
return await self._handler.start(input.ctx, input.input)
154-
155-
async def cancel_operation(self, input: NexusOperationCancelInput) -> None:
156-
return await self._handler.cancel(input.ctx, input.token)
157-
158-
159-
class _NexusMiddlewareForInterceptors(nexusrpc.handler.OperationHandlerMiddleware):
160-
def __init__(self, interceptors: Sequence[Interceptor]) -> None:
161-
self._interceptors = interceptors
162-
163-
def intercept(
164-
self,
165-
ctx: nexusrpc.handler.OperationContext,
166-
next: nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any],
167-
) -> nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any]:
168-
inbound = reduce(
169-
lambda impl, _next: _next.intercept_nexus_operation(impl),
170-
reversed(self._interceptors),
171-
cast(
172-
NexusOperationInboundInterceptor,
173-
_NexusOperationInboundInterceptorImpl(next),
174-
),
175-
)
176-
177-
return _NexusOperationHandlerForInterceptor(inbound)
178-
179-
18088
@dataclass(frozen=True)
18189
class WorkflowInterceptorClassInput:
18290
"""Input for :py:meth:`Interceptor.workflow_interceptor_class`."""
@@ -578,3 +486,112 @@ async def start_nexus_operation(
578486
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
579487
"""Called for every :py:func:`temporalio.workflow.start_nexus_operation` call."""
580488
return await self.next.start_nexus_operation(input)
489+
490+
491+
@dataclass
492+
class NexusOperationStartInput:
493+
"""Input for :pyt:meth:`NexusOperationInboundInterceptor.start_operation"""
494+
495+
ctx: nexusrpc.handler.StartOperationContext
496+
input: Any
497+
498+
499+
@dataclass
500+
class NexusOperationCancelInput:
501+
"""Input for :pyt:meth:`NexusOperationInboundInterceptor.cancel_operation"""
502+
503+
ctx: nexusrpc.handler.CancelOperationContext
504+
token: str
505+
506+
507+
class NexusOperationInboundInterceptor:
508+
"""Inbound interceptor to wrap Nexus operation starting and cancelling.
509+
510+
This should be extended by any Nexus operation inbound interceptors.
511+
"""
512+
513+
def __init__(self, next: NexusOperationInboundInterceptor) -> None:
514+
"""Create the inbound interceptor.
515+
516+
Args:
517+
next: The next interceptor in the chain. The default implementation
518+
of all calls is to delegate to the next interceptor.
519+
"""
520+
self.next = next
521+
522+
async def start_operation(
523+
self, input: NexusOperationStartInput
524+
) -> (
525+
nexusrpc.handler.StartOperationResultSync[Any]
526+
| nexusrpc.handler.StartOperationResultAsync
527+
):
528+
"""Called to start a Nexus operation"""
529+
return await self.next.start_operation(input)
530+
531+
async def cancel_operation(self, input: NexusOperationCancelInput) -> None:
532+
"""Called to cancel an in progress Nexus operation"""
533+
return await self.next.cancel_operation(input)
534+
535+
536+
class _NexusOperationHandlerForInterceptor(
537+
nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any]
538+
):
539+
def __init__(self, next_interceptor: NexusOperationInboundInterceptor):
540+
self._next_interceptor = next_interceptor
541+
542+
async def start(
543+
self, ctx: nexusrpc.handler.StartOperationContext, input: Any
544+
) -> (
545+
nexusrpc.handler.StartOperationResultSync[Any]
546+
| nexusrpc.handler.StartOperationResultAsync
547+
):
548+
return await self._next_interceptor.start_operation(
549+
NexusOperationStartInput(ctx, input)
550+
)
551+
552+
async def cancel(
553+
self, ctx: nexusrpc.handler.CancelOperationContext, token: str
554+
) -> None:
555+
return await self._next_interceptor.cancel_operation(
556+
NexusOperationCancelInput(ctx, token)
557+
)
558+
559+
560+
class _NexusOperationInboundInterceptorImpl(NexusOperationInboundInterceptor):
561+
def __init__(
562+
self,
563+
handler: nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any],
564+
):
565+
self._handler = handler
566+
567+
async def start_operation(
568+
self, input: NexusOperationStartInput
569+
) -> (
570+
nexusrpc.handler.StartOperationResultSync[Any]
571+
| nexusrpc.handler.StartOperationResultAsync
572+
):
573+
return await self._handler.start(input.ctx, input.input)
574+
575+
async def cancel_operation(self, input: NexusOperationCancelInput) -> None:
576+
return await self._handler.cancel(input.ctx, input.token)
577+
578+
579+
class _NexusMiddlewareForInterceptors(nexusrpc.handler.OperationHandlerMiddleware):
580+
def __init__(self, interceptors: Sequence[Interceptor]) -> None:
581+
self._interceptors = interceptors
582+
583+
def intercept(
584+
self,
585+
ctx: nexusrpc.handler.OperationContext,
586+
next: nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any],
587+
) -> nexusrpc.handler.MiddlewareSafeOperationHandler[Any, Any]:
588+
inbound = reduce(
589+
lambda impl, _next: _next.intercept_nexus_operation(impl),
590+
reversed(self._interceptors),
591+
cast(
592+
NexusOperationInboundInterceptor,
593+
_NexusOperationInboundInterceptorImpl(next),
594+
),
595+
)
596+
597+
return _NexusOperationHandlerForInterceptor(inbound)

0 commit comments

Comments
 (0)