From 5e4a37bdfeebcf2e8553e061ecac98b3c19257d8 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Wed, 15 Oct 2025 21:24:50 -0400 Subject: [PATCH] Add simplified run() API for OpenAI Agents SDK integration - New run() method accepts standard Agent objects from OpenAI SDK - Automatically converts ModelSettings and FunctionTool to serializable format - Provides clean DX matching OpenAI Agents SDK with just task_id param - Works in Temporal workflows, sync agents (FastACP), and standalone scripts Example usage: from agents import Agent, function_tool, ModelSettings @function_tool def my_tool(param: str) -> str: return f"Result: {param}" agent = Agent( name="My Agent", instructions="Help users", model="gpt-4o", model_settings=ModelSettings(parallel_tool_calls=True), tools=[my_tool] ) result = await adk.providers.openai.run( agent=agent, input="Hello", task_id=task_id ) --- .../lib/adk/providers/_modules/openai.py | 212 ++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/src/agentex/lib/adk/providers/_modules/openai.py b/src/agentex/lib/adk/providers/_modules/openai.py index ea925bda..f4ae3002 100644 --- a/src/agentex/lib/adk/providers/_modules/openai.py +++ b/src/agentex/lib/adk/providers/_modules/openai.py @@ -477,3 +477,215 @@ async def run_agent_streamed_auto_send( max_turns=max_turns, previous_response_id=previous_response_id, ) + + async def run( + self, + agent: Agent, + input: str | list[dict[str, Any]], + task_id: str, + *, + trace_id: str | None = None, + parent_span_id: str | None = None, + start_to_close_timeout: timedelta = timedelta(seconds=600), + heartbeat_timeout: timedelta = timedelta(seconds=600), + retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY, + max_turns: int | None = None, + mcp_server_params: list[StdioServerParameters] | None = None, + previous_response_id: str | None = None, + ) -> SerializableRunResultStreaming | RunResultStreaming: + """ + Run an OpenAI Agent with automatic streaming to AgentEx UI. 
+ + This is a simple wrapper that lets you use standard OpenAI Agents SDK + patterns while getting AgentEx features (streaming, tracing, TaskMessages). + + Works everywhere: Temporal workflows, sync agents (FastACP), standalone scripts. + + Example: + from agents import Agent, function_tool, ModelSettings + from openai.types.shared import Reasoning + + @function_tool + def get_weather(city: str) -> str: + return f"Weather in {city}: Sunny" + + agent = Agent( + name="Weather Bot", + instructions="Help with weather", + model="gpt-4o", + model_settings=ModelSettings( + parallel_tool_calls=True, + reasoning=Reasoning(effort="low", summary="auto") + ), + tools=[get_weather] + ) + + result = await adk.providers.openai.run( + agent=agent, + input="What's the weather in Tokyo?", + task_id=params.task.id, + trace_id=params.task.id, + parent_span_id=span.id, + ) + + Args: + agent: Standard OpenAI Agents SDK Agent object + input: User message (str) or conversation history (list of dicts) + task_id: AgentEx task ID for streaming + trace_id: Optional trace ID (defaults to task_id) + parent_span_id: Optional parent span for nested tracing + start_to_close_timeout: Maximum time allowed for the operation + heartbeat_timeout: Maximum time between heartbeats + retry_policy: Policy for retrying failed operations + max_turns: Max conversation turns (default from Runner) + mcp_server_params: Optional MCP server configurations + previous_response_id: For conversation continuity + + Returns: + SerializableRunResultStreaming or RunResultStreaming with final_output and conversation history + """ + # 1. Normalize input format + if isinstance(input, str): + input_list = [{"role": "user", "content": input}] + else: + input_list = input + + # 2. 
Extract agent properties + agent_name = agent.name + agent_instructions = agent.instructions + + # Extract model name + if isinstance(agent.model, str): + model = agent.model + else: + model = None # Will use default + + # Extract model settings and convert to serializable format if needed + model_settings = getattr(agent, 'model_settings', None) + if model_settings and not isinstance(model_settings, dict): + # Convert OpenAI SDK ModelSettings to serializable format + from agentex.lib.core.temporal.activities.adk.providers.openai_activities import ModelSettings as SerializableModelSettings + + model_settings = SerializableModelSettings( + temperature=getattr(model_settings, 'temperature', None), + max_tokens=getattr(model_settings, 'max_tokens', None), + top_p=getattr(model_settings, 'top_p', None), + frequency_penalty=getattr(model_settings, 'frequency_penalty', None), + presence_penalty=getattr(model_settings, 'presence_penalty', None), + parallel_tool_calls=getattr(model_settings, 'parallel_tool_calls', None), + tool_choice=getattr(model_settings, 'tool_choice', None), + reasoning=getattr(model_settings, 'reasoning', None), + store=getattr(model_settings, 'store', None), + metadata=getattr(model_settings, 'metadata', None), + extra_headers=getattr(model_settings, 'extra_headers', None), + extra_body=getattr(model_settings, 'extra_body', None), + extra_args=getattr(model_settings, 'extra_args', None), + ) + + # Extract other properties and convert tools to serializable format + tools = agent.tools or [] + if tools: + # Import all tool types we need + from agents.tool import ( + FunctionTool as OAIFunctionTool, + WebSearchTool as OAIWebSearchTool, + FileSearchTool as OAIFileSearchTool, + ComputerTool as OAIComputerTool, + LocalShellTool as OAILocalShellTool, + CodeInterpreterTool as OAICodeInterpreterTool, + ImageGenerationTool as OAIImageGenerationTool, + ) + from agentex.lib.core.temporal.activities.adk.providers.openai_activities import ( + FunctionTool as 
SerializableFunctionTool, + WebSearchTool as SerializableWebSearchTool, + FileSearchTool as SerializableFileSearchTool, + ComputerTool as SerializableComputerTool, + LocalShellTool as SerializableLocalShellTool, + CodeInterpreterTool as SerializableCodeInterpreterTool, + ImageGenerationTool as SerializableImageGenerationTool, + ) + + # Convert tools to ensure they're serializable for Temporal + converted_tools = [] + for tool in tools: + # If already a serializable wrapper, keep as-is + if hasattr(tool, 'to_oai_function_tool'): + converted_tools.append(tool) + # Convert OpenAI SDK tool types to serializable wrappers + elif isinstance(tool, OAIFunctionTool): + # FunctionTool requires on_invoke_tool callable + if not hasattr(tool, 'on_invoke_tool') or tool.on_invoke_tool is None: + raise ValueError(f"FunctionTool '{tool.name}' missing required on_invoke_tool callable") + converted_tools.append(SerializableFunctionTool( + name=tool.name, + description=tool.description, + params_json_schema=tool.params_json_schema, + strict_json_schema=getattr(tool, 'strict_json_schema', True), + on_invoke_tool=tool.on_invoke_tool, + )) + elif isinstance(tool, OAIWebSearchTool): + converted_tools.append(SerializableWebSearchTool( + user_location=getattr(tool, 'user_location', None), + search_context_size=getattr(tool, 'search_context_size', 'medium'), + )) + elif isinstance(tool, OAIFileSearchTool): + converted_tools.append(SerializableFileSearchTool( + vector_store_ids=tool.vector_store_ids, + max_num_results=getattr(tool, 'max_num_results', None), + include_search_results=getattr(tool, 'include_search_results', False), + ranking_options=getattr(tool, 'ranking_options', None), + filters=getattr(tool, 'filters', None), + )) + elif isinstance(tool, OAIComputerTool): + converted_tools.append(SerializableComputerTool( + computer=getattr(tool, 'computer', None), + on_safety_check=getattr(tool, 'on_safety_check', None), + )) + elif isinstance(tool, OAILocalShellTool): + 
converted_tools.append(SerializableLocalShellTool( + executor=getattr(tool, 'executor', None), + )) + elif isinstance(tool, OAICodeInterpreterTool): + converted_tools.append(SerializableCodeInterpreterTool( + tool_config=getattr(tool, 'tool_config', {"type": "code_interpreter"}), + )) + elif isinstance(tool, OAIImageGenerationTool): + converted_tools.append(SerializableImageGenerationTool( + tool_config=getattr(tool, 'tool_config', {"type": "image_generation"}), + )) + else: + # Unknown tool type - keep as-is and let downstream handle it + converted_tools.append(tool) + tools = converted_tools + + handoffs = agent.handoffs or [] + handoff_description = getattr(agent, 'handoff_description', None) + output_type = getattr(agent, 'output_type', None) + tool_use_behavior = getattr(agent, 'tool_use_behavior', 'run_llm_again') + input_guardrails = getattr(agent, 'input_guardrails', None) + output_guardrails = getattr(agent, 'output_guardrails', None) + + # 3. Call the existing service layer + return await self.run_agent_streamed_auto_send( + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + input_list=input_list, + mcp_server_params=mcp_server_params or [], + agent_name=agent_name, + agent_instructions=agent_instructions, + model=model, + model_settings=model_settings, + tools=tools, + handoff_description=handoff_description, + handoffs=handoffs, + output_type=output_type, + tool_use_behavior=tool_use_behavior, + start_to_close_timeout=start_to_close_timeout, + heartbeat_timeout=heartbeat_timeout, + retry_policy=retry_policy, + input_guardrails=input_guardrails, + output_guardrails=output_guardrails, + max_turns=max_turns, + previous_response_id=previous_response_id, + )