diff --git a/fastapi_streaming_example.py b/fastapi_streaming_example.py new file mode 100644 index 0000000000..64f61ce860 --- /dev/null +++ b/fastapi_streaming_example.py @@ -0,0 +1,177 @@ +""" +FastAPI Streaming Integration Example for CrewAI + +This example demonstrates how to integrate CrewAI with FastAPI to stream +crew execution events in real-time using Server-Sent Events (SSE). + +Installation: + pip install crewai fastapi uvicorn + +Usage: + python fastapi_streaming_example.py + +Then visit: + http://localhost:8000/docs for the API documentation + http://localhost:8000/stream?topic=AI to see streaming in action +""" + +import json +from typing import AsyncGenerator + +from fastapi import FastAPI +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +from crewai import Agent, Crew, Task + +app = FastAPI(title="CrewAI Streaming API") + + +class ResearchRequest(BaseModel): + topic: str + + +def create_research_crew(topic: str) -> Crew: + """Create a research crew for the given topic.""" + researcher = Agent( + role="Researcher", + goal=f"Research and analyze information about {topic}", + backstory="You're an expert researcher with deep knowledge in various fields.", + verbose=True, + ) + + task = Task( + description=f"Research and provide a comprehensive summary about {topic}", + expected_output="A detailed summary with key insights", + agent=researcher, + ) + + return Crew(agents=[researcher], tasks=[task], verbose=True) + + +@app.get("/") +async def root(): + """Root endpoint with API information.""" + return { + "message": "CrewAI Streaming API", + "endpoints": { + "/stream": "GET - Stream crew execution events (query param: topic)", + "/research": "POST - Execute crew and return final result", + }, + } + + +@app.get("/stream") +async def stream_crew_execution(topic: str = "artificial intelligence"): + """ + Stream crew execution events in real-time using Server-Sent Events. + + Args: + topic: The research topic (default: "artificial intelligence") + + Returns: + StreamingResponse with text/event-stream content type + """ + + async def event_generator() -> AsyncGenerator[str, None]: + """Generate Server-Sent Events from crew execution.""" + crew = create_research_crew(topic) + + try: + for event in crew.kickoff_stream(inputs={"topic": topic}): + event_data = json.dumps(event) + yield f"data: {event_data}\n\n" + + yield "data: {\"type\": \"done\"}\n\n" + + except Exception as e: + error_event = {"type": "error", "data": {"message": str(e)}} + yield f"data: {json.dumps(error_event)}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@app.post("/research") +async def research_topic(request: ResearchRequest): + """ + Execute crew research and return the final result. + + Args: + request: ResearchRequest with topic field + + Returns: + JSON response with the research result + """ + crew = create_research_crew(request.topic) + + try: + result = crew.kickoff(inputs={"topic": request.topic}) + return { + "success": True, + "topic": request.topic, + "result": result.raw, + "usage_metrics": ( + result.token_usage.model_dump() if result.token_usage else None + ), + } + except Exception as e: + return {"success": False, "error": str(e)} + + +@app.get("/stream-filtered") +async def stream_filtered_events( + topic: str = "artificial intelligence", event_types: str = "llm_stream_chunk" +): + """ + Stream only specific event types. + + Args: + topic: The research topic + event_types: Comma-separated list of event types to include + + Returns: + StreamingResponse with filtered events + """ + allowed_types = set(event_types.split(",")) + + async def event_generator() -> AsyncGenerator[str, None]: + crew = create_research_crew(topic) + + try: + for event in crew.kickoff_stream(inputs={"topic": topic}): + if event["type"] in allowed_types: + event_data = json.dumps(event) + yield f"data: {event_data}\n\n" + + yield "data: {\"type\": \"done\"}\n\n" + + except Exception as e: + error_event = {"type": "error", "data": {"message": str(e)}} + yield f"data: {json.dumps(error_event)}\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + +if __name__ == "__main__": + import uvicorn + + print("Starting CrewAI Streaming API...") + print("Visit http://localhost:8000/docs for API documentation") + print("Try: http://localhost:8000/stream?topic=quantum%20computing") + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index ed9479bdc9..1c86842aa8 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -766,6 +766,118 @@ async def run_crew(crew, input_data): self._task_output_handler.reset() return results + def kickoff_stream(self, inputs: dict[str, Any] | None = None): + """ + Stream crew execution events in real-time. + + This method yields events as they occur during crew execution, making it + easy to integrate with streaming frameworks like FastAPI's StreamingResponse. + + Args: + inputs: Optional dictionary of inputs for the crew execution + + Yields: + dict: Event dictionaries containing event type and data + + Example: + ```python + from fastapi import FastAPI + from fastapi.responses import StreamingResponse + import json + + app = FastAPI() + + @app.get("/stream") + async def stream_crew(): + def event_generator(): + for event in crew.kickoff_stream(inputs={"topic": "AI"}): + yield f"data: {json.dumps(event)}\\n\\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream" + ) + ``` + """ + import queue + import threading + from crewai.events.base_events import BaseEvent + + event_queue: queue.Queue = queue.Queue() + completion_event = threading.Event() + exception_holder = {"exception": None} + + def event_handler(source: Any, event: BaseEvent): + event_dict = { + "type": event.type, + "data": event.model_dump(exclude={"from_task", "from_agent"}), + } + event_queue.put(event_dict) + + from crewai.events.types.crew_events import ( + CrewKickoffStartedEvent, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + ) + from crewai.events.types.task_events import ( + TaskStartedEvent, + TaskCompletedEvent, + TaskFailedEvent, + ) + from crewai.events.types.agent_events import ( + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent, + ) + from crewai.events.types.llm_events import ( + LLMStreamChunkEvent, + LLMCallStartedEvent, + LLMCallCompletedEvent, + ) + from crewai.events.types.tool_usage_events import ( + ToolUsageStartedEvent, + ToolUsageFinishedEvent, + ToolUsageErrorEvent, + ) + + crewai_event_bus.register_handler(CrewKickoffStartedEvent, event_handler) + crewai_event_bus.register_handler(CrewKickoffCompletedEvent, event_handler) + crewai_event_bus.register_handler(CrewKickoffFailedEvent, event_handler) + crewai_event_bus.register_handler(TaskStartedEvent, event_handler) + crewai_event_bus.register_handler(TaskCompletedEvent, event_handler) + crewai_event_bus.register_handler(TaskFailedEvent, event_handler) + crewai_event_bus.register_handler(AgentExecutionStartedEvent, event_handler) + crewai_event_bus.register_handler(AgentExecutionCompletedEvent, event_handler) + crewai_event_bus.register_handler(LLMStreamChunkEvent, event_handler) + crewai_event_bus.register_handler(LLMCallStartedEvent, event_handler) + crewai_event_bus.register_handler(LLMCallCompletedEvent, event_handler) + crewai_event_bus.register_handler(ToolUsageStartedEvent, event_handler) + crewai_event_bus.register_handler(ToolUsageFinishedEvent, event_handler) + crewai_event_bus.register_handler(ToolUsageErrorEvent, event_handler) + + def run_kickoff(): + try: + result = self.kickoff(inputs=inputs) + event_queue.put({"type": "final_output", "data": {"output": result.raw}}) + except Exception as e: + exception_holder["exception"] = e + finally: + completion_event.set() + + thread = threading.Thread(target=run_kickoff, daemon=True) + thread.start() + + try: + while not completion_event.is_set() or not event_queue.empty(): + event = event_queue.get(timeout=0.1) if not event_queue.empty() else None + if event is not None: + yield event + + if exception_holder["exception"]: + raise exception_holder["exception"] + + finally: + thread.join(timeout=1) + def _handle_crew_planning(self): """Handles the Crew planning.""" self._logger.log("info", "Planning the crew execution") diff --git a/tests/test_crew.py b/tests/test_crew.py index 0a9b946952..3e9c962c1f 100644 --- a/tests/test_crew.py +++ b/tests/test_crew.py @@ -4744,3 +4744,81 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory(): assert "Researcher" in messages[0]["content"] assert messages[1]["role"] == "user" assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"] + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_crew_kickoff_stream(researcher): + """Test that crew.kickoff_stream() yields events during execution.""" + task = Task( + description="Research a topic about AI", + expected_output="A brief summary about AI", + agent=researcher, + ) + + crew = Crew(agents=[researcher], tasks=[task]) + + events = list(crew.kickoff_stream()) + + assert len(events) > 0 + + event_types = [event["type"] for event in events] + assert "crew_kickoff_started" in event_types + assert "final_output" in event_types + + final_output_event = next(e for e in events if e["type"] == "final_output") + assert "output" in final_output_event["data"] + assert isinstance(final_output_event["data"]["output"], str) + assert len(final_output_event["data"]["output"]) > 0 + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_crew_kickoff_stream_with_inputs(researcher): + """Test that crew.kickoff_stream() works with inputs.""" + task = Task( + description="Research about {topic}", + expected_output="A brief summary about {topic}", + agent=researcher, + ) + + crew = Crew(agents=[researcher], tasks=[task]) + + events = list(crew.kickoff_stream(inputs={"topic": "machine learning"})) + + assert len(events) > 0 + + event_types = [event["type"] for event in events] + assert "crew_kickoff_started" in event_types + assert "final_output" in event_types + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_crew_kickoff_stream_includes_llm_chunks(researcher): + """Test that crew.kickoff_stream() includes LLM stream chunks.""" + task = Task( + description="Write a short poem about AI", + expected_output="A 2-line poem", + agent=researcher, + ) + + crew = Crew(agents=[researcher], tasks=[task]) + + events = list(crew.kickoff_stream()) + + event_types = [event["type"] for event in events] + + assert "task_started" in event_types or "agent_execution_started" in event_types + + +def test_crew_kickoff_stream_handles_errors(researcher): + """Test that crew.kickoff_stream() properly handles errors.""" + task = Task( + description="This task will fail", + expected_output="Should not complete", + agent=researcher, + ) + + crew = Crew(agents=[researcher], tasks=[task]) + + with patch("crewai.crew.Crew.kickoff", side_effect=Exception("Test error")): + with pytest.raises(Exception, match="Test error"): + list(crew.kickoff_stream())