Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 119 additions & 108 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1509,32 +1509,34 @@ authentication, modifying connection parameters, or adding custom behavior durin
Here's an example of a client plugin that adds custom authentication:

```python
from temporalio.client import Plugin, ClientConfig
from temporalio.client import LowLevelPlugin, ClientConfig
import temporalio.service

class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> None:
self.next_client_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

class AuthenticationPlugin(LowLevelPlugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: LowLevelPlugin) -> None:
self.next_client_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)


# Use the plugin when connecting
client = await Client.connect(
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
)
```

Expand All @@ -1551,53 +1553,59 @@ Here's an example of a worker plugin that adds custom monitoring:
import temporalio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
from temporalio.worker import LowLevelPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
import logging

class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: Plugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)

@asynccontextmanager
async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

class MonitoringPlugin(LowLevelPlugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: LowLevelPlugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")


def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)


@asynccontextmanager


async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")


# Use the plugin when creating a worker
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
)
```

Expand All @@ -1607,60 +1615,63 @@ For plugins that need to work with both clients and workers, you can implement b
import temporalio
from contextlib import AbstractAsyncContextManager
from typing import AsyncIterator
from temporalio.client import Plugin as ClientPlugin, ClientConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
from temporalio.client import LowLevelPlugin as ClientPlugin, ClientConfig
from temporalio.worker import LowLevelPlugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer,

WorkflowReplayResult


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def init_client_plugin(self, next: ClientPlugin) -> None:
self.next_client_plugin = next

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return config

async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)

def init_client_plugin(self, next: ClientPlugin) -> None:
self.next_client_plugin = next

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return config

async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)


# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[UnifiedPlugin()]
"localhost:7233",
plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
```

Expand Down
18 changes: 9 additions & 9 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def connect(
namespace: str = "default",
api_key: Optional[str] = None,
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
plugins: Sequence[LowLevelPlugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand Down Expand Up @@ -190,7 +190,7 @@ async def connect(
http_connect_proxy_config=http_connect_proxy_config,
)

root_plugin: Plugin = _RootPlugin()
root_plugin: LowLevelPlugin = _RootPlugin()
for plugin in reversed(plugins):
plugin.init_client_plugin(root_plugin)
root_plugin = plugin
Expand All @@ -213,7 +213,7 @@ def __init__(
*,
namespace: str = "default",
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
plugins: Sequence[LowLevelPlugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand All @@ -235,7 +235,7 @@ def __init__(
plugins=plugins,
)

root_plugin: Plugin = _RootPlugin()
root_plugin: LowLevelPlugin = _RootPlugin()
for plugin in reversed(plugins):
plugin.init_client_plugin(root_plugin)
root_plugin = plugin
Expand Down Expand Up @@ -1540,7 +1540,7 @@ class ClientConfig(TypedDict, total=False):
Optional[temporalio.common.QueryRejectCondition]
]
header_codec_behavior: Required[HeaderCodecBehavior]
plugins: Required[Sequence[Plugin]]
plugins: Required[Sequence[LowLevelPlugin]]


class WorkflowHistoryEventFilterType(IntEnum):
Expand Down Expand Up @@ -7367,7 +7367,7 @@ async def _decode_user_metadata(
)


class Plugin(abc.ABC):
class LowLevelPlugin(abc.ABC):
"""Base class for client plugins that can intercept and modify client behavior.

Plugins allow customization of client creation and service connection processes
Expand All @@ -7387,7 +7387,7 @@ def name(self) -> str:
return type(self).__module__ + "." + type(self).__qualname__

@abstractmethod
def init_client_plugin(self, next: Plugin) -> None:
def init_client_plugin(self, next: LowLevelPlugin) -> None:
"""Initialize this plugin in the plugin chain.

This method sets up the chain of responsibility pattern by providing a reference
Expand Down Expand Up @@ -7433,8 +7433,8 @@ async def connect_service_client(
"""


class _RootPlugin(Plugin):
def init_client_plugin(self, next: Plugin) -> None:
class _RootPlugin(LowLevelPlugin):
def init_client_plugin(self, next: LowLevelPlugin) -> None:
raise NotImplementedError()

def configure_client(self, config: ClientConfig) -> ClientConfig:
Expand Down
10 changes: 6 additions & 4 deletions temporalio/contrib/openai_agents/_temporal_openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import temporalio.client
import temporalio.worker
from temporalio.client import ClientConfig, Plugin
from temporalio.client import ClientConfig, LowLevelPlugin
from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
Expand Down Expand Up @@ -150,7 +150,9 @@ def __init__(self) -> None:
super().__init__(ToJsonOptions(exclude_unset=True))


class OpenAIAgentsPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
class OpenAIAgentsPlugin(
temporalio.client.LowLevelPlugin, temporalio.worker.LowLevelPlugin
):
"""Temporal plugin for integrating OpenAI agents with Temporal workflows.

.. warning::
Expand Down Expand Up @@ -233,7 +235,7 @@ def __init__(
self._model_params = model_params
self._model_provider = model_provider

def init_client_plugin(self, next: temporalio.client.Plugin) -> None:
def init_client_plugin(self, next: temporalio.client.LowLevelPlugin) -> None:
"""Set the next client plugin"""
self.next_client_plugin = next

Expand All @@ -243,7 +245,7 @@ async def connect_service_client(
"""No modifications to service client"""
return await self.next_client_plugin.connect_service_client(config)

def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
def init_worker_plugin(self, next: temporalio.worker.LowLevelPlugin) -> None:
"""Set the next worker plugin"""
self.next_worker_plugin = next

Expand Down
Loading
Loading