Skip to content

Commit a2b8526

Browse files

Authored merge: Merge pull request #34 from TogetherCrew/feat/update-qdrant-col-name-plt-id
feat: update qdrant platform data naming!
2 parents 40a8f51 + 64c27a6 commit a2b8526

File tree

10 files changed

+95
-30
lines changed

10 files changed

+95
-30
lines changed

hivemind_etl/mediawiki/activities.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,16 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
5858
community_id = mediawiki_platform["community_id"]
5959
api_url = mediawiki_platform["base_url"]
6060
namespaces = mediawiki_platform["namespaces"]
61+
platform_id = mediawiki_platform["platform_id"]
6162

6263
logging.info(
6364
f"Starting extraction for community {community_id} with API URL: {api_url}"
6465
)
65-
mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
66+
mediawiki_etl = MediawikiETL(
67+
community_id=community_id,
68+
namespaces=namespaces,
69+
platform_id=platform_id,
70+
)
6671
mediawiki_etl.extract(api_url=api_url)
6772
logging.info(f"Completed extraction for community {community_id}")
6873
except Exception as e:
@@ -78,11 +83,16 @@ async def transform_mediawiki_data(
7883
"""Transform the extracted MediaWiki data."""
7984

8085
community_id = mediawiki_platform["community_id"]
86+
platform_id = mediawiki_platform["platform_id"]
8187
try:
8288
namespaces = mediawiki_platform["namespaces"]
8389

8490
logging.info(f"Starting transformation for community {community_id}")
85-
mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
91+
mediawiki_etl = MediawikiETL(
92+
community_id=community_id,
93+
namespaces=namespaces,
94+
platform_id=platform_id,
95+
)
8696
result = mediawiki_etl.transform()
8797
logging.info(f"Completed transformation for community {community_id}")
8898
return result
@@ -95,6 +105,7 @@ async def transform_mediawiki_data(
95105
async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
96106
"""Load the transformed MediaWiki data into the database."""
97107
community_id = mediawiki_platform["community_id"]
108+
platform_id = mediawiki_platform["platform_id"]
98109
namespaces = mediawiki_platform["namespaces"]
99110

100111
try:
@@ -103,7 +114,11 @@ async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
103114
documents = [Document.from_dict(doc) for doc in documents_dict]
104115

105116
logging.info(f"Starting data load for community {community_id}")
106-
mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
117+
mediawiki_etl = MediawikiETL(
118+
community_id=community_id,
119+
namespaces=namespaces,
120+
platform_id=platform_id,
121+
)
107122
mediawiki_etl.load(documents=documents)
108123
logging.info(f"Completed data load for community {community_id}")
109124
except Exception as e:

hivemind_etl/mediawiki/etl.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ def __init__(
1313
self,
1414
community_id: str,
1515
namespaces: list[int],
16+
platform_id: str,
1617
delete_dump_after_load: bool = True,
1718
) -> None:
1819
self.community_id = community_id
20+
self.platform_id = platform_id
1921

2022
self.proxy_url = os.getenv("PROXY_URL", "")
2123
if self.proxy_url:
@@ -96,7 +98,7 @@ def transform(self) -> list[Document]:
9698
def load(self, documents: list[Document]) -> None:
9799
logging.info(f"Loading {len(documents)} documents into Qdrant!")
98100
ingestion_pipeline = CustomIngestionPipeline(
99-
self.community_id, collection_name="mediawiki"
101+
self.community_id, collection_name=self.platform_id
100102
)
101103
ingestion_pipeline.run_pipeline(documents)
102104
logging.info(f"Loaded {len(documents)} documents into Qdrant!")

hivemind_etl/mediawiki/module.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ def get_learning_platforms(
2828
example data output:
2929
```
3030
[{
31-
"community_id": "6579c364f1120850414e0dc5",
31+
"platform_id": "xxxx",
32+
"community_id": "xxxxxx",
3233
"base_url": "some_api_url",
3334
"namespaces": [1, 2, 3],
3435
}]
@@ -87,6 +88,7 @@ def get_learning_platforms(
8788

8889
communities_data.append(
8990
{
91+
"platform_id": str(platform_id),
9092
"community_id": str(community),
9193
"namespaces": namespaces,
9294
"base_url": base_url + path, # type: ignore

hivemind_etl/website/activities.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@ async def get_hivemind_website_comminities(
4343

4444

4545
@activity.defn
46-
async def extract_website(urls: list[str], community_id: str) -> list[dict]:
46+
async def extract_website(
47+
urls: list[str], community_id: str, platform_id: str
48+
) -> list[dict]:
4749
"""Extract data from website URLs."""
4850
try:
4951
logging.info(
50-
f"Starting extraction for community {community_id} with {len(urls)} URLs"
52+
f"Starting extraction for community {community_id} | platform {platform_id} with {len(urls)} URLs"
5153
)
52-
website_etl = WebsiteETL(community_id=community_id)
54+
website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
5355
result = await website_etl.extract(urls=urls)
5456
logging.info(f"Completed extraction for community {community_id}")
5557
return result
@@ -60,12 +62,14 @@ async def extract_website(urls: list[str], community_id: str) -> list[dict]:
6062

6163
@activity.defn
6264
async def transform_website_data(
63-
raw_data: list[dict], community_id: str
65+
raw_data: list[dict], community_id: str, platform_id: str
6466
) -> list[Document]:
6567
"""Transform the extracted raw data."""
6668
try:
67-
logging.info(f"Starting transformation for community {community_id}")
68-
website_etl = WebsiteETL(community_id=community_id)
69+
logging.info(
70+
f"Starting transformation for community {community_id} | platform {platform_id}"
71+
)
72+
website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
6973
result = website_etl.transform(raw_data=raw_data)
7074
logging.info(f"Completed transformation for community {community_id}")
7175
return result
@@ -75,11 +79,15 @@ async def transform_website_data(
7579

7680

7781
@activity.defn
78-
async def load_website_data(documents: list[Document], community_id: str) -> None:
82+
async def load_website_data(
83+
documents: list[Document], community_id: str, platform_id: str
84+
) -> None:
7985
"""Load the transformed data into the database."""
8086
try:
81-
logging.info(f"Starting data load for community {community_id}")
82-
website_etl = WebsiteETL(community_id=community_id)
87+
logging.info(
88+
f"Starting data load for community {community_id} | platform {platform_id}"
89+
)
90+
website_etl = WebsiteETL(community_id=community_id, platform_id=platform_id)
8391
website_etl.load(documents=documents)
8492
logging.info(f"Completed data load for community {community_id}")
8593
except Exception as e:

hivemind_etl/website/module.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def get_learning_platforms(
2929
example data output:
3030
```
3131
[{
32-
"community_id": "6579c364f1120850414e0dc5",
33-
"platform_id": "6579c364f1120850414e0dc6",
32+
"community_id": "xxxx",
33+
"platform_id": "xxxxxxx",
3434
"urls": ["link1", "link2"],
3535
}]
3636
```

hivemind_etl/website/website_etl.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,27 @@ class WebsiteETL:
1010
def __init__(
1111
self,
1212
community_id: str,
13+
platform_id: str,
1314
) -> None:
1415
"""
1516
Parameters
1617
-----------
1718
community_id : str
1819
the community to save its data
20+
platform_id : str
21+
the platform to save its data
22+
23+
Note: the collection name would be `community_id_platform_id`
1924
"""
2025
if not community_id or not isinstance(community_id, str):
2126
raise ValueError("community_id must be a non-empty string")
2227

2328
self.community_id = community_id
24-
collection_name = "website"
29+
self.platform_id = platform_id
2530

2631
# preparing the ingestion pipeline
2732
self.ingestion_pipeline = CustomIngestionPipeline(
28-
self.community_id, collection_name=collection_name
33+
self.community_id, collection_name=self.platform_id
2934
)
3035

3136
async def extract(

hivemind_etl/website/workflows.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def run(self, community_info: dict) -> None:
2929
# Execute activities in sequence with retries
3030
raw_data = await workflow.execute_activity(
3131
extract_website,
32-
args=[urls, community_id],
32+
args=[urls, community_id, platform_id],
3333
start_to_close_timeout=timedelta(minutes=30),
3434
retry_policy=RetryPolicy(
3535
initial_interval=timedelta(seconds=10),
@@ -40,7 +40,7 @@ async def run(self, community_info: dict) -> None:
4040

4141
documents = await workflow.execute_activity(
4242
transform_website_data,
43-
args=[raw_data, community_id],
43+
args=[raw_data, community_id, platform_id],
4444
start_to_close_timeout=timedelta(minutes=10),
4545
retry_policy=RetryPolicy(
4646
initial_interval=timedelta(seconds=5),
@@ -51,7 +51,7 @@ async def run(self, community_info: dict) -> None:
5151

5252
await workflow.execute_activity(
5353
load_website_data,
54-
args=[documents, community_id],
54+
args=[documents, community_id, platform_id],
5555
start_to_close_timeout=timedelta(minutes=60),
5656
retry_policy=RetryPolicy(
5757
initial_interval=timedelta(seconds=5),

tests/integration/test_mediawiki_modules.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def test_get_single_data(self):
6767
[0, 1, 2],
6868
)
6969
self.assertEqual(result[0]["base_url"], "http://example.com/api")
70+
self.assertEqual(result[0]["platform_id"], str(platform_id))
7071

7172
def test_get_mediawiki_communities_data_multiple_platforms(self):
7273
"""
@@ -146,6 +147,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self):
146147
"community_id": str(community_id),
147148
"namespaces": [0, 1, 2],
148149
"base_url": "http://example1.com/api",
150+
"platform_id": str(platform_id1),
149151
},
150152
)
151153
self.assertEqual(
@@ -154,6 +156,7 @@ def test_get_mediawiki_communities_data_multiple_platforms(self):
154156
"community_id": str(community_id),
155157
"namespaces": [3, 4, 5],
156158
"base_url": "http://example2.com/api",
159+
"platform_id": str(platform_id2),
157160
},
158161
)
159162

@@ -237,6 +240,7 @@ def test_get_mediawiki_communities_data_filtered_platforms(self):
237240
"community_id": str(community_id),
238241
"namespaces": [0, 1, 2],
239242
"base_url": "http://example1.com/api",
243+
"platform_id": str(platform_id1),
240244
},
241245
)
242246

@@ -318,5 +322,6 @@ def test_get_mediawiki_communities_data_filtered_platforms_not_activated(self):
318322
"community_id": str(community_id),
319323
"namespaces": [3, 4, 5],
320324
"base_url": "http://example2.com/api",
325+
"platform_id": str(platform_id2),
321326
},
322327
)

tests/unit/test_mediawiki_etl.py

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def setUp(self):
1212
self.community_id = "test_community"
1313
self.api_url = "https://example.com/api.php"
1414
self.custom_path = "custom/path"
15+
self.platform_id = "test_platform"
1516
self.namespaces = [0, 1] # Main and Talk namespaces
1617

1718
# Create a temporary dumps directory
@@ -26,7 +27,11 @@ def tearDown(self):
2627
shutil.rmtree(self.custom_path)
2728

2829
def test_mediawiki_etl_initialization(self):
29-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
30+
etl = MediawikiETL(
31+
community_id=self.community_id,
32+
namespaces=self.namespaces,
33+
platform_id=self.platform_id,
34+
)
3035
self.assertEqual(etl.community_id, self.community_id)
3136
self.assertTrue(etl.delete_dump_after_load)
3237
self.assertEqual(etl.dump_dir, f"dumps/{self.community_id}")
@@ -35,12 +40,17 @@ def test_mediawiki_etl_initialization(self):
3540
community_id=self.community_id,
3641
namespaces=self.namespaces,
3742
delete_dump_after_load=False,
43+
platform_id=self.platform_id,
3844
)
3945
self.assertFalse(etl.delete_dump_after_load)
4046

4147
def test_extract_with_default_path(self):
4248
# Create a ETL instance with mocked wikiteam_crawler
43-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
49+
etl = MediawikiETL(
50+
community_id=self.community_id,
51+
namespaces=self.namespaces,
52+
platform_id=self.platform_id,
53+
)
4454
etl.wikiteam_crawler = Mock()
4555

4656
etl.extract(self.api_url)
@@ -51,7 +61,11 @@ def test_extract_with_default_path(self):
5161

5262
def test_extract_with_custom_path(self):
5363
# Create a ETL instance with mocked wikiteam_crawler
54-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
64+
etl = MediawikiETL(
65+
community_id=self.community_id,
66+
namespaces=self.namespaces,
67+
platform_id=self.platform_id,
68+
)
5569
etl.wikiteam_crawler = Mock()
5670

5771
etl.extract(self.api_url, self.custom_path)
@@ -63,7 +77,11 @@ def test_extract_with_custom_path(self):
6377

6478
@patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml")
6579
def test_transform_success(self, mock_parse_mediawiki_xml):
66-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
80+
etl = MediawikiETL(
81+
community_id=self.community_id,
82+
namespaces=self.namespaces,
83+
platform_id=self.platform_id,
84+
)
6785

6886
# Mock page data
6987
mock_page = Mock()
@@ -98,7 +116,11 @@ def test_transform_success(self, mock_parse_mediawiki_xml):
98116
@patch("hivemind_etl.mediawiki.etl.logging")
99117
@patch("hivemind_etl.mediawiki.etl.parse_mediawiki_xml")
100118
def test_transform_error_handling(self, mock_parse_mediawiki_xml, mock_logging):
101-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
119+
etl = MediawikiETL(
120+
community_id=self.community_id,
121+
namespaces=self.namespaces,
122+
platform_id=self.platform_id,
123+
)
102124

103125
# Mock page that will raise an exception
104126
mock_page = Mock()
@@ -122,7 +144,11 @@ def get_attribute_error(*args, **kwargs):
122144

123145
@patch("hivemind_etl.mediawiki.etl.CustomIngestionPipeline")
124146
def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class):
125-
etl = MediawikiETL(community_id=self.community_id, namespaces=self.namespaces)
147+
etl = MediawikiETL(
148+
community_id=self.community_id,
149+
namespaces=self.namespaces,
150+
platform_id=self.platform_id,
151+
)
126152
documents = [Document(text="Test content")]
127153

128154
# Setup the mock
@@ -138,7 +164,7 @@ def test_load_with_dump_deletion(self, mock_ingestion_pipeline_class):
138164

139165
# Verify that methods were called correctly
140166
mock_ingestion_pipeline_class.assert_called_once_with(
141-
self.community_id, collection_name="mediawiki"
167+
self.community_id, collection_name=self.platform_id
142168
)
143169
mock_pipeline.run_pipeline.assert_called_once_with(documents)
144170
self.assertFalse(os.path.exists(etl.dump_dir))
@@ -149,6 +175,7 @@ def test_load_without_dump_deletion(self, mock_ingestion_pipeline_class):
149175
community_id=self.community_id,
150176
namespaces=self.namespaces,
151177
delete_dump_after_load=False,
178+
platform_id=self.platform_id,
152179
)
153180
documents = [Document(text="Test content")]
154181

@@ -165,7 +192,7 @@ def test_load_without_dump_deletion(self, mock_ingestion_pipeline_class):
165192

166193
# Verify that methods were called correctly
167194
mock_ingestion_pipeline_class.assert_called_once_with(
168-
self.community_id, collection_name="mediawiki"
195+
self.community_id, collection_name=self.platform_id
169196
)
170197
mock_pipeline.run_pipeline.assert_called_once_with(documents)
171198
self.assertTrue(os.path.exists(etl.dump_dir))

tests/unit/test_website_etl.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ def setUp(self):
1414
"""
1515
load_dotenv()
1616
self.community_id = "test_community"
17-
self.website_etl = WebsiteETL(self.community_id)
17+
self.platform_id = "test_platform"
18+
self.website_etl = WebsiteETL(self.community_id, self.platform_id)
1819
self.website_etl.crawlee_client = AsyncMock()
1920
self.website_etl.ingestion_pipeline = MagicMock()
2021

0 commit comments

Comments (0)