# temporal-worker-python

This repository contains TogetherCrew's Temporal Python workflows for data processing and analysis. It leverages the Temporal workflow engine to orchestrate ETL processes and data summarization tasks.

## Project Components

### Hivemind ETL

- **Website Ingestion**: Extracts, transforms, and loads data from websites defined in the platform configuration.
- **MediaWiki Ingestion**: Processes content from MediaWiki instances, including extraction of pages, revisions, and content.

### Hivemind Summarizer

- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range.

## Architecture

The project uses Temporal for workflow orchestration with the following components:

- **Temporal Server**: Manages workflow execution and task queues
- **MongoDB**: Stores platform and community configuration
- **Qdrant**: Vector database for storing and retrieving summary content
- **Redis**: Caching and state management
- **PostgreSQL**: Used by Temporal for workflow history and state

## Setup Instructions

1. Configure Environment Variables
   - Copy the example environment file:

     ```bash
     cp .env.example .env
     ```

   Update the `.env` file with your own values, referencing the services defined in `docker-compose.dev.yml`.

   Required variables:

   - `TEMPORAL_TASK_QUEUE`: Queue name for the worker
   - Database connection parameters for MongoDB, Qdrant, etc.

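   For instance, a minimal `.env` might look like the sketch below. Every name other than `TEMPORAL_TASK_QUEUE` is an assumption here; `.env.example` is the authoritative list:

   ```bash
   # Illustrative values only; variable names other than TEMPORAL_TASK_QUEUE
   # are assumptions, so check .env.example for the real names.
   TEMPORAL_TASK_QUEUE=hivemind-worker
   MONGODB_HOST=localhost
   QDRANT_HOST=localhost
   ```
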
2. Start Services
   - Use the following command to set up and run the required services:

     ```bash
     docker compose -f docker-compose.dev.yml up -d
     ```

3. Open [localhost:8080](http://localhost:8080/) to access the Temporal dashboard.

## Usage Examples

### Running a Telegram Summary Workflow

To fetch summaries for a specific community and date range:

```python
import asyncio

from temporalio.client import Client

from hivemind_summarizer.workflows import TelegramSummariesWorkflow
from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput


async def run_telegram_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = TelegramFetchSummariesWorkflowInput(
        platform_id="your_platform_id",
        community_id="your_community_id",
        start_date="2023-05-01",
        end_date="2023-05-07",
        extract_text_only=True,
    )

    # Execute the workflow; task_queue must match the worker's TEMPORAL_TASK_QUEUE
    result = await client.execute_workflow(
        TelegramSummariesWorkflow.run,
        input_data,
        id="telegram-summaries-workflow",
        task_queue="your_task_queue",
    )

    return result


if __name__ == "__main__":
    print(asyncio.run(run_telegram_workflow()))
```

### Running a MediaWiki ETL Workflow

To process MediaWiki content for all communities or a specific platform:

```python
from temporalio.client import Client

from hivemind_etl.mediawiki.workflows import MediaWikiETLWorkflow


async def run_mediawiki_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all platforms or a specific one
    await client.execute_workflow(
        MediaWikiETLWorkflow.run,
        platform_id,  # pass None to process all platforms
        id="mediawiki-etl-workflow",
        task_queue="your_task_queue",
    )
```

### Running a Website Ingestion Workflow

To ingest content from websites:

```python
from temporalio.client import Client

from hivemind_etl.website.workflows import WebsiteIngestionSchedulerWorkflow


async def run_website_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all communities or a specific one
    await client.execute_workflow(
        WebsiteIngestionSchedulerWorkflow.run,
        platform_id,  # pass None to process all platforms
        id="website-ingestion-workflow",
        task_queue="your_task_queue",
    )
```

## Development

To run the worker locally:

```bash
python worker.py
```

This will start a worker that connects to Temporal and listens for tasks on the task queue configured in `TEMPORAL_TASK_QUEUE`.