Skip to content

Commit 7bb6ef5

Browse files
authored
Merge pull request #44 from TogetherCrew/feat/43-mediawiki-activities-limit
feat: Added S3 support for MediaWiki ETL!
2 parents 239cdb7 + f526f51 commit 7bb6ef5

File tree

7 files changed

+238
-19
lines changed

7 files changed

+238
-19
lines changed

.env.example

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,11 @@ POSTGRES_PWD=
2828
POSTGRES_SEEDS=
2929
POSTGRES_USER=
3030

31-
PROXY_URL=
31+
PROXY_URL=
32+
33+
AWS_ENDPOINT_URL=
34+
AWS_ACCESS_KEY_ID=
35+
AWS_SECRET_ACCESS_KEY=
36+
AWS_S3_BUCKET=
37+
AWS_REGION=
38+
AWS_SECURE=

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,7 @@ main.ipynb
165165

166166
*.xml
167167

168-
dump_*
168+
dump_*
169+
170+
minio_data/
171+
dumps/*

docker-compose.dev.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,17 @@ services:
1313
condition: service_healthy
1414
redis:
1515
condition: service_healthy
16+
minio:
17+
condition: service_healthy
1618
networks:
1719
- temporal-network
20+
environment:
21+
- AWS_ENDPOINT_URL=http://minio:9000
22+
- AWS_ACCESS_KEY_ID=minioadmin
23+
- AWS_SECRET_ACCESS_KEY=minioadmin
24+
- AWS_S3_BUCKET=hivemind-etl
25+
- AWS_REGION=us-east-1
26+
- AWS_SECURE=false
1827

1928
temporal:
2029
image: temporalio/auto-setup:1.25.2.0
@@ -120,6 +129,38 @@ services:
120129
networks:
121130
- temporal-network
122131

132+
minio:
133+
image: minio/minio:RELEASE.2025-04-22T22-12-26Z
134+
ports:
135+
- "9000:9000" # API
136+
- "9001:9001" # Console
137+
environment:
138+
MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-minioadmin}
139+
MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioadmin}
140+
volumes:
141+
- ./minio_data:/data
142+
command: server /data --console-address ":9001"
143+
healthcheck:
144+
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
145+
interval: 30s
146+
timeout: 20s
147+
retries: 3
148+
networks:
149+
- temporal-network
150+
151+
minio-healthcheck:
152+
image: curlimages/curl:8.11.0
153+
entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"]
154+
depends_on:
155+
- minio
156+
healthcheck:
157+
test: ["CMD", "curl", "-f", "http://minio:9000/minio/health/live"]
158+
interval: 10s
159+
timeout: 2s
160+
retries: 5
161+
networks:
162+
- temporal-network
163+
123164
networks:
124165
temporal-network:
125166
driver: bridge

hivemind_etl/mediawiki/activities.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
with workflow.unsafe.imports_passed_through():
77
from hivemind_etl.mediawiki.module import ModulesMediaWiki
88
from hivemind_etl.mediawiki.etl import MediawikiETL
9+
from hivemind_etl.storage.s3_client import S3Client
910
from llama_index.core import Document
1011

1112

@@ -53,7 +54,9 @@ async def get_hivemind_mediawiki_platforms(
5354

5455
@activity.defn
5556
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
56-
"""Extract data from MediaWiki API URL."""
57+
"""
58+
Extract data from MediaWiki API URL
59+
"""
5760
try:
5861
community_id = mediawiki_platform["community_id"]
5962
api_url = mediawiki_platform["base_url"]
@@ -69,7 +72,8 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
6972
platform_id=platform_id,
7073
)
7174
mediawiki_etl.extract(api_url=api_url)
72-
logging.info(f"Completed extraction for community {community_id}")
75+
76+
logging.info(f"Completed extraction for community {community_id}!")
7377
except Exception as e:
7478
community_id = mediawiki_platform["community_id"]
7579
logging.error(f"Error in extraction for community {community_id}: {str(e)}")
@@ -79,9 +83,20 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
7983
@activity.defn
8084
async def transform_mediawiki_data(
8185
mediawiki_platform: dict[str, Any],
82-
) -> list[Document]:
83-
"""Transform the extracted MediaWiki data."""
86+
) -> str:
87+
"""
88+
Transform the extracted MediaWiki data and store in S3.
8489
90+
Parameters
91+
----------
92+
mediawiki_platform : dict[str, Any]
93+
The platform configuration
94+
95+
Returns
96+
-------
97+
str
98+
The S3 key where the transformed data is stored
99+
"""
85100
community_id = mediawiki_platform["community_id"]
86101
platform_id = mediawiki_platform["platform_id"]
87102
try:
@@ -93,25 +108,51 @@ async def transform_mediawiki_data(
93108
namespaces=namespaces,
94109
platform_id=platform_id,
95110
)
96-
result = mediawiki_etl.transform()
97-
logging.info(f"Completed transformation for community {community_id}")
98-
return result
111+
112+
# Transform data using the extracted data from S3
113+
documents = mediawiki_etl.transform()
114+
115+
s3_client = S3Client()
116+
# Store transformed data in S3
117+
transformed_key = s3_client.store_transformed_data(community_id, documents)
118+
119+
logging.info(
120+
f"Completed transformation for community {community_id} and stored in S3 with key: {transformed_key}"
121+
)
122+
return transformed_key
99123
except Exception as e:
100124
logging.error(f"Error in transformation for community {community_id}: {str(e)}")
101125
raise
102126

103127

104128
@activity.defn
105-
async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
106-
"""Load the transformed MediaWiki data into the database."""
129+
async def load_mediawiki_data(
130+
mediawiki_platform: dict[str, Any],
131+
) -> None:
132+
"""
133+
Load the transformed MediaWiki data into the database.
134+
135+
Parameters
136+
----------
137+
mediawiki_platform : dict[str, Any]
138+
The platform configuration
139+
"""
107140
community_id = mediawiki_platform["community_id"]
108141
platform_id = mediawiki_platform["platform_id"]
109142
namespaces = mediawiki_platform["namespaces"]
143+
transformed_data_key = mediawiki_platform["transformed_data_key"]
110144

111145
try:
112-
documents_dict = mediawiki_platform["documents"]
113-
# temporal had converted them to dicts, so we need to convert them back to Document objects
114-
documents = [Document.from_dict(doc) for doc in documents_dict]
146+
# Get transformed data from S3
147+
s3_client = S3Client()
148+
transformed_data = s3_client.get_data_by_key(transformed_data_key)
149+
if not transformed_data:
150+
raise ValueError(
151+
f"No transformed data found in S3 for community {community_id}"
152+
)
153+
154+
# Convert dict data back to Document objects
155+
documents = [Document.from_dict(doc) for doc in transformed_data]
115156

116157
logging.info(f"Starting data load for community {community_id}")
117158
mediawiki_etl = MediawikiETL(

hivemind_etl/mediawiki/workflows.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ async def run(self, platform_id: str | None = None) -> None:
4646
"namespaces": platform["namespaces"],
4747
"platform_id": platform["platform_id"],
4848
}
49-
# Extract data from MediaWiki
49+
50+
# Extract data from MediaWiki and store in S3
5051
await workflow.execute_activity(
5152
extract_mediawiki,
5253
mediawiki_platform,
@@ -57,8 +58,8 @@ async def run(self, platform_id: str | None = None) -> None:
5758
),
5859
)
5960

60-
# Transform the extracted data
61-
documents = await workflow.execute_activity(
61+
# Transform the extracted data and store in S3
62+
transformed_data_key = await workflow.execute_activity(
6263
transform_mediawiki_data,
6364
mediawiki_platform,
6465
start_to_close_timeout=timedelta(hours=6),
@@ -68,8 +69,8 @@ async def run(self, platform_id: str | None = None) -> None:
6869
),
6970
)
7071

71-
mediawiki_platform["documents"] = documents
72-
# Load the transformed data
72+
mediawiki_platform["transformed_data_key"] = transformed_data_key
73+
# Load the transformed data from S3
7374
await workflow.execute_activity(
7475
load_mediawiki_data,
7576
mediawiki_platform,

hivemind_etl/storage/s3_client.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import os
2+
import json
3+
import logging
4+
from datetime import datetime, timezone
5+
from typing import Any, Dict, List
6+
7+
import boto3
8+
from botocore.config import Config
9+
from botocore.exceptions import ClientError
10+
from llama_index.core import Document
11+
12+
13+
class S3Client:
    """Thin wrapper around a boto3 S3 client for storing/retrieving ETL data.

    Configuration is read from environment variables:
    ``AWS_ENDPOINT_URL``, ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``,
    ``AWS_S3_BUCKET``, ``AWS_REGION`` and ``AWS_SECURE``.

    Raises
    ------
    ValueError
        If any required environment variable is missing.
    """

    def __init__(self):
        # Read AWS/S3 configuration from the environment.
        self.endpoint_url = os.getenv("AWS_ENDPOINT_URL")
        self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
        self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        self.bucket_name = os.getenv("AWS_S3_BUCKET")
        self.region = os.getenv("AWS_REGION")
        self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"

        # Collect every missing required variable so the error reports them
        # all at once instead of one per restart.
        required = {
            "AWS_ENDPOINT_URL": self.endpoint_url,
            "AWS_ACCESS_KEY_ID": self.access_key,
            "AWS_SECRET_ACCESS_KEY": self.secret_key,
            "AWS_S3_BUCKET": self.bucket_name,
            "AWS_REGION": self.region,
        }
        missing_vars = [name for name, value in required.items() if not value]
        if missing_vars:
            error_msg = (
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
            logging.error(error_msg)
            raise ValueError(error_msg)

        logging.info(
            f"Initializing S3 client with endpoint: {self.endpoint_url}, "
            f"bucket: {self.bucket_name}, region: {self.region}, secure: {self.secure}"
        )

        # Configure S3 client (SigV4 is required by MinIO and modern AWS regions)
        config = Config(
            signature_version="s3v4",
            region_name=self.region,
        )

        # NOTE(review): `verify` toggles TLS certificate verification; driving
        # it from AWS_SECURE assumes "insecure" deployments (e.g. local MinIO
        # over plain HTTP) should also skip cert checks — confirm this intent.
        self.s3_client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=config,
            verify=self.secure,
        )

        # Ensure the bucket exists, creating it on first use.
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
            logging.info(f"Successfully connected to bucket: {self.bucket_name}")
        except ClientError as e:
            # head_bucket surfaces a missing bucket as "404" (some backends
            # report "NoSuchBucket") — anything else is a real access error.
            if e.response["Error"]["Code"] in ("404", "NoSuchBucket"):
                logging.info(f"Creating bucket: {self.bucket_name}")
                create_kwargs: Dict[str, Any] = {"Bucket": self.bucket_name}
                # S3 rejects CreateBucketConfiguration with a LocationConstraint
                # of "us-east-1" (InvalidLocationConstraint); that region must
                # omit the configuration entirely.
                if self.region != "us-east-1":
                    create_kwargs["CreateBucketConfiguration"] = {
                        "LocationConstraint": self.region
                    }
                self.s3_client.create_bucket(**create_kwargs)
                logging.info(f"Successfully created bucket: {self.bucket_name}")
            else:
                logging.error(f"Error accessing bucket {self.bucket_name}: {str(e)}")
                raise

    def _get_key(self, community_id: str, activity_type: str, timestamp: str) -> str:
        """Generate a unique S3 key of the form ``{community}/{type}/{ts}.json``."""
        return f"{community_id}/{activity_type}/{timestamp}.json"

    def store_extracted_data(self, community_id: str, data: Dict[str, Any]) -> str:
        """Store extracted data in S3 as JSON.

        Returns
        -------
        str
            The S3 key the data was written under.
        """
        timestamp = datetime.now(tz=timezone.utc).isoformat()
        key = self._get_key(community_id, "extracted", timestamp)

        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=json.dumps(data),
            ContentType="application/json",
        )
        return key

    def store_transformed_data(
        self, community_id: str, documents: List["Document"]
    ) -> str:
        """Store transformed documents in S3 as a JSON list of dicts.

        Returns
        -------
        str
            The S3 key the documents were written under.
        """
        timestamp = datetime.now(tz=timezone.utc).isoformat()
        key = self._get_key(community_id, "transformed", timestamp)

        # Documents are not JSON-serializable; persist their dict form so the
        # loader can round-trip them with `Document.from_dict`.
        docs_data = [doc.to_dict() for doc in documents]

        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=json.dumps(docs_data),
            ContentType="application/json",
        )
        return key

    def get_data_by_key(self, key: str) -> Any:
        """Fetch and JSON-decode the object stored under *key*.

        Returns
        -------
        Any
            The decoded payload — a dict for extracted data, a list of dicts
            for transformed documents.

        Raises
        ------
        ValueError
            If no object exists under *key*.
        """
        try:
            obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
            return json.loads(obj["Body"].read().decode("utf-8"))
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                logging.error(f"No data found for key: {key}")
                raise ValueError(f"No data found for key: {key}")
            logging.error(f"Error retrieving data for key {key}: {str(e)}")
            raise

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ pydantic==2.9.2
99
motor>=3.6, <4.0.0
1010
tc-temporal-backend==1.0.0
1111
wikiteam3-fork-proxy==1.0.0
12+
boto3>=1.38.19
13+
botocore>=1.38.19

0 commit comments

Comments
 (0)