This project implements a CrewAI-based workflow system that persists every step of workflow execution to MongoDB.
- MongoDB Persistence: Every step of the workflow is persisted to MongoDB for audit trails and debugging
- Workflow Tracking: Complete visibility into the execution flow with timestamps and step data
- Error Handling: Errors and error-handling steps are recorded as workflow steps, alongside recovery mechanisms
- Chat History: Redis-based chat history management
- RAG Integration: Retrieval-Augmented Generation pipeline for data source queries
- MongoPersistence: Handles all MongoDB operations for workflow state tracking
- AgenticHivemindFlow: CrewAI flow that orchestrates the agent interactions
- run_hivemind_agent_activity: Temporal activity that manages the workflow execution
- QueryDataSources: Handles RAG queries with workflow ID tracking
The system tracks the following steps in MongoDB:
- initialization: Initial workflow setup with parameters
- chat_history_retrieval: Redis chat history retrieval (if applicable)
- no_chat_history: When no chat history is available
- flow_initialization: AgenticHivemindFlow setup
- flow_execution_start: Beginning of CrewAI flow execution
- local_model_classification: Local transformer model classification result
- question_classification: Language model question classification with reasoning
- rag_classification: RAG question classification with score and reasoning
- history_query_classification: History vs RAG query classification (if applicable)
- flow_execution_complete: Completion of CrewAI flow
- answer_processing: Processing of the final answer
- error_handling: Any error handling steps
- memory_update: Redis memory updates (if applicable)
- error_occurred: Any errors during execution
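The step list above can be illustrated with a minimal sketch. It is not the MongoPersistence implementation: it assumes each step is appended to the workflow document's steps array as a stepName/timestamp/data record while currentStep and updatedAt are overwritten. build_step_update, InMemoryStepRecorder, track_errors and the "running"/"failed" status values are hypothetical, and a plain dict stands in for the MongoDB collection so the sketch runs without a server.

```python
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, Optional


def build_step_update(step_name: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: a MongoDB update document that appends one step."""
    now = datetime.now(timezone.utc)
    step = {"stepName": step_name, "timestamp": now, "data": data or {}}
    return {
        "$push": {"steps": step},  # append to the step history
        "$set": {"currentStep": step_name, "updatedAt": now},  # overwrite the "latest" fields
    }


class InMemoryStepRecorder:
    """Hypothetical stand-in for the collection: applies the same $push/$set
    semantics to a plain dict so the step sequence can be inspected offline."""

    def __init__(self) -> None:
        # Assumed initial status value; the real values are not documented here.
        self.doc: Dict[str, Any] = {"steps": [], "currentStep": None, "status": "running"}

    def record(self, step_name: str, data: Optional[Dict[str, Any]] = None) -> None:
        update = build_step_update(step_name, data)
        self.doc["steps"].append(update["$push"]["steps"])
        self.doc.update(update["$set"])

    @contextmanager
    def track_errors(self) -> Iterator[None]:
        """Record an error_occurred step if the wrapped block raises, then re-raise."""
        try:
            yield
        except Exception as exc:
            self.record("error_occurred", {"error": str(exc), "type": type(exc).__name__})
            self.doc["status"] = "failed"  # assumed status value
            raise


recorder = InMemoryStepRecorder()
recorder.record("initialization", {"query": "What changed last week?"})
recorder.record("no_chat_history")
try:
    with recorder.track_errors():
        raise RuntimeError("flow execution failed")
except RuntimeError:
    pass
print([s["stepName"] for s in recorder.doc["steps"]])
# ['initialization', 'no_chat_history', 'error_occurred']
```

With pymongo, the dict returned by build_step_update could be passed as the update argument of Collection.update_one(filter, update); how the real class selects the document to update is not described in this README.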
Use .env.example as the template for your .env file.
The system persists the results and reasoning of each classification step for audit trails and debugging:
- Step name: local_model_classification
  - Data: result from the local transformer model
  - Model: local_transformer
- Step name: question_classification
  - Data:
    - result: boolean indicating whether the message is a question
    - reasoning: detailed explanation for the classification
    - model: language_model
    - query: original user query
- Step name: rag_classification
  - Data:
    - result: boolean indicating whether RAG is needed
    - score: sensitivity score (0 to 1)
    - reasoning: detailed explanation for the score
    - model: language_model
    - query: original user query
- Step name: history_query_classification
  - Data:
    - result: boolean indicating whether it is a history query
    - model: openai_gpt4
    - query: original user query
    - hasChatHistory: boolean indicating whether chat history was available
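The classification payloads listed above have fixed field sets, which makes them easy to check before persisting. The following is a minimal validation sketch that assumes the field names exactly as documented; REQUIRED_KEYS and validate_step_data are hypothetical, and the project may not validate step data at all. local_model_classification is left out because its data shape is not spelled out beyond "result from the local transformer model".

```python
from typing import Any, Dict, FrozenSet

# Hypothetical: required data keys per classification step, taken from the field list above.
REQUIRED_KEYS: Dict[str, FrozenSet[str]] = {
    "question_classification": frozenset({"result", "reasoning", "model", "query"}),
    "rag_classification": frozenset({"result", "score", "reasoning", "model", "query"}),
    "history_query_classification": frozenset({"result", "model", "query", "hasChatHistory"}),
}


def validate_step_data(step_name: str, data: Dict[str, Any]) -> None:
    """Raise ValueError if a classification step's data is incomplete or out of range."""
    missing = REQUIRED_KEYS.get(step_name, frozenset()) - set(data)
    if missing:
        raise ValueError(f"{step_name} is missing {sorted(missing)}")
    if not isinstance(data.get("result", False), bool):
        raise ValueError(f"{step_name}.result must be a boolean")
    if step_name == "rag_classification" and not 0.0 <= data["score"] <= 1.0:
        raise ValueError(f"rag_classification.score must be in [0, 1], got {data['score']}")


rag_step = {
    "result": True,
    "score": 0.82,
    "reasoning": "The question asks about past community discussions, so retrieval is needed.",
    "model": "language_model",
    "query": "What did the team decide about the release date?",
}
validate_step_data("rag_classification", rag_step)  # passes silently
```

Validating at write time keeps malformed classification records out of the audit trail instead of surfacing them later during debugging.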
The workflow states are stored in the internal_messages collection with the following structure:
{
"_id": "ObjectId",
"communityId": "string",
"route": {
"source": "string",
"destination": {
"queue": "string",
"event": "string"
}
},
"question": {
"message": "string",
"filters": "object (optional)"
},
"response": {
"message": "string"
},
"metadata": "object",
"createdAt": "datetime",
"updatedAt": "datetime",
"steps": [
{
"stepName": "string",
"timestamp": "datetime",
"data": "object"
}
],
"currentStep": "string",
"status": "string",
"chatId": "string (optional)",
"enableAnswerSkipping": "boolean"
}
Start the worker:
python worker.py
You can query the MongoDB collection to inspect workflow execution:
from tasks.mongo_persistence import MongoPersistence

persistence = MongoPersistence()
# Load the stored state document for a single workflow run
workflow_state = persistence.get_workflow_state("workflow_id_here")
print(workflow_state)
Run the unit tests:
python -m pytest tests/unit/test_mongo_persistence.py
Dependencies:
- pymongo==4.8.0: MongoDB driver
- redis==5.2.0: Redis client
- crewai==0.105.0: AI agent framework
- temporalio: Temporal workflow engine
- openai==1.66.3: OpenAI API client
The workflow ID is passed through the entire execution chain:
- Created in run_hivemind_agent_activity
- Passed to AgenticHivemindFlow
- Passed to RAGPipelineTool
- Passed to QueryDataSources
- Included in HivemindQueryPayload for the HivemindWorkflow
This ensures complete traceability from the initial query to the final response.
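The propagation chain above can be sketched with simplified stand-ins. Every class and function here is hypothetical: QueryPayload, DataSourceQuery, RagTool, Flow and run_activity only mirror the roles of HivemindQueryPayload, QueryDataSources, RAGPipelineTool, AgenticHivemindFlow and run_hivemind_agent_activity, and their fields and signatures are assumptions. Generating the ID with uuid4 is also an assumption; the README does not say how the real ID is created.

```python
import uuid
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class QueryPayload:
    """Stands in for HivemindQueryPayload; only the fields needed here."""
    community_id: str
    query: str
    workflow_id: str


class DataSourceQuery:
    """Stands in for QueryDataSources: builds the payload sent downstream."""

    def __init__(self, community_id: str, workflow_id: str) -> None:
        self.community_id = community_id
        self.workflow_id = workflow_id

    def build_payload(self, query: str) -> QueryPayload:
        return QueryPayload(self.community_id, query, self.workflow_id)


class RagTool:
    """Stands in for RAGPipelineTool: forwards the workflow ID unchanged."""

    def __init__(self, community_id: str, workflow_id: str) -> None:
        self.community_id = community_id
        self.workflow_id = workflow_id

    def run(self, query: str) -> QueryPayload:
        return DataSourceQuery(self.community_id, self.workflow_id).build_payload(query)


class Flow:
    """Stands in for AgenticHivemindFlow: owns the tool and hands it the ID."""

    def __init__(self, community_id: str, workflow_id: str) -> None:
        self.tool = RagTool(community_id, workflow_id)

    def answer(self, query: str) -> QueryPayload:
        return self.tool.run(query)


def run_activity(community_id: str, query: str) -> Tuple[str, QueryPayload]:
    """Stands in for run_hivemind_agent_activity: creates the ID once, at the top."""
    workflow_id = str(uuid.uuid4())  # assumption: the real ID source is not documented
    payload = Flow(community_id, workflow_id).answer(query)
    return workflow_id, payload


wid, payload = run_activity("community-123", "Summarize yesterday's discussion")
print(payload.workflow_id == wid)  # True
```

Passing the ID explicitly through each constructor, rather than through a global, keeps every hop visible and makes it straightforward to assert in tests that the payload carries the same ID the activity created.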