Skip to content

Commit 73808cc

Browse files
authored
Merge pull request #30 from TogetherCrew/feat/29-fetch-summaries
feat: Added telegram fetch summaries workflow!
2 parents 61cc52a + 5805a1f commit 73808cc

File tree

7 files changed

+496
-7
lines changed

7 files changed

+496
-7
lines changed

README.md

Lines changed: 116 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,132 @@
11
# temporal-worker-python
22

3-
This repository contains TogetherCrew's Temporal Python workflows. Follow the steps below to set up the project locally:
3+
This repository contains TogetherCrew's Temporal Python workflows for data processing and analysis. It leverages the Temporal workflow engine to orchestrate ETL processes and data summarization tasks.
4+
5+
## Project Components
6+
7+
### Hivemind ETL
8+
9+
- **Website Ingestion**: Extracts, transforms, and loads data from websites defined in the platform configuration.
10+
- **MediaWiki Ingestion**: Processes content from MediaWiki instances, including extraction of pages, revisions, and content.
11+
12+
### Hivemind Summarizer
13+
14+
- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range.
15+
16+
## Architecture
17+
18+
The project uses Temporal for workflow orchestration with the following components:
19+
20+
- **Temporal Server**: Manages workflow execution and task queues
21+
- **MongoDB**: Stores platform and community configuration
22+
- **Qdrant**: Vector database for storing and retrieving summary content
23+
- **Redis**: Caching and state management
24+
- **PostgreSQL**: Used by Temporal for workflow history and state
425

526
## Setup Instructions
627

728
1. Configure Environment Variables
829
- Copy the example environment file:
930

1031
```bash
11-
cp .env.example .env
32+
cp .env.example .env
1233
```
1334

1435
Update the `.env` file with your own values, referencing the services defined in `docker-compose.dev.yml`.
1536

37+
Required variables:
38+
- `TEMPORAL_TASK_QUEUE`: Queue name for the worker
39+
- Database connection parameters for MongoDB, Qdrant, etc.
40+
1641
2. Start Services
17-
- Use the following command to set up and run the required services:
42+
- Use the following command to set up and run the required services:
43+
44+
```bash
45+
docker compose -f docker-compose.dev.yml up -d
46+
```
47+
48+
3. Open [localhost:8080](http://localhost:8080/) to access the Temporal dashboard.
49+
50+
## Usage Examples
51+
52+
### Running a Telegram Summary Workflow
53+
54+
To fetch summaries for a specific community and date range:
55+
56+
```python
57+
from temporalio.client import Client
58+
from hivemind_summarizer.workflows import TelegramSummariesWorkflow
59+
from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput
60+
61+
async def run_telegram_workflow():
62+
client = await Client.connect("localhost:7233")
63+
64+
# Create workflow input
65+
input_data = TelegramFetchSummariesWorkflowInput(
66+
platform_id="your_platform_id",
67+
community_id="your_community_id",
68+
start_date="2023-05-01",
69+
end_date="2023-05-07",
70+
extract_text_only=True
71+
)
72+
73+
# Execute workflow
74+
result = await client.execute_workflow(
75+
TelegramSummariesWorkflow.run,
76+
input_data,
77+
id="telegram-summaries-workflow",
78+
task_queue="your_task_queue"
79+
)
80+
81+
return result
82+
```
83+
84+
### Running a MediaWiki ETL Workflow
85+
86+
To process MediaWiki content for all communities or a specific platform:
87+
88+
```python
89+
from temporalio.client import Client
90+
from hivemind_etl.mediawiki.workflows import MediaWikiETLWorkflow
91+
92+
async def run_mediawiki_workflow(platform_id=None):
93+
client = await Client.connect("localhost:7233")
94+
95+
# Execute workflow for all platforms or a specific one
96+
await client.execute_workflow(
97+
MediaWikiETLWorkflow.run,
98+
platform_id, # Pass None to process all platforms
99+
id="mediawiki-etl-workflow",
100+
task_queue="your_task_queue"
101+
)
102+
```
103+
104+
### Running a Website Ingestion Workflow
105+
106+
To ingest content from websites:
107+
108+
```python
109+
from temporalio.client import Client
110+
from hivemind_etl.website.workflows import WebsiteIngestionSchedulerWorkflow
111+
112+
async def run_website_workflow(platform_id=None):
113+
client = await Client.connect("localhost:7233")
114+
115+
# Execute workflow for all communities or a specific one
116+
await client.execute_workflow(
117+
WebsiteIngestionSchedulerWorkflow.run,
118+
platform_id, # Pass None to process all platforms
119+
id="website-ingestion-workflow",
120+
task_queue="your_task_queue"
121+
)
122+
```
123+
124+
## Development
125+
126+
To run the worker locally:
18127

19-
```bash
20-
docker compose -f docker-compose.dev.yml up -d
21-
```
128+
```bash
129+
python worker.py
130+
```
22131

23-
3. Open [localhost:8080](http://localhost:8080/) and check temporal dashboard.
132+
This will start a worker that connects to Temporal and listens for tasks on the configured task queue.

hivemind_summarizer/__init__.py

Whitespace-only changes.

hivemind_summarizer/activities.py

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
import json
2+
import logging
3+
from typing import Any
4+
from datetime import datetime, timedelta
5+
6+
from bson import ObjectId
7+
from tc_hivemind_backend.db.qdrant import QdrantSingleton
8+
from tc_hivemind_backend.db.mongo import MongoSingleton
9+
10+
from temporalio import activity, workflow
11+
from qdrant_client.models import Filter, FieldCondition, MatchValue
12+
13+
with workflow.unsafe.imports_passed_through():
14+
from hivemind_summarizer.schema import (
15+
TelegramSummariesActivityInput,
16+
TelegramSummariesRangeActivityInput,
17+
TelegramGetCollectionNameInput,
18+
)
19+
20+
21+
def extract_summary_text(node_content: dict[str, Any]) -> str:
22+
"""
23+
Extract the actual summary text from the node_content.
24+
25+
Parameters
26+
----------
27+
node_content : dict[str, Any]
28+
The parsed node_content object
29+
30+
Returns
31+
-------
32+
str
33+
The extracted summary text
34+
"""
35+
# Based on the example provided, the text is in the "text" field
36+
if isinstance(node_content, dict) and "text" in node_content:
37+
return node_content["text"]
38+
39+
return "Summary text not found"
40+
41+
42+
@activity.defn
43+
async def get_collection_name(input: TelegramGetCollectionNameInput) -> str:
44+
"""
45+
Activity that extracts collection name from MongoDB based on platform_id and community_id.
46+
47+
Parameters
48+
----------
49+
input: TelegramGetCollectionNameInput
50+
Input object containing platform_id and community_id
51+
52+
Returns
53+
-------
54+
str
55+
The collection name in format [communityId]_[platformName]_summary
56+
57+
Raises
58+
------
59+
Exception
60+
If platform not found or error occurs during DB access
61+
"""
62+
platform_id = input.platform_id
63+
community_id = input.community_id
64+
65+
logging.info(
66+
f"Getting collection name for platform_id: {platform_id}, community_id: {community_id}"
67+
)
68+
69+
try:
70+
# Get MongoDB client
71+
mongo_client = MongoSingleton.get_instance().get_client()
72+
73+
# Query the platform from Core database
74+
platform = mongo_client["Core"]["platforms"].find_one(
75+
{"_id": ObjectId(platform_id)}
76+
)
77+
78+
if not platform:
79+
raise Exception(f"Platform with ID {platform_id} not found")
80+
81+
# Extract platform name
82+
platform_name = platform.get("name")
83+
if not platform_name:
84+
raise Exception(f"Platform name not found for platform_id {platform_id}")
85+
86+
# Construct collection name
87+
collection_name = f"{community_id}_{platform_name}_summary"
88+
89+
logging.info(f"Generated collection name: {collection_name}")
90+
return collection_name
91+
92+
except Exception as e:
93+
logging.error(f"Error getting collection name: {str(e)}")
94+
raise
95+
96+
97+
@activity.defn
98+
async def fetch_telegram_summaries_by_date(
99+
input: TelegramSummariesActivityInput,
100+
) -> list[dict[str, Any]] | str:
101+
"""
102+
Activity that fetches Telegram summaries for a specific date from Qdrant.
103+
104+
Parameters
105+
----------
106+
input : TelegramSummariesActivityInput
107+
Input object containing date, collection_name and extract_text_only
108+
109+
Returns
110+
-------
111+
list[dict[str, Any]] | str
112+
A list of summary objects for the specified date or a string of summaries
113+
"""
114+
date = input.date
115+
extract_text_only = input.extract_text_only
116+
collection_name = input.collection_name
117+
118+
logging.info("Started fetch_telegram_summaries_by_date!")
119+
if not collection_name:
120+
raise ValueError("Collection name is required but was not provided")
121+
122+
logging.info(
123+
f"Fetching summaries for date: {date} from collection: {collection_name}"
124+
)
125+
126+
try:
127+
# Get Qdrant client
128+
qdrant_client = QdrantSingleton.get_instance().get_client()
129+
130+
# Create filter for the specified date
131+
filter_conditions = [FieldCondition(key="date", match=MatchValue(value=date))]
132+
133+
date_filter = Filter(must=filter_conditions)
134+
135+
# Query Qdrant for all summaries matching the date using the provided collection name
136+
search_results = qdrant_client.search(
137+
collection_name=collection_name,
138+
query_vector=[0] * 1024,
139+
query_filter=date_filter,
140+
limit=100,
141+
with_payload=True,
142+
with_vectors=False,
143+
)
144+
145+
summaries = []
146+
for point in search_results:
147+
# Extract the summary data from each point
148+
summary_data = point.payload
149+
150+
# If _node_content is a JSON string, parse it
151+
if "_node_content" in summary_data and isinstance(
152+
summary_data["_node_content"], str
153+
):
154+
try:
155+
node_content = json.loads(summary_data["_node_content"])
156+
if extract_text_only:
157+
summary_data = extract_summary_text(node_content)
158+
else:
159+
summary_data["parsed_content"] = node_content
160+
summary_data["summary_text"] = extract_summary_text(
161+
node_content
162+
)
163+
except json.JSONDecodeError:
164+
logging.warning(
165+
f"Failed to parse _node_content as JSON for point with date {date}"
166+
)
167+
168+
summaries.append(summary_data)
169+
170+
logging.info(
171+
f"Found {len(summaries)} summaries for date {date} in collection {collection_name}"
172+
)
173+
return "\n".join(summaries) if extract_text_only else summaries
174+
175+
except Exception as e:
176+
logging.error(
177+
f"Error fetching summaries for date {date} from collection {collection_name}: {str(e)}"
178+
)
179+
raise
180+
181+
182+
@activity.defn
183+
async def fetch_telegram_summaries_by_date_range(
184+
input: TelegramSummariesRangeActivityInput,
185+
) -> dict[str, list[dict[str, Any] | str]]:
186+
"""
187+
Activity that fetches Telegram summaries for a range of dates from Qdrant.
188+
189+
Parameters
190+
----------
191+
input : TelegramSummariesRangeActivityInput
192+
Input object containing start_date, end_date, collection_name and extract_text_only
193+
194+
Returns
195+
-------
196+
dict[str, list[dict[str, Any] | str]]
197+
A dictionary mapping dates to lists of summary objects or a string of summaries
198+
199+
Raises
200+
------
201+
ValueError
202+
If end_date is before start_date or collection_name is not provided
203+
"""
204+
start_date = input.start_date
205+
end_date = input.end_date
206+
extract_text_only = input.extract_text_only
207+
collection_name = input.collection_name
208+
209+
if not collection_name:
210+
raise ValueError("Collection name is required but was not provided")
211+
212+
logging.info(
213+
f"Fetching summaries for date range: {start_date} to {end_date} from collection: {collection_name}"
214+
)
215+
216+
try:
217+
# Parse the date strings to datetime objects
218+
start = datetime.strptime(start_date, "%Y-%m-%d").date()
219+
end = datetime.strptime(end_date, "%Y-%m-%d").date()
220+
221+
# Validate that end_date is not before start_date
222+
if end < start:
223+
raise ValueError("End date cannot be before start date")
224+
225+
# Calculate all dates in the range
226+
date_range = []
227+
current = start
228+
while current <= end:
229+
date_range.append(current.strftime("%Y-%m-%d"))
230+
current += timedelta(days=1)
231+
232+
# Fetch summaries for each date
233+
result = {}
234+
for date in date_range:
235+
date_input = TelegramSummariesActivityInput(
236+
date=date,
237+
extract_text_only=extract_text_only,
238+
collection_name=collection_name,
239+
)
240+
summaries = await fetch_telegram_summaries_by_date(date_input)
241+
result[date] = summaries
242+
243+
return result
244+
245+
except Exception as e:
246+
logging.error(
247+
f"Error fetching summaries for date range {start_date} to {end_date} from collection {collection_name}: {str(e)}"
248+
)
249+
raise

0 commit comments

Comments
 (0)