|
1 | 1 | """ |
2 | | -Sample tests for AgentEx ACP agent. |
| 2 | +Tests for s000-hello-acp (sync agent) |
3 | 3 |
|
4 | | -This test suite demonstrates how to test the main AgentEx API functions: |
| 4 | +This test suite demonstrates testing a sync agent using the AgentEx testing framework. |
| 5 | +
|
| 6 | +Test coverage: |
5 | 7 | - Non-streaming message sending |
6 | 8 | - Streaming message sending |
7 | | -- Task creation via RPC |
8 | 9 |
|
9 | | -To run these tests: |
10 | | -1. Make sure the agent is running (via docker-compose or `agentex agents run`) |
11 | | -2. Set the AGENTEX_API_BASE_URL environment variable if not using default |
12 | | -3. Run: pytest test_agent.py -v |
| 10 | +Prerequisites: |
| 11 | + - AgentEx services running (make dev) |
| 12 | + - Agent running: agentex agents run --manifest manifest.yaml |
13 | 13 |
|
14 | | -Configuration: |
15 | | -- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) |
16 | | -- AGENT_NAME: Name of the agent to test (default: hello-acp) |
| 14 | +Run tests: |
| 15 | + pytest tests/test_agent.py -v |
17 | 16 | """ |
18 | 17 |
|
19 | | -import os |
| 18 | +from agentex.lib.testing import ( |
| 19 | + test_sync_agent, |
| 20 | + collect_streaming_deltas, |
| 21 | + assert_valid_agent_response, |
| 22 | +) |
20 | 23 |
|
21 | | -import pytest |
| 24 | +AGENT_NAME = "s000-hello-acp" |
22 | 25 |
|
23 | | -from agentex import Agentex |
24 | | -from agentex.types import TextDelta, TextContent, TextContentParam |
25 | | -from agentex.types.agent_rpc_params import ParamsSendMessageRequest |
26 | | -from agentex.types.task_message_update import StreamTaskMessageFull, StreamTaskMessageDelta |
27 | 26 |
|
28 | | -# Configuration from environment variables |
29 | | -AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") |
30 | | -AGENT_NAME = os.environ.get("AGENT_NAME", "s000-hello-acp") |
| 27 | +def test_send_simple_message(): |
| 28 | + """Test sending a simple message and receiving a response.""" |
| 29 | + with test_sync_agent(agent_name=AGENT_NAME) as test: |
| 30 | + message_content = "Hello, Agent! How are you?" |
| 31 | + response = test.send_message(message_content) |
31 | 32 |
|
| 33 | + # Validate response |
| 34 | + assert_valid_agent_response(response) |
32 | 35 |
|
33 | | -@pytest.fixture |
34 | | -def client(): |
35 | | - """Create an AgentEx client instance for testing.""" |
36 | | - client = Agentex(base_url=AGENTEX_API_BASE_URL) |
37 | | - yield client |
38 | | - # Clean up: close the client connection |
39 | | - client.close() |
| 36 | + # Check expected response format |
| 37 | + expected = f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
| 38 | + assert response.content == expected, f"Expected: {expected}\nGot: {response.content}" |
40 | 39 |
|
41 | 40 |
|
42 | | -@pytest.fixture |
43 | | -def agent_name(): |
44 | | - """Return the agent name for testing.""" |
45 | | - return AGENT_NAME |
| 41 | +def test_stream_simple_message(): |
| 42 | + """Test streaming a simple message and aggregating deltas.""" |
| 43 | + with test_sync_agent(agent_name=AGENT_NAME) as test: |
| 44 | + message_content = "Hello, Agent! Can you stream your response?" |
46 | 45 |
|
| 46 | + # Get streaming response |
| 47 | + response_gen = test.send_message_streaming(message_content) |
47 | 48 |
|
48 | | -class TestNonStreamingMessages: |
49 | | - """Test non-streaming message sending.""" |
| 49 | + # Collect streaming deltas |
| 50 | + aggregated_content, chunks = collect_streaming_deltas(response_gen) |
50 | 51 |
|
51 | | - def test_send_simple_message(self, client: Agentex, agent_name: str): |
52 | | - """Test sending a simple message and receiving a response.""" |
| 52 | + # Validate we got content |
| 53 | + assert len(chunks) > 0, "Should receive at least one chunk" |
| 54 | + assert len(aggregated_content) > 0, "Should receive content" |
53 | 55 |
|
54 | | - message_content = "Hello, Agent! How are you?" |
55 | | - response = client.agents.send_message( |
56 | | - agent_name=agent_name, |
57 | | - params=ParamsSendMessageRequest( |
58 | | - content=TextContentParam( |
59 | | - author="user", |
60 | | - content=message_content, |
61 | | - type="text", |
62 | | - ) |
63 | | - ), |
64 | | - ) |
65 | | - result = response.result |
66 | | - assert result is not None |
67 | | - assert len(result) == 1 |
68 | | - message = result[0] |
69 | | - assert isinstance(message.content, TextContent) |
70 | | - assert ( |
71 | | - message.content.content |
72 | | - == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
73 | | - ) |
74 | | - |
75 | | - |
76 | | -class TestStreamingMessages: |
77 | | - """Test streaming message sending.""" |
78 | | - |
79 | | - def test_stream_simple_message(self, client: Agentex, agent_name: str): |
80 | | - """Test streaming a simple message and aggregating deltas.""" |
81 | | - |
82 | | - message_content = "Hello, Agent! Can you stream your response?" |
83 | | - aggregated_content = "" |
84 | | - full_content = "" |
85 | | - received_chunks = False |
86 | | - |
87 | | - for chunk in client.agents.send_message_stream( |
88 | | - agent_name=agent_name, |
89 | | - params=ParamsSendMessageRequest( |
90 | | - content=TextContentParam( |
91 | | - author="user", |
92 | | - content=message_content, |
93 | | - type="text", |
94 | | - ) |
95 | | - ), |
96 | | - ): |
97 | | - received_chunks = True |
98 | | - task_message_update = chunk.result |
99 | | - # Collect text deltas as they arrive or check full messages |
100 | | - if isinstance(task_message_update, StreamTaskMessageDelta) and task_message_update.delta is not None: |
101 | | - delta = task_message_update.delta |
102 | | - if isinstance(delta, TextDelta) and delta.text_delta is not None: |
103 | | - aggregated_content += delta.text_delta |
104 | | - |
105 | | - elif isinstance(task_message_update, StreamTaskMessageFull): |
106 | | - content = task_message_update.content |
107 | | - if isinstance(content, TextContent): |
108 | | - full_content = content.content |
109 | | - |
110 | | - if not full_content and not aggregated_content: |
111 | | - raise AssertionError("No content was received in the streaming response.") |
112 | | - if not received_chunks: |
113 | | - raise AssertionError("No streaming chunks were received, when at least 1 was expected.") |
114 | | - |
115 | | - if full_content: |
116 | | - assert ( |
117 | | - full_content |
118 | | - == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
119 | | - ) |
120 | | - |
121 | | - if aggregated_content: |
122 | | - assert ( |
123 | | - aggregated_content |
124 | | - == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
125 | | - ) |
| 56 | + # Check expected response format |
| 57 | + expected = f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
| 58 | + assert aggregated_content == expected, f"Expected: {expected}\nGot: {aggregated_content}" |
126 | 59 |
|
127 | 60 |
|
128 | 61 | if __name__ == "__main__": |
| 62 | + import pytest |
| 63 | + |
129 | 64 | pytest.main([__file__, "-v"]) |
0 commit comments