Skip to content

Commit dc6b988

Browse files
committed
merge
1 parent 79acf6f commit dc6b988

File tree

7 files changed

+385
-0
lines changed

7 files changed

+385
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
3+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
4+
5+
from project.workflow import ExampleTutorialWorkflow
6+
from agentex.lib.utils.debug import setup_debug_if_enabled
7+
from agentex.lib.utils.logging import make_logger
8+
from agentex.lib.environment_variables import EnvironmentVariables
9+
from agentex.lib.core.temporal.activities import get_all_activities
10+
from agentex.lib.core.temporal.workers.worker import AgentexWorker
11+
12+
environment_variables = EnvironmentVariables.refresh()
13+
14+
logger = make_logger(__name__)
15+
16+
17+
async def main():
18+
# Setup debug mode if enabled
19+
setup_debug_if_enabled()
20+
21+
task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
22+
if task_queue_name is None:
23+
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
24+
25+
# Add activities to the worker
26+
all_activities = get_all_activities() + [] # add your own activities here
27+
28+
# Create a worker with automatic tracing
29+
# We are also adding the Open AI Agents SDK plugin to the worker.
30+
worker = AgentexWorker(task_queue=task_queue_name, plugins=[OpenAIAgentsPlugin()])
31+
32+
await worker.run(
33+
activities=all_activities,
34+
workflow=ExampleTutorialWorkflow,
35+
)
36+
37+
38+
if __name__ == "__main__":
39+
asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""
2+
Tests for example-tutorial (OpenAI Agents SDK Hello World)
3+
4+
Prerequisites:
5+
- AgentEx services running (make dev)
6+
- Temporal server running
7+
- Agent running: agentex agents run --manifest manifest.yaml
8+
9+
Run: pytest tests/test_agent.py -v
10+
"""
11+
12+
import pytest
13+
14+
from agentex.lib.testing import test_agentic_agent, assert_valid_agent_response
15+
16+
AGENT_NAME = "example-tutorial"
17+
18+
19+
@pytest.mark.asyncio
20+
async def test_agent_basic():
21+
"""Test basic agent functionality."""
22+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
23+
response = await test.send_event("Test message", timeout_seconds=60.0)
24+
assert_valid_agent_response(response)
25+
26+
27+
@pytest.mark.asyncio
28+
async def test_agent_streaming():
29+
"""Test streaming responses."""
30+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
31+
events = []
32+
async for event in test.send_event_and_stream("Stream test", timeout_seconds=60.0):
33+
events.append(event)
34+
if event.get("type") == "done":
35+
break
36+
assert len(events) > 0
37+
38+
39+
if __name__ == "__main__":
40+
pytest.main([__file__, "-v"])
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
5+
6+
from project.workflow import ExampleTutorialWorkflow
7+
from project.activities import get_weather, deposit_money, withdraw_money
8+
from agentex.lib.utils.debug import setup_debug_if_enabled
9+
from agentex.lib.utils.logging import make_logger
10+
from agentex.lib.environment_variables import EnvironmentVariables
11+
from agentex.lib.core.temporal.activities import get_all_activities
12+
from agentex.lib.core.temporal.workers.worker import AgentexWorker
13+
14+
environment_variables = EnvironmentVariables.refresh()
15+
16+
logger = make_logger(__name__)
17+
18+
19+
async def main():
20+
# Setup debug mode if enabled
21+
setup_debug_if_enabled()
22+
23+
task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
24+
if task_queue_name is None:
25+
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
26+
27+
# Add activities to the worker
28+
all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather] # add your own activities here
29+
30+
# Create a worker with automatic tracing
31+
# We are also adding the Open AI Agents SDK plugin to the worker.
32+
worker = AgentexWorker(
33+
task_queue=task_queue_name,
34+
plugins=[OpenAIAgentsPlugin(model_params=ModelActivityParameters(start_to_close_timeout=timedelta(days=1)))],
35+
)
36+
37+
await worker.run(
38+
activities=all_activities,
39+
workflow=ExampleTutorialWorkflow,
40+
)
41+
42+
43+
if __name__ == "__main__":
44+
asyncio.run(main())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""
2+
Tests for example-tutorial (OpenAI Agents SDK Tools)
3+
4+
Prerequisites:
5+
- AgentEx services running (make dev)
6+
- Temporal server running
7+
- Agent running: agentex agents run --manifest manifest.yaml
8+
9+
Run: pytest tests/test_agent.py -v
10+
"""
11+
12+
import pytest
13+
14+
from agentex.lib.testing import test_agentic_agent, assert_valid_agent_response
15+
16+
AGENT_NAME = "example-tutorial"
17+
18+
19+
@pytest.mark.asyncio
20+
async def test_agent_basic():
21+
"""Test basic agent functionality."""
22+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
23+
response = await test.send_event("Test message", timeout_seconds=60.0)
24+
assert_valid_agent_response(response)
25+
26+
27+
@pytest.mark.asyncio
28+
async def test_agent_streaming():
29+
"""Test streaming responses."""
30+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
31+
events = []
32+
async for event in test.send_event_and_stream("Stream test", timeout_seconds=60.0):
33+
events.append(event)
34+
if event.get("type") == "done":
35+
break
36+
assert len(events) > 0
37+
38+
39+
if __name__ == "__main__":
40+
pytest.main([__file__, "-v"])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
5+
6+
from project.workflow import ChildWorkflow, ExampleTutorialWorkflow
7+
from project.activities import confirm_order, deposit_money, withdraw_money
8+
from agentex.lib.utils.debug import setup_debug_if_enabled
9+
from agentex.lib.utils.logging import make_logger
10+
from agentex.lib.environment_variables import EnvironmentVariables
11+
from agentex.lib.core.temporal.activities import get_all_activities
12+
from agentex.lib.core.temporal.workers.worker import AgentexWorker
13+
14+
environment_variables = EnvironmentVariables.refresh()
15+
16+
logger = make_logger(__name__)
17+
18+
19+
async def main():
20+
# Setup debug mode if enabled
21+
setup_debug_if_enabled()
22+
23+
task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
24+
if task_queue_name is None:
25+
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
26+
27+
# Add activities to the worker
28+
all_activities = get_all_activities() + [
29+
withdraw_money,
30+
deposit_money,
31+
confirm_order,
32+
] # add your own activities here
33+
34+
# Create a worker with automatic tracing
35+
# We are also adding the Open AI Agents SDK plugin to the worker.
36+
worker = AgentexWorker(
37+
task_queue=task_queue_name,
38+
plugins=[OpenAIAgentsPlugin(model_params=ModelActivityParameters(start_to_close_timeout=timedelta(days=1)))],
39+
)
40+
41+
await worker.run(activities=all_activities, workflows=[ExampleTutorialWorkflow, ChildWorkflow])
42+
43+
44+
if __name__ == "__main__":
45+
asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""
2+
OpenAI Agents SDK + Temporal Integration: Human-in-the-Loop Tutorial
3+
4+
This tutorial demonstrates how to pause agent execution and wait for human approval
5+
using Temporal's child workflows and signals.
6+
7+
KEY CONCEPTS:
8+
- Child workflows: Independent workflows spawned by parent for human interaction
9+
- Signals: External systems can send messages to running workflows
10+
- Durable waiting: Agents can wait indefinitely for human input without losing state
11+
12+
WHY THIS MATTERS:
13+
Without Temporal, if your system crashes while waiting for human approval, you lose
14+
all context. With Temporal, the agent resumes exactly where it left off after
15+
system failures, making human-in-the-loop workflows production-ready.
16+
17+
PATTERN:
18+
1. Agent calls wait_for_confirmation tool
19+
2. Tool spawns child workflow that waits for signal
20+
3. Human approves via CLI/web app
21+
4. Child workflow completes, agent continues
22+
23+
Usage: `temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true`
24+
"""
25+
26+
import json
27+
import asyncio
28+
29+
from agents import Agent, Runner
30+
from temporalio import workflow
31+
32+
from agentex.lib import adk
33+
from project.tools import wait_for_confirmation
34+
from agentex.lib.types.acp import SendEventParams, CreateTaskParams
35+
from agentex.lib.utils.logging import make_logger
36+
from agentex.types.text_content import TextContent
37+
from agentex.lib.environment_variables import EnvironmentVariables
38+
from agentex.lib.core.temporal.types.workflow import SignalName
39+
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
40+
41+
environment_variables = EnvironmentVariables.refresh()
42+
43+
if environment_variables.WORKFLOW_NAME is None:
44+
raise ValueError("Environment variable WORKFLOW_NAME is not set")
45+
46+
if environment_variables.AGENT_NAME is None:
47+
raise ValueError("Environment variable AGENT_NAME is not set")
48+
49+
logger = make_logger(__name__)
50+
51+
52+
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
53+
class ExampleTutorialWorkflow(BaseWorkflow):
54+
"""
55+
Human-in-the-Loop Temporal Workflow
56+
57+
Demonstrates agents that can pause execution and wait for human approval.
58+
When approval is needed, the agent spawns a child workflow that waits for
59+
external signals (human input) before continuing.
60+
61+
Benefits: Durable waiting, survives system failures, scalable to millions of workflows.
62+
"""
63+
64+
def __init__(self):
65+
super().__init__(display_name=environment_variables.AGENT_NAME)
66+
self._complete_task = False
67+
self._pending_confirmation: asyncio.Queue[str] = asyncio.Queue()
68+
69+
@workflow.signal(name=SignalName.RECEIVE_EVENT)
70+
async def on_task_event_send(self, params: SendEventParams) -> None:
71+
"""
72+
Handle user messages with human-in-the-loop approval capability.
73+
74+
When the agent needs human approval, it calls wait_for_confirmation which spawns
75+
a child workflow that waits for external signals before continuing.
76+
"""
77+
logger.info(f"Received task message instruction: {params}")
78+
79+
# Echo user message back to UI
80+
await adk.messages.create(task_id=params.task.id, content=params.event.content)
81+
82+
# Create agent with human-in-the-loop capability
83+
# The wait_for_confirmation tool spawns a child workflow that waits for external signals
84+
confirm_order_agent = Agent(
85+
name="Confirm Order",
86+
instructions="You are a helpful confirm order agent. When a user asks you to confirm an order, use the wait_for_confirmation tool to wait for confirmation.",
87+
tools=[
88+
wait_for_confirmation,
89+
],
90+
)
91+
92+
# Run agent - when human approval is needed, it will spawn child workflow and wait
93+
result = await Runner.run(confirm_order_agent, params.event.content.content)
94+
95+
# Send response back to user (includes result of any human approval process)
96+
await adk.messages.create(
97+
task_id=params.task.id,
98+
content=TextContent(
99+
author="agent",
100+
content=result.final_output,
101+
),
102+
)
103+
104+
@workflow.run
105+
async def on_task_create(self, params: CreateTaskParams) -> str:
106+
"""
107+
Workflow entry point - starts the long-running human-in-the-loop agent.
108+
109+
Handles both automated decisions and human approval workflows durably.
110+
To approve waiting actions: temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true
111+
"""
112+
logger.info(f"Received task create params: {params}")
113+
114+
# Send welcome message when task is created
115+
await adk.messages.create(
116+
task_id=params.task.id,
117+
content=TextContent(
118+
author="agent",
119+
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
120+
),
121+
)
122+
123+
# Keep workflow running indefinitely to handle user messages and human approvals
124+
# This survives system failures and can resume exactly where it left off
125+
await workflow.wait_condition(
126+
lambda: self._complete_task,
127+
timeout=None, # No timeout for long-running human-in-the-loop workflows
128+
)
129+
return "Task completed"
130+
131+
# TEMPORAL UI (localhost:8080):
132+
# - Main workflow shows agent activities + ChildWorkflow activity when approval needed
133+
# - Child workflow appears as separate "child-workflow-id" that waits for signal
134+
# - Timeline: invoke_model_activity → ChildWorkflow (waiting) → invoke_model_activity (after approval)
135+
#
136+
# To approve: temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true
137+
# Production: Replace CLI with web dashboards/APIs that send signals programmatically
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""
2+
Tests for example-tutorial (OpenAI Agents SDK Human in the Loop)
3+
4+
Prerequisites:
5+
- AgentEx services running (make dev)
6+
- Temporal server running
7+
- Agent running: agentex agents run --manifest manifest.yaml
8+
9+
Run: pytest tests/test_agent.py -v
10+
"""
11+
12+
import pytest
13+
14+
from agentex.lib.testing import test_agentic_agent, assert_valid_agent_response
15+
16+
AGENT_NAME = "example-tutorial"
17+
18+
19+
@pytest.mark.asyncio
20+
async def test_agent_basic():
21+
"""Test basic agent functionality."""
22+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
23+
response = await test.send_event("Test message", timeout_seconds=60.0)
24+
assert_valid_agent_response(response)
25+
26+
27+
@pytest.mark.asyncio
28+
async def test_agent_streaming():
29+
"""Test streaming responses."""
30+
async with test_agentic_agent(agent_name=AGENT_NAME) as test:
31+
events = []
32+
async for event in test.send_event_and_stream("Stream test", timeout_seconds=60.0):
33+
events.append(event)
34+
if event.get("type") == "done":
35+
break
36+
assert len(events) > 0
37+
38+
39+
if __name__ == "__main__":
40+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)