@@ -99,8 +99,11 @@ informal introduction to the features and their implementation.
9999 - [ Interceptors] ( #interceptors )
100100 - [ Nexus] ( #nexus )
101101 - [ Plugins] ( #plugins )
102- - [ Client Plugins] ( #client-plugins )
103- - [ Worker Plugins] ( #worker-plugins )
102+ - [ Usage] ( #usage-1 )
103+ - [ Plugin Implementations] ( #plugin-implementations )
104+ - [ Advanced Plugin Implementations] ( #advanced-plugin-implementations )
105+ - [ Client Plugins] ( #client-plugins )
106+ - [ Worker Plugins] ( #worker-plugins )
104107 - [ Workflow Replay] ( #workflow-replay )
105108 - [ Observability] ( #observability )
106109 - [ Metrics] ( #metrics )
@@ -1498,10 +1501,80 @@ configuration, and worker execution. Common customizations may include but are n
149815013. Workflows
149915024. Interceptors
15001503
1504+ **Important Notes:**
1505+
1506+ - Client plugins that also implement worker plugin interfaces are automatically propagated to workers
1507+ - Avoid providing the same plugin to both client and worker to prevent double execution
1508+ - Each plugin's `name()` method returns a unique identifier for debugging purposes
1509+
1510+ #### Usage
1511+
1512+ Plugins can be provided to both `Client` and `Worker`.
1513+
1514+ ```python
1515+ # Use the plugin when connecting
1516+ client = await Client.connect(
1517+ "my-server.com:7233",
1518+ plugins=[SomePlugin()]
1519+ )
1520+ ```
1521+ ``` python
1522+ # Use the plugin when creating a worker
1523+ worker = Worker(
1524+ client,
1525+ plugins = [SomePlugin()]
1526+ )
1527+ ```
1528+ In the case of ` Client ` , any plugins will also be provided to any workers created with that client.
1529+ ``` python
1530+ # Create client with the unified plugin
1531+ client = await Client.connect(
1532+ " localhost:7233" ,
1533+ plugins = [SomePlugin()]
1534+ )
1535+
1536+ # Worker will automatically inherit the plugin from the client
1537+ worker = Worker(
1538+ client,
1539+ task_queue = " my-task-queue" ,
1540+ workflows = [MyWorkflow],
1541+ activities = [my_activity]
1542+ )
1543+ ```
1544+ #### Plugin Implementations
1545+
1546+ The easiest way to create your own plugin is to use ` SimplePlugin ` . This takes a number of possible configurations to produce
1547+ a relatively straightforward plugin.
1548+
1549+ ``` python
1550+ plugin = SimplePlugin(
1551+ " MyPlugin" ,
1552+ data_converter = converter,
1553+ )
1554+ ```
1555+
1556+ It is also possible to subclass ` SimplePlugin ` for some additional controls. This is what we do for ` OpenAIAgentsPlugin ` .
1557+
1558+ ``` python
1559+ class MediumPlugin (SimplePlugin ):
1560+ def __init__ (self ):
1561+ super ().__init__ (" MediumPlugin" , data_converter = pydantic_data_converter)
1562+
1563+ def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
1564+ config = super ().configure_worker(config)
1565+ config[" task_queue" ] = " override"
1566+ return config
1567+ ```
1568+
1569+ #### Advanced Plugin Implementations
1570+
1571+ ` SimplePlugin ` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can implement
1572+ the underlying plugin interfaces directly.
1573+
15011574A single plugin class can implement both client and worker plugin interfaces to share common logic between both
15021575contexts. When used with a client, it will automatically be propagated to any workers created with that client.
15031576
1504- #### Client Plugins
1577+ ##### Client Plugins
15051578
15061579Client plugins can intercept and modify client configuration and service connections. They are useful for adding
15071580authentication, modifying connection parameters, or adding custom behavior during client creation.
@@ -1516,29 +1589,21 @@ class AuthenticationPlugin(Plugin):
15161589 def __init__ (self , api_key : str ):
15171590 self .api_key = api_key
15181591
1519- def init_client_plugin(self, next: Plugin) -> None:
1520- self.next_client_plugin = next
1521-
15221592 def configure_client (self , config : ClientConfig) -> ClientConfig:
15231593 # Modify client configuration
15241594 config[" namespace" ] = " my-secure-namespace"
1525- return self.next_client_plugin.configure_client( config)
1595+ return config
15261596
15271597 async def connect_service_client (
1528- self, config: temporalio.service.ConnectConfig
1598+ self ,
1599+ config : temporalio.service.ConnectConfig,
1600+ next : Callable[[ConnectConfig], Awaitable[ServiceClient]]
15291601 ) -> temporalio.service.ServiceClient:
1530- # Add authentication to the connection
15311602 config.api_key = self .api_key
1532- return await self.next_client_plugin.connect_service_client(config)
1533-
1534- # Use the plugin when connecting
1535- client = await Client.connect(
1536- "my-server.com:7233",
1537- plugins=[AuthenticationPlugin("my-api-key")]
1538- )
1603+ return await next (config)
15391604```
15401605
1541- #### Worker Plugins
1606+ ##### Worker Plugins
15421607
15431608Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
15441609custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
@@ -1558,47 +1623,39 @@ class MonitoringPlugin(Plugin):
15581623 def __init__ (self ):
15591624 self .logger = logging.getLogger(__name__ )
15601625
1561- def init_worker_plugin (self , next : Plugin) -> None :
1562- self .next_worker_plugin = next
1563-
15641626 def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
15651627 # Modify worker configuration
15661628 original_task_queue = config[" task_queue" ]
15671629 config[" task_queue" ] = f " monitored- { original_task_queue} "
15681630 self .logger.info(f " Worker created for task queue: { config[' task_queue' ]} " )
1569- return self .next_worker_plugin.configure_worker( config)
1631+ return config
15701632
1571- async def run_worker (self , worker : Worker) -> None :
1633+ async def run_worker (self , worker : Worker, next : Callable[[Worker], Awaitable[ None ]] ) -> None :
15721634 self .logger.info(" Starting worker execution" )
15731635 try :
1574- await self .next_worker_plugin.run_worker (worker)
1636+ await next (worker)
15751637 finally :
15761638 self .logger.info(" Worker execution completed" )
15771639
15781640 def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1579- return self .next_worker_plugin.configure_replayer( config)
1641+ return config
15801642
15811643 @asynccontextmanager
15821644 async def run_replayer (
15831645 self ,
15841646 replayer : Replayer,
15851647 histories : AsyncIterator[temporalio.client.WorkflowHistory],
1648+ next : Callable[
1649+ [Replayer, AsyncIterator[WorkflowHistory]],
1650+ AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
1651+ ]
15861652 ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
15871653 self .logger.info(" Starting replay execution" )
15881654 try :
1589- async with self .next_worker_plugin.run_replayer(replayer, histories) as results:
1590- yield results
1655+ async with self .next_worker_plugin.run_replayer(replayer, histories) as results:
1656+ yield results
15911657 finally :
15921658 self .logger.info(" Replay execution completed" )
1593-
1594- # Use the plugin when creating a worker
1595- worker = Worker(
1596- client,
1597- task_queue = " my-task-queue" ,
1598- workflows = [MyWorkflow],
1599- activities = [my_activity],
1600- plugins = [MonitoringPlugin()]
1601- )
16021659```
16031660
16041661For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
@@ -1612,67 +1669,43 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf
16121669
16131670
16141671class UnifiedPlugin (ClientPlugin , WorkerPlugin ):
1615- def init_client_plugin (self , next : ClientPlugin) -> None :
1616- self .next_client_plugin = next
1617-
1618- def init_worker_plugin (self , next : WorkerPlugin) -> None :
1619- self .next_worker_plugin = next
1620-
16211672 def configure_client (self , config : ClientConfig) -> ClientConfig:
16221673 # Client-side customization
16231674 config[" data_converter" ] = pydantic_data_converter
1624- return self .next_client_plugin.configure_client( config)
1675+ return config
16251676
16261677 async def connect_service_client (
1627- self , config : temporalio.service.ConnectConfig
1678+ self ,
1679+ config : temporalio.service.ConnectConfig,
1680+ next : Callable[[ConnectConfig], Awaitable[ServiceClient]]
16281681 ) -> temporalio.service.ServiceClient:
1629- # Add authentication to the connection
16301682 config.api_key = self .api_key
1631- return await self .next_client_plugin.connect_service_client (config)
1683+ return await next (config)
16321684
16331685 def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
16341686 # Worker-side customization
1635- return self .next_worker_plugin.configure_worker( config)
1687+ return config
16361688
1637- async def run_worker (self , worker : Worker) -> None :
1689+ async def run_worker (self , worker : Worker, next : Callable[[Worker], Awaitable[ None ]] ) -> None :
16381690 print (" Starting unified worker" )
1639- await self .next_worker_plugin.run_worker (worker)
1691+ await next (worker)
16401692
16411693 def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
16421694 config[" data_converter" ] = pydantic_data_converter
1643- return self .next_worker_plugin.configure_replayer( config)
1695+ return config
16441696
16451697 async def run_replayer (
16461698 self ,
16471699 replayer : Replayer,
16481700 histories : AsyncIterator[temporalio.client.WorkflowHistory],
1701+ next : Callable[
1702+ [Replayer, AsyncIterator[WorkflowHistory]],
1703+ AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
1704+ ]
16491705 ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1650- return self .next_worker_plugin.run_replayer(replayer, histories)
1651-
1652- # Create client with the unified plugin
1653- client = await Client.connect(
1654- " localhost:7233" ,
1655- plugins = [UnifiedPlugin()]
1656- )
1657-
1658- # Worker will automatically inherit the plugin from the client
1659- worker = Worker(
1660- client,
1661- task_queue = " my-task-queue" ,
1662- workflows = [MyWorkflow],
1663- activities = [my_activity]
1664- )
1706+ return next (replayer, histories)
16651707```
16661708
1667- ** Important Notes:**
1668-
1669- - Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
1670- - Client plugins that also implement worker plugin interfaces are automatically propagated to workers
1671- - Avoid providing the same plugin to both client and worker to prevent double execution
1672- - Plugin methods should call the plugin provided during initialization to maintain the plugin chain
1673- - Each plugin's ` name() ` method returns a unique identifier for debugging purposes
1674-
1675-
16761709### Workflow Replay
16771710
16781711Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
0 commit comments