
Commit 525bae0

Merge pull request #36 from TogetherCrew/feat/35-ingestion-workflow
feat: add simple ingestion workflow and document processing activities
2 parents: a2b8526 + a6463ff

File tree

7 files changed: +139 -15 lines

hivemind_etl/activities.py

Lines changed: 3 additions & 0 deletions
@@ -13,6 +13,9 @@
     transform_mediawiki_data,
     load_mediawiki_data,
 )
+from hivemind_etl.simple_ingestion.pipeline import (
+    process_document,
+)
 
 from temporalio import activity
 
hivemind_etl/simple_ingestion/__init__.py

Whitespace-only changes.

hivemind_etl/simple_ingestion/pipeline.py

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.workflow import execute_activity
from .schema import IngestionRequest

with workflow.unsafe.imports_passed_through():
    from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
    from llama_index.core import Document


@workflow.defn
class IngestionWorkflow:
    """A Temporal workflow for processing document ingestion requests.

    This workflow orchestrates the document processing activity,
    including retry logic and timeout configuration.
    """

    @workflow.run
    async def run(self, ingestion_request: IngestionRequest) -> None:
        """Execute the ingestion workflow.

        Parameters
        ----------
        ingestion_request : IngestionRequest
            The request containing all information needed for document
            processing, including community ID, platform ID, text content,
            and metadata.

        Notes
        -----
        The workflow applies a retry policy with the following configuration:
        - Initial retry interval: 1 second
        - Maximum retry interval: 1 minute
        - Maximum retry attempts: 3
        - Activity start-to-close timeout: 5 minutes
        """
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        await execute_activity(
            process_document,
            ingestion_request,
            retry_policy=retry_policy,
            start_to_close_timeout=timedelta(minutes=5),
        )


@activity.defn
async def process_document(
    ingestion_request: IngestionRequest,
) -> None:
    """Process the document according to the ingestion request.

    Parameters
    ----------
    ingestion_request : IngestionRequest
        The request containing all information needed for document
        processing, including community ID, platform ID, text content,
        and metadata.

    Notes
    -----
    Builds a llama-index `Document` from the request and runs it through
    `CustomIngestionPipeline`, which handles embedding and storage.
    """
    if ingestion_request.collectionName is None:
        collection_name = (
            f"{ingestion_request.communityId}_{ingestion_request.platformId}"
        )
    else:
        collection_name = ingestion_request.collectionName

    # Initialize the ingestion pipeline against the resolved collection
    pipeline = CustomIngestionPipeline(
        community_id=ingestion_request.communityId,
        collection_name=collection_name,
    )

    document = Document(
        doc_id=ingestion_request.docId,
        text=ingestion_request.text,
        metadata=ingestion_request.metadata,
    )

    pipeline.run_pipeline(docs=[document])
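
For context, a caller would start this workflow through a Temporal client with an IngestionRequest payload. The sketch below is not part of the commit: the server address and the `hivemind-etl` task queue name are assumptions, and it presumes an SDK recent enough to ship `temporalio.contrib.pydantic.pydantic_data_converter`, so the Pydantic request model serializes across the workflow boundary.

import asyncio

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

from hivemind_etl.simple_ingestion.pipeline import IngestionWorkflow
from hivemind_etl.simple_ingestion.schema import IngestionRequest


async def main() -> None:
    client = await Client.connect(
        "localhost:7233",  # assumed server address
        data_converter=pydantic_data_converter,
    )
    request = IngestionRequest(
        communityId="c-123",  # made-up IDs for illustration
        platformId="p-456",
        text="Example document body.",
        metadata={"source": "example"},
    )
    # Runs IngestionWorkflow.run to completion; a worker polling the
    # task queue executes the process_document activity with retries.
    await client.execute_workflow(
        IngestionWorkflow.run,
        request,
        id=f"ingest-{request.docId}",
        task_queue="hivemind-etl",
    )


if __name__ == "__main__":
    asyncio.run(main())

Worth noting: process_document does not yet forward `excludedEmbedMetadataKeys` and `excludedLlmMetadataKeys` to the `Document`; llama-index's `excluded_embed_metadata_keys` and `excluded_llm_metadata_keys` fields would be the natural destination for them.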

hivemind_etl/simple_ingestion/schema.py

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
from pydantic import BaseModel
from uuid import uuid4


class IngestionRequest(BaseModel):
    """A model representing an ingestion request for document processing.

    Parameters
    ----------
    communityId : str
        The unique identifier of the community.
    platformId : str
        The unique identifier of the platform.
    text : str
        The text content to be processed.
    metadata : dict
        Additional metadata associated with the document.
    docId : str, optional
        Unique identifier for the document. If not provided, a UUID is
        generated as the default.
    excludedEmbedMetadataKeys : list[str], optional
        List of metadata keys to exclude from the embedding process.
        Default is an empty list.
    excludedLlmMetadataKeys : list[str], optional
        List of metadata keys to exclude from LLM processing.
        Default is an empty list.
    collectionName : str | None, optional
        The name of the collection to use for the document.
        Default is `None`, in which case the collection name follows the
        `[communityId]_[platformId]` pattern.
    """

    communityId: str
    platformId: str
    text: str
    metadata: dict
    docId: str = str(uuid4())
    excludedEmbedMetadataKeys: list[str] = []
    excludedLlmMetadataKeys: list[str] = []
    collectionName: str | None = None
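
Two behaviors of this model are worth illustrating: the `collectionName=None` fallback handled in process_document, and a Pydantic pitfall in the committed `docId` default, since `str(uuid4())` is evaluated once when the class body runs, so every request that omits `docId` receives the same UUID. A short sketch (the IDs are made up, and `IngestionRequestFixed` is a hypothetical variant, not the committed code):

from uuid import uuid4

from pydantic import BaseModel, Field

from hivemind_etl.simple_ingestion.schema import IngestionRequest

a = IngestionRequest(
    communityId="c-123",  # made-up IDs for illustration
    platformId="p-456",
    text="hello",
    metadata={},
)
b = IngestionRequest(communityId="c-123", platformId="p-456", text="hi", metadata={})

# Both omitted docId, so both carry the single class-level default UUID.
assert a.docId == b.docId

# collectionName=None defers naming to process_document's fallback:
# "<communityId>_<platformId>", i.e. "c-123_p-456" here.
assert a.collectionName is None


class IngestionRequestFixed(BaseModel):
    """Hypothetical fix: default_factory yields a fresh UUID per instance."""

    docId: str = Field(default_factory=lambda: str(uuid4()))


x, y = IngestionRequestFixed(), IngestionRequestFixed()
assert x.docId != y.docId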

hivemind_etl/website/website_etl.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ def __init__(
         community_id : str
             the community to save its data
         platform_id : str
             the platform to save its data
-
+
             Note: the collection name would be `community_id_platform_id`
         """
         if not community_id or not isinstance(community_id, str):

(The change on line 22 is whitespace-only: the blank line before the note was rewritten without stray whitespace.)

registry.py

Lines changed: 4 additions & 2 deletions
@@ -8,6 +8,7 @@
     get_hivemind_mediawiki_platforms,
     transform_mediawiki_data,
     load_mediawiki_data,
+    process_document,
 )
 from hivemind_summarizer.activities import (
     fetch_platform_summaries_by_date,
@@ -16,18 +17,18 @@
 )
 from workflows import (
     CommunityWebsiteWorkflow,
-    SayHello,
     WebsiteIngestionSchedulerWorkflow,
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
+    IngestionWorkflow,
 )
 
 WORKFLOWS = [
     CommunityWebsiteWorkflow,
-    SayHello,
     WebsiteIngestionSchedulerWorkflow,
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
+    IngestionWorkflow,
 ]
 
 ACTIVITIES = [
@@ -43,4 +44,5 @@
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
     get_platform_name,
+    process_document,
 ]
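
registry.py's WORKFLOWS and ACTIVITIES lists are presumably what the worker process registers with Temporal. A minimal sketch of that wiring, assuming a local server and a hypothetical `hivemind-etl` task queue (the actual worker entry point is not part of this diff):

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from registry import ACTIVITIES, WORKFLOWS


async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed server address
    # After this commit, WORKFLOWS carries IngestionWorkflow and ACTIVITIES
    # carries process_document, so this worker can serve the new
    # simple-ingestion pipeline alongside the existing workflows.
    worker = Worker(
        client,
        task_queue="hivemind-etl",  # hypothetical queue name
        workflows=WORKFLOWS,
        activities=ACTIVITIES,
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())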

workflows.py

Lines changed: 3 additions & 12 deletions
@@ -10,22 +10,13 @@
 from hivemind_etl.mediawiki.workflows import (
     MediaWikiETLWorkflow,
 )
+from hivemind_etl.simple_ingestion.pipeline import (
+    IngestionWorkflow,
+)
 from hivemind_summarizer.workflows import PlatformSummariesWorkflow
 
 from temporalio import workflow
 
 # Configure logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-
-
-# For test purposes
-# To be deleted in future
-@workflow.defn
-class SayHello:
-    @workflow.run
-    async def run(self) -> int:
-        logger.info(f"Hello at time {workflow.now()}!")
-        return await workflow.start_activity(
-            say_hello, start_to_close_timeout=timedelta(seconds=5)
-        )