Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions fastapi_streaming_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""
FastAPI Streaming Integration Example for CrewAI

This example demonstrates how to integrate CrewAI with FastAPI to stream
crew execution events in real-time using Server-Sent Events (SSE).

Installation:
pip install crewai fastapi uvicorn

Usage:
python fastapi_streaming_example.py

Then visit:
http://localhost:8000/docs for the API documentation
http://localhost:8000/stream?topic=AI to see streaming in action
"""

import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from crewai import Agent, Crew, Task

app = FastAPI(title="CrewAI Streaming API")


class ResearchRequest(BaseModel):
    """Request body for the POST /research endpoint."""

    # Free-form research topic handed to the crew.
    topic: str


def create_research_crew(topic: str) -> Crew:
    """Build a single-agent, single-task research crew for *topic*."""
    analyst = Agent(
        role="Researcher",
        goal=f"Research and analyze information about {topic}",
        backstory="You're an expert researcher with deep knowledge in various fields.",
        verbose=True,
    )

    research_task = Task(
        description=f"Research and provide a comprehensive summary about {topic}",
        expected_output="A detailed summary with key insights",
        agent=analyst,
    )

    return Crew(agents=[analyst], tasks=[research_task], verbose=True)


@app.get("/")
async def root():
    """Root endpoint with API information."""
    endpoints = {
        "/stream": "GET - Stream crew execution events (query param: topic)",
        "/research": "POST - Execute crew and return final result",
    }
    return {"message": "CrewAI Streaming API", "endpoints": endpoints}


@app.get("/stream")
async def stream_crew_execution(topic: str = "artificial intelligence"):
    """
    Stream crew execution events in real-time using Server-Sent Events.

    Args:
        topic: The research topic (default: "artificial intelligence")

    Returns:
        StreamingResponse with text/event-stream content type
    """

    async def event_generator() -> AsyncGenerator[str, None]:
        """Generate Server-Sent Events from crew execution."""
        crew = create_research_crew(topic)

        try:
            # crew.kickoff_stream() is a blocking *synchronous* generator.
            # Pulling it directly in this async generator would stall the
            # whole asyncio event loop between events, so fetch each item
            # on a worker thread via asyncio.to_thread().
            events = crew.kickoff_stream(inputs={"topic": topic})
            sentinel = object()
            while True:
                event = await asyncio.to_thread(next, events, sentinel)
                if event is sentinel:
                    break
                yield f"data: {json.dumps(event)}\n\n"

            yield "data: {\"type\": \"done\"}\n\n"

        except Exception as e:
            error_event = {"type": "error", "data": {"message": str(e)}}
            yield f"data: {json.dumps(error_event)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@app.post("/research")
async def research_topic(request: ResearchRequest):
    """
    Execute crew research and return the final result.

    Args:
        request: ResearchRequest with topic field

    Returns:
        JSON response with the research result
    """
    crew = create_research_crew(request.topic)

    try:
        output = crew.kickoff(inputs={"topic": request.topic})
    except Exception as e:
        return {"success": False, "error": str(e)}

    usage = output.token_usage.model_dump() if output.token_usage else None
    return {
        "success": True,
        "topic": request.topic,
        "result": output.raw,
        "usage_metrics": usage,
    }


@app.get("/stream-filtered")
async def stream_filtered_events(
    topic: str = "artificial intelligence", event_types: str = "llm_stream_chunk"
):
    """
    Stream only specific event types.

    Args:
        topic: The research topic
        event_types: Comma-separated list of event types to include

    Returns:
        StreamingResponse with filtered events
    """
    allowed_types = set(event_types.split(","))

    async def event_generator() -> AsyncGenerator[str, None]:
        crew = create_research_crew(topic)

        try:
            # kickoff_stream() blocks while the crew runs; iterate it on a
            # worker thread so the asyncio event loop is never starved.
            events = crew.kickoff_stream(inputs={"topic": topic})
            sentinel = object()
            while True:
                event = await asyncio.to_thread(next, events, sentinel)
                if event is sentinel:
                    break
                if event["type"] in allowed_types:
                    yield f"data: {json.dumps(event)}\n\n"

            yield "data: {\"type\": \"done\"}\n\n"

        except Exception as e:
            error_event = {"type": "error", "data": {"message": str(e)}}
            yield f"data: {json.dumps(error_event)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


if __name__ == "__main__":
    import uvicorn

    # Startup banner with quick links for manual testing.
    for line in (
        "Starting CrewAI Streaming API...",
        "Visit http://localhost:8000/docs for API documentation",
        "Try: http://localhost:8000/stream?topic=quantum%20computing",
    ):
        print(line)

    uvicorn.run(app, host="0.0.0.0", port=8000)
112 changes: 112 additions & 0 deletions src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,118 @@ async def run_crew(crew, input_data):
self._task_output_handler.reset()
return results

def kickoff_stream(self, inputs: dict[str, Any] | None = None):
    """
    Stream crew execution events in real-time.

    Runs ``kickoff()`` on a background thread and yields event dictionaries
    from the event bus as they occur, making it easy to integrate with
    streaming frameworks like FastAPI's StreamingResponse.

    Args:
        inputs: Optional dictionary of inputs for the crew execution

    Yields:
        dict: Event dictionaries containing event type and data. A final
            ``{"type": "final_output", ...}`` event carries the crew result.

    Raises:
        Exception: Re-raises any exception raised by ``kickoff()`` after
            all buffered events have been yielded.

    Example:
        ```python
        from fastapi import FastAPI
        from fastapi.responses import StreamingResponse
        import json

        app = FastAPI()

        @app.get("/stream")
        async def stream_crew():
            def event_generator():
                for event in crew.kickoff_stream(inputs={"topic": "AI"}):
                    yield f"data: {json.dumps(event)}\\n\\n"

            return StreamingResponse(
                event_generator(),
                media_type="text/event-stream"
            )
        ```
    """
    import queue
    import threading

    from crewai.events.base_events import BaseEvent
    from crewai.events.types.agent_events import (
        AgentExecutionCompletedEvent,
        AgentExecutionStartedEvent,
    )
    from crewai.events.types.crew_events import (
        CrewKickoffCompletedEvent,
        CrewKickoffFailedEvent,
        CrewKickoffStartedEvent,
    )
    from crewai.events.types.llm_events import (
        LLMCallCompletedEvent,
        LLMCallStartedEvent,
        LLMStreamChunkEvent,
    )
    from crewai.events.types.task_events import (
        TaskCompletedEvent,
        TaskFailedEvent,
        TaskStartedEvent,
    )
    from crewai.events.types.tool_usage_events import (
        ToolUsageErrorEvent,
        ToolUsageFinishedEvent,
        ToolUsageStartedEvent,
    )

    event_queue: queue.Queue = queue.Queue()
    completion_event = threading.Event()
    exception_holder: dict[str, Exception | None] = {"exception": None}

    def event_handler(source: Any, event: BaseEvent) -> None:
        # Convert to a plain, JSON-friendly dict; from_task/from_agent
        # are excluded because they carry rich runtime objects.
        event_queue.put(
            {
                "type": event.type,
                "data": event.model_dump(exclude={"from_task", "from_agent"}),
            }
        )

    streamed_event_types = (
        CrewKickoffStartedEvent,
        CrewKickoffCompletedEvent,
        CrewKickoffFailedEvent,
        TaskStartedEvent,
        TaskCompletedEvent,
        TaskFailedEvent,
        AgentExecutionStartedEvent,
        AgentExecutionCompletedEvent,
        LLMStreamChunkEvent,
        LLMCallStartedEvent,
        LLMCallCompletedEvent,
        ToolUsageStartedEvent,
        ToolUsageFinishedEvent,
        ToolUsageErrorEvent,
    )
    # NOTE(review): handlers are registered on the global bus and never
    # removed, so each call leaks a handler (and keeps this queue alive)
    # for the process lifetime. If the bus exposes a deregistration or
    # scoped-handler API, unregister these in the finally block below.
    for event_type in streamed_event_types:
        crewai_event_bus.register_handler(event_type, event_handler)

    def run_kickoff() -> None:
        """Execute the crew and enqueue its final output."""
        try:
            result = self.kickoff(inputs=inputs)
            event_queue.put(
                {"type": "final_output", "data": {"output": result.raw}}
            )
        except Exception as e:
            exception_holder["exception"] = e
        finally:
            # Always signal completion so the consumer loop terminates.
            completion_event.set()

    thread = threading.Thread(target=run_kickoff, daemon=True)
    thread.start()

    try:
        # Block on the queue instead of polling. The previous
        # `get(timeout=...) if not empty() else None` pattern spun the
        # loop without ever blocking while the queue was empty,
        # busy-waiting at full CPU for the whole crew run.
        while True:
            try:
                yield event_queue.get(timeout=0.1)
            except queue.Empty:
                if completion_event.is_set():
                    break

        # Drain anything enqueued between the last successful get and
        # the completion signal.
        while True:
            try:
                yield event_queue.get_nowait()
            except queue.Empty:
                break

        if exception_holder["exception"]:
            raise exception_holder["exception"]

    finally:
        thread.join(timeout=1)

def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
Expand Down
78 changes: 78 additions & 0 deletions tests/test_crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -4744,3 +4744,81 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
assert "Researcher" in messages[0]["content"]
assert messages[1]["role"] == "user"
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream(researcher):
    """Test that crew.kickoff_stream() yields events during execution."""
    task = Task(
        description="Research a topic about AI",
        expected_output="A brief summary about AI",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])

    collected = list(crew.kickoff_stream())

    assert len(collected) > 0

    seen_types = {item["type"] for item in collected}
    assert "crew_kickoff_started" in seen_types
    assert "final_output" in seen_types

    final = next(item for item in collected if item["type"] == "final_output")
    assert "output" in final["data"]
    output_text = final["data"]["output"]
    assert isinstance(output_text, str)
    assert len(output_text) > 0


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream_with_inputs(researcher):
    """Test that crew.kickoff_stream() works with inputs."""
    task = Task(
        description="Research about {topic}",
        expected_output="A brief summary about {topic}",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])

    collected = list(crew.kickoff_stream(inputs={"topic": "machine learning"}))

    assert len(collected) > 0

    seen_types = {item["type"] for item in collected}
    assert "crew_kickoff_started" in seen_types
    assert "final_output" in seen_types


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream_includes_llm_chunks(researcher):
    """Test that crew.kickoff_stream() streams execution lifecycle events.

    NOTE(review): despite the name, this test does not assert on
    "llm_stream_chunk" events — it only checks that task/agent execution
    events appear in the stream. Confirm whether chunk events should be
    asserted here (they require the underlying LLM to stream).
    """
    task = Task(
        description="Write a short poem about AI",
        expected_output="A 2-line poem",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])

    collected = list(crew.kickoff_stream())
    seen_types = [item["type"] for item in collected]

    assert "task_started" in seen_types or "agent_execution_started" in seen_types


def test_crew_kickoff_stream_handles_errors(researcher):
    """Test that crew.kickoff_stream() properly handles errors."""
    failing_task = Task(
        description="This task will fail",
        expected_output="Should not complete",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[failing_task])

    # The exception raised inside kickoff() must propagate to the consumer.
    with patch("crewai.crew.Crew.kickoff", side_effect=Exception("Test error")), \
            pytest.raises(Exception, match="Test error"):
        list(crew.kickoff_stream())
Loading