Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
GOOGLE_GENAI_USE_VERTEXAI=FALSE
GOOGLE_API_KEY=******************
AGENT_BASE_URL=http://localhost:8000/a2a
94 changes: 94 additions & 0 deletions samples/python/agents/a2a-multiple-agents-on-single-host/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# a2a-multiple-agents-on-single-host

This repository demonstrates how to run **multiple A2A agents** on the **same host** using the A2A protocol.
Each agent is served at a **unique URL path**, making it possible to host different agents without requiring multiple servers or ports.

---

## 📌 Example Setup

Three agents running on the same host:

| Agent Name | Agent card URL |
|-----------------------|-------------------------------------------------------------------------------------------------------------------|
| Conversational Agent | [http://localhost:8000/a2a/conversation/agent-card.json](http://localhost:8000/a2a/conversation/agent-card.json) |
| Trending topics Agent | [http://localhost:8000/a2a/trending/agent-card.json](http://localhost:8000/a2a/trending/agent-card.json) |
| Analyzer Agent | [http://localhost:8000/a2a/analyzer/agent-card.json](http://localhost:8000/a2a/analyzer/agent-card.json) |


---

## 🚀 Running Agents Locally

1. Navigate to the sample code directory
```bash
cd samples/python/agents/a2a-multiple-agents-on-single-host
```

2. Install dependencies (using uv)
```bash
uv venv
source .venv/bin/activate
uv sync
```

3. Set environment variables
* Copy `.env-sample` to `.env`
```bash
cp .env-sample .env
```
* Update values as needed

4. Start the agents
```bash
uv run main.py
```

---

### Testing using CLI :

```shell
cd samples/python/hosts/cli
uv run . --agent http://localhost:8000/a2a/conversation/
```

---
## 📂 Project Structure

```text
├── README.md
├── a2a_client_app.py
├── main.py
├── pyproject.toml
├── src
│ ├── __init__.py
│ ├── a2a
│ │ ├── __init__.py
│ │ ├── a2a_client.py
│ │ └── a2a_fastapi_app.py
│ └── agent
│ ├── __init__.py
│ ├── analyzer_agent.py
│ ├── conversation_agent.py
│ └── trending_topics_agent.py
└── uv.lock

```

---

## ✅ Requirements

Key dependencies defined in `pyproject.toml`:

* `a2a-sdk`
* `google-adk`

---

## Notes

- This setup demonstrates hosting multiple agents via unique URL paths behind a single application server.
- If you run behind a reverse proxy, ensure the `/a2a/...` paths are forwarded to the app.
- For local development, keep your working directory at the project root so relative imports and paths resolve correctly.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import uuid

from src.a2a.a2a_client import A2ASimpleClient


async def main():
a2a_client: A2ASimpleClient = A2ASimpleClient()
agent_host_url = "http://localhost:8000/a2a"

trending_task = a2a_client.create_task(agent_url=f'{agent_host_url}/trending', message="What's trending today?",
context_id=str(uuid.uuid4()))
analysis_task = a2a_client.create_task(agent_url=f"{agent_host_url}/analyzer",
message="Analyze the trend AI in Social Media", context_id=str(uuid.uuid4()))

trending_topics, analysis = await asyncio.gather(trending_task, analysis_task)
print(trending_topics)
print(analysis)

print("###############################################################################################################")
print("Multi-turn conversation with an agent............................")
context_id = str(uuid.uuid4())
print(f"Starting conversation with context_id: {context_id}")

# Turn 1 — Start conversation
conversation_task = await a2a_client.create_task(
agent_url=f"{agent_host_url}/conversation",
message="Who is the Prime Minister of India?",
context_id=context_id
)
print(f"Turn 1 → {conversation_task} \n\n")

# Turn 2 — Follow-up using pronoun (tests context memory)
conversation_task = await a2a_client.create_task(
agent_url=f"{agent_host_url}/conversation",
message="What is his wife's name?",
context_id=context_id
)
print(f"Turn 2 → {conversation_task} \n\n")

# Turn 3 — Another contextual follow-up
conversation_task = await a2a_client.create_task(
agent_url=f"{agent_host_url}/conversation",
message="How many children do they have?",
context_id=context_id
)
print(f"Turn 3 → {conversation_task} \n\n")

# Turn 4 — A context shift
conversation_task = await a2a_client.create_task(
agent_url=f"{agent_host_url}/conversation",
message="List three major policies he introduced.",
context_id=context_id
)
print(f"Turn 4 → {conversation_task}")


if __name__ == "__main__":
asyncio.run(main())
51 changes: 51 additions & 0 deletions samples/python/agents/a2a-multiple-agents-on-single-host/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging
import os

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI

from src.a2a.a2a_fastapi_app import A2AFastApiApp, get_agent_request_handler
from src.agent.analyzer_agent import analyzer_agent, get_analyzer_agent_card
from src.agent.conversation_agent import get_conversational_agent_card, conversational_agent
from src.agent.trending_topics_agent import trending_topics_agent, get_trending_topics_agent_card

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

AGENT_BASE_URL = os.getenv('AGENT_BASE_URL')
logger.info(f"AGENT BASE URL {AGENT_BASE_URL}")

app: FastAPI = FastAPI(title="Run multiple agents on single host using A2A protocol.",
description="Run multiple agents on single host using A2A protocol.",
version="1.0.0",
root_path=f"/a2a")


@app.get("/health")
async def health_check() -> dict[str, str]:
return {"status": "ok"}


conversation_agent_request_handler = get_agent_request_handler(conversational_agent)
conversational_agent_card = get_conversational_agent_card(f"{AGENT_BASE_URL}/conversation/")
conversational_agent_server = A2AFastApiApp(fastapi_app=app, agent_card=conversational_agent_card, http_handler=conversation_agent_request_handler)
# {path:path} added into the agent card url path to handle both 'agent-card.json' and '/.well-known/agent-card.json'
conversational_agent_server.build(rpc_url="/conversation/", agent_card_url="/conversation/{path:path}")

trending_agent_request_handler = get_agent_request_handler(trending_topics_agent)
trending_topics_agent_card = get_trending_topics_agent_card(f"{AGENT_BASE_URL}/trending/")
trending_agent_server = A2AFastApiApp(fastapi_app=app, agent_card=trending_topics_agent_card,
http_handler=trending_agent_request_handler)
trending_agent_server.build(rpc_url="/trending/", agent_card_url="/trending/{path:path}")

analyzer_agent_request_handler = get_agent_request_handler(analyzer_agent)
analyzer_agent_card = get_analyzer_agent_card(f"{AGENT_BASE_URL}/analyzer/")
analyzer_agent_server = A2AFastApiApp(fastapi_app=app, agent_card=analyzer_agent_card,
http_handler=analyzer_agent_request_handler)
analyzer_agent_server.build(rpc_url="/analyzer/", agent_card_url="/analyzer/{path:path}")

if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[project]
name = "a2a-multiple-agents-on-single-host"
version = "0.1.0"
description = "Run multiple agents on single host using A2A protocol."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"a2a-sdk>=0.3.1",
"google-adk>=1.11.0",
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from typing import Any


import httpx
from a2a.client import ClientConfig, ClientFactory, ClientCallContext
from a2a.types import (AgentCard, TransportProtocol, )
from a2a.types import Message, Part, Role, TextPart

from uuid import uuid4

from a2a.types import Message, Part, Role, TextPart

AGENT_CARD_PATH = '/agent-card.json'

class A2ASimpleClient:
def __init__(self, default_timeout: float = 240.0):
# Cache for agent metadata
self._agent_info_cache: dict[str, dict[str, Any] | None] = {}
self.default_timeout = default_timeout

async def create_task(self, agent_url: str, message: str, context_id:str) -> str:
"""Send a message following the official A2A SDK pattern."""
# Configure httpx client with timeout
timeout_config = httpx.Timeout(
timeout=self.default_timeout,
connect=10.0,
read=self.default_timeout,
write=10.0,
pool=5.0,
)

async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
# Check if we have cached agent card data
if (
agent_url in self._agent_info_cache
and self._agent_info_cache[agent_url] is not None
):
agent_card_data = self._agent_info_cache[agent_url]
else:
# Fetch the agent card
agent_card_response = await httpx_client.get(
f'{agent_url}{AGENT_CARD_PATH}'
)
agent_card_data = self._agent_info_cache[agent_url] = (
agent_card_response.json()
)

# Create AgentCard from data
agent_card = AgentCard(**agent_card_data)

# Create A2A client with the agent card
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[
TransportProtocol.jsonrpc,
TransportProtocol.http_json,
],
use_client_preference=True,
)

factory = ClientFactory(config)
client = factory.create(agent_card)
message_obj= Message(role=Role.user, parts=[Part(TextPart(text=message))], message_id=str(uuid4()), context_id=context_id)
responses = []
async for response in client.send_message(message_obj):
responses.append(response)
# The response is a tuple - get the first element (Task object)
if (
responses
and isinstance(responses[0], tuple)
and len(responses[0]) > 0
):
task = responses[0][0] # First element of the tuple

# Extract text: task.artifacts[0].parts[0].root.text
try:
return task.artifacts[0].parts[0].root.text
except (AttributeError, IndexError):
return str(task)

return 'No response received'
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from collections.abc import Callable
from typing import Any

from a2a.server.apps.jsonrpc.jsonrpc_app import JSONRPCApplication, CallContextBuilder
from a2a.server.context import ServerCallContext
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
AgentCard,
)
from a2a.utils.constants import (
AGENT_CARD_WELL_KNOWN_PATH,
DEFAULT_RPC_URL,
EXTENDED_AGENT_CARD_PATH,
)
from fastapi import FastAPI, APIRouter
from google.adk import Runner
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutorConfig, A2aAgentExecutor
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from starlette.applications import Starlette


def get_agent_request_handler(agent: LlmAgent):
runner = Runner(
app_name=agent.name,
agent=agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService()
)
config = A2aAgentExecutorConfig()
executor = A2aAgentExecutor(runner=runner, config=config)
return DefaultRequestHandler(agent_executor=executor, task_store=InMemoryTaskStore())


class A2AFastApiApp(JSONRPCApplication):
def __init__(self,
fastapi_app: FastAPI,
agent_card: AgentCard,
http_handler: RequestHandler,
extended_agent_card: AgentCard | None = None,
context_builder: CallContextBuilder | None = None,
card_modifier: Callable[[AgentCard], AgentCard] | None = None,
extended_card_modifier: Callable[[AgentCard, ServerCallContext], AgentCard] | None = None):
super().__init__(
agent_card=agent_card,
http_handler=http_handler,
extended_agent_card=extended_agent_card,
context_builder=context_builder,
card_modifier=card_modifier,
extended_card_modifier=extended_card_modifier
)
self.fastapi_app = fastapi_app

def build(self, agent_card_url: str = AGENT_CARD_WELL_KNOWN_PATH, rpc_url: str = DEFAULT_RPC_URL,
extended_agent_card_url: str = EXTENDED_AGENT_CARD_PATH, **kwargs: Any, ) -> Starlette:
name_prefix = rpc_url.replace("/", "")
router = APIRouter()
router.add_api_route(rpc_url, endpoint=self._handle_requests, name=f'{name_prefix}_a2a_handler',
methods=['POST'])
router.add_api_route(agent_card_url , endpoint=self._handle_get_agent_card, methods=['GET'],
name=f'{name_prefix}_agent_card')
self.fastapi_app.include_router(router)
return self.fastapi_app
Loading