Commit d61e8d4

Merge pull request #62 from TogetherCrew/feat/batch-vector-ingestion-workflow
feat: add batch processing capabilities with BatchVectorIngestionWork…
2 parents 3745b99 + da30715 commit d61e8d4

File tree

6 files changed, +335 −1 lines changed


hivemind_etl/activities.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -15,6 +15,7 @@
 )
 from hivemind_etl.simple_ingestion.pipeline import (
     process_document,
+    process_documents_batch,
 )
 
 from temporalio import activity
```
Lines changed: 187 additions & 0 deletions
# Simple Ingestion Workflows

This module provides Temporal workflows for ingesting documents into the vector database (Qdrant) using the TC Hivemind backend.

## Available Workflows

### 1. VectorIngestionWorkflow

A workflow for processing single document ingestion requests.

**Usage:**
```python
from hivemind_etl.simple_ingestion.schema import IngestionRequest
from temporalio.client import Client

# Create single ingestion request
request = IngestionRequest(
    communityId="my_community",
    platformId="my_platform",
    text="Document content here...",
    metadata={
        "title": "Document Title",
        "author": "Author Name"
    }
)

# Execute workflow
client = await Client.connect("localhost:7233")
await client.execute_workflow(
    "VectorIngestionWorkflow",
    request,
    id="single-ingestion-123",
    task_queue="hivemind-etl"
)
```
### 2. BatchVectorIngestionWorkflow

A workflow for processing multiple document ingestion requests in parallel batches for improved efficiency.

**Key Features:**
- **Automatic Chunking**: Large batches are automatically split into smaller chunks that are processed in parallel (see the sketch below)
- **Parallel Processing**: Multiple `process_documents_batch` activities run simultaneously
- **Chunk Size**: Each chunk holds up to 10 documents; the size is currently fixed inside the workflow
- **Same Collection**: All documents in a batch request must belong to the same community and collection
- **Error Handling**: Same retry policy as the single-document workflow, but with a longer timeout for batch processing
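The chunking itself is a plain slice loop. Here is a minimal standalone sketch of what the workflow does internally, with integers standing in for `BatchDocument` items:

```python
batch_size = 10  # fixed inside BatchVectorIngestionWorkflow.run

documents = list(range(25))  # stand-ins for BatchDocument items

# Same slicing pattern the workflow uses to build its BatchChunk objects
chunks = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]

assert [len(chunk) for chunk in chunks] == [10, 10, 5]  # the last chunk may be smaller
```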
**Usage:**
```python
from hivemind_etl.simple_ingestion.schema import BatchIngestionRequest, BatchDocument
from temporalio.client import Client

# Create batch ingestion request
batch_request = BatchIngestionRequest(
    communityId="my_community",
    platformId="my_platform",
    collectionName="optional_custom_collection",  # Optional
    document=[
        BatchDocument(
            docId="doc_1",
            text="First document content...",
            metadata={"title": "Document 1"},
            excludedEmbedMetadataKeys=["some_key"],
            excludedLlmMetadataKeys=["other_key"]
        ),
        BatchDocument(
            docId="doc_2",
            text="Second document content...",
            metadata={"title": "Document 2"}
        ),
        # ... more documents
    ]
)

# Execute batch workflow (the request is the only workflow argument;
# the chunk size of 10 is set inside the workflow itself)
client = await Client.connect("localhost:7233")
await client.execute_workflow(
    "BatchVectorIngestionWorkflow",
    batch_request,
    id="batch-ingestion-123",
    task_queue="hivemind-etl"
)
```
## Schema Reference

### IngestionRequest (Single Document)

```python
class IngestionRequest(BaseModel):
    communityId: str                           # Community identifier
    platformId: str                            # Platform identifier
    text: str                                  # Document text content
    metadata: dict                             # Document metadata
    docId: str = str(uuid4())                  # Unique document ID (auto-generated)
    excludedEmbedMetadataKeys: list[str] = []  # Keys to exclude from embedding
    excludedLlmMetadataKeys: list[str] = []    # Keys to exclude from LLM processing
    collectionName: str | None = None          # Optional custom collection name
```

### BatchIngestionRequest (Multiple Documents)

```python
class BatchIngestionRequest(BaseModel):
    communityId: str                           # Community identifier
    platformId: str                            # Platform identifier
    collectionName: str | None = None          # Optional custom collection name
    document: list[BatchDocument]              # List of documents to process

class BatchDocument(BaseModel):
    docId: str                                 # Unique document ID
    text: str                                  # Document text content
    metadata: dict                             # Document metadata
    excludedEmbedMetadataKeys: list[str] = []  # Keys to exclude from embedding
    excludedLlmMetadataKeys: list[str] = []    # Keys to exclude from LLM processing
```
## Collection Naming

- **Default**: `{communityId}_{platformId}`
- **Custom**: `{communityId}_{collectionName}` (when `collectionName` is provided)

The collection name reconstruction is handled automatically by the `CustomIngestionPipeline`, as illustrated below.
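A rough sketch of how the effective name resolves, mirroring the fallback in `process_documents_batch` and the community-ID prefixing that `CustomIngestionPipeline` applies internally (the helper name here is illustrative, not part of the codebase):

```python
def effective_collection_name(
    community_id: str, platform_id: str, collection_name: str | None
) -> str:
    # Fall back to the platform ID when no custom collection name is given
    name = collection_name if collection_name is not None else platform_id
    # CustomIngestionPipeline prefixes the community ID internally
    return f"{community_id}_{name}"


assert effective_collection_name("my_community", "my_platform", None) == "my_community_my_platform"
assert effective_collection_name("my_community", "my_platform", "docs") == "my_community_docs"
```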
## Performance Considerations

### When to Use Batch vs Single Workflows

**Use BatchVectorIngestionWorkflow when:**
- Processing multiple documents from the same community/collection
- Bulk importing large datasets
- You have 10+ documents to process together
- You want to maximize throughput with parallel processing

**Use VectorIngestionWorkflow when:**
- Processing single documents in real time
- Documents arrive individually
- You need immediate processing
- Simple use cases with occasional documents
### Batch Processing Optimizations

The batch workflow automatically optimizes performance by:

1. **Parallel Chunking**: Large batches are split into smaller chunks that process simultaneously
2. **Fixed Chunk Size**: Documents are split into chunks of 10, as set inside the workflow
3. **Pipeline Reuse**: One `CustomIngestionPipeline` instance per chunk
4. **Bulk Operations**: All documents in a chunk are processed together
5. **Concurrent Execution**: Multiple chunks run in parallel via `asyncio.gather()` (see the toy example below)
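As a toy model of this fan-out using plain asyncio outside Temporal (`process_chunk` is a stand-in for one `process_documents_batch` activity invocation, not a real function in this repo):

```python
import asyncio


async def process_chunk(chunk_id: int) -> str:
    # Stand-in for one process_documents_batch activity call
    await asyncio.sleep(0.1)
    return f"chunk {chunk_id} done"


async def main() -> None:
    # Mirrors how the workflow awaits all chunk activities at once
    results = await asyncio.gather(*(process_chunk(i) for i in range(3)))
    print(results)  # all three chunks ran concurrently


asyncio.run(main())
```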
## Error Handling

Both workflows implement the same retry policy (see the snippet below):
- **Initial retry interval**: 1 second
- **Maximum retry interval**: 1 minute
- **Maximum attempts**: 3
- **Timeout**: 5 minutes (single), 10 minutes (batch)
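In the Temporal Python SDK this corresponds to the `RetryPolicy` constructed in `pipeline.py`:

```python
from datetime import timedelta

from temporalio.common import RetryPolicy

retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    maximum_interval=timedelta(minutes=1),
    maximum_attempts=3,
)
# Passed to workflow.execute_activity(...) together with
# start_to_close_timeout=timedelta(minutes=5) for single documents
# and timedelta(minutes=10) for batch chunks.
```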
## Testing

Use the provided test script to verify functionality:

```bash
python test_batch_workflow.py
```

The test script demonstrates:
- Batch processing with multiple documents
- Mixed collection handling
- Comparison between single and batch workflows
## Integration

Both workflows are automatically registered in the Temporal worker through `registry.py`. Ensure your worker includes:

```python
from temporalio.worker import Worker

from registry import WORKFLOWS, ACTIVITIES

# Worker setup includes both workflows and activities
worker = Worker(
    client=client,
    task_queue="hivemind-etl",
    workflows=WORKFLOWS,
    activities=ACTIVITIES
)
```

hivemind_etl/simple_ingestion/pipeline.py

Lines changed: 118 additions & 1 deletion
```diff
@@ -1,15 +1,21 @@
+import asyncio
 from datetime import timedelta
 
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 from temporalio.workflow import execute_activity
-from .schema import IngestionRequest
+from .schema import IngestionRequest, BatchIngestionRequest, BatchDocument
 
 with workflow.unsafe.imports_passed_through():
     from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
     from llama_index.core import Document
 
 
+class BatchChunk(BatchIngestionRequest):
+    """A smaller chunk of a BatchIngestionRequest for parallel processing."""
+    pass
+
+
 @workflow.defn
 class VectorIngestionWorkflow:
     """A Temporal workflow for processing document ingestion requests.
@@ -50,6 +56,69 @@ async def run(self, ingestion_request: IngestionRequest) -> None:
         )
 
 
+@workflow.defn
+class BatchVectorIngestionWorkflow:
+    """A Temporal workflow for processing batch document ingestion requests.
+
+    This workflow handles the orchestration of batch document processing activities,
+    including retry logic and timeout configurations for multiple documents.
+    """
+
+    @workflow.run
+    async def run(self, ingestion_requests: BatchIngestionRequest) -> None:
+        """Execute the batch ingestion workflow.
+
+        Parameters
+        ----------
+        ingestion_requests : BatchIngestionRequest
+            The batch request containing all necessary information for document processing,
+            including community ID, platform ID, text content, and metadata for each document.
+
+        Notes
+        -----
+        The workflow splits documents into smaller batches and processes them in parallel.
+        Each batch implements a retry policy with the following configuration:
+        - Initial retry interval: 1 second
+        - Maximum retry interval: 1 minute
+        - Maximum retry attempts: 3
+        - Activity timeout: 10 minutes
+        """
+        batch_size: int = 10
+        retry_policy = RetryPolicy(
+            initial_interval=timedelta(seconds=1),
+            maximum_interval=timedelta(minutes=1),
+            maximum_attempts=3,
+        )
+
+        # Split documents into smaller batches
+        document_chunks = []
+        for i in range(0, len(ingestion_requests.document), batch_size):
+            chunk_documents = ingestion_requests.document[i:i + batch_size]
+
+            # Create a BatchChunk for this subset of documents
+            batch_chunk = BatchChunk(
+                communityId=ingestion_requests.communityId,
+                platformId=ingestion_requests.platformId,
+                collectionName=ingestion_requests.collectionName,
+                document=chunk_documents,
+            )
+            document_chunks.append(batch_chunk)
+
+        # Process all chunks in parallel
+        batch_activities = []
+        for chunk in document_chunks:
+            activity_task = workflow.execute_activity(
+                process_documents_batch,
+                chunk,
+                retry_policy=retry_policy,
+                start_to_close_timeout=timedelta(minutes=10),
+            )
+            batch_activities.append(activity_task)
+
+        # Wait for all batches to complete
+        await asyncio.gather(*batch_activities)
+
+
 @activity.defn
 async def process_document(
     ingestion_request: IngestionRequest,
@@ -84,6 +153,54 @@ async def process_document(
         doc_id=ingestion_request.docId,
         text=ingestion_request.text,
         metadata=ingestion_request.metadata,
+        excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
+        excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
     )
 
     pipeline.run_pipeline(docs=[document])
+
+
+@activity.defn
+async def process_documents_batch(
+    batch_chunk: BatchChunk,
+) -> None:
+    """Process a batch chunk of documents according to the ingestion request specifications.
+
+    Parameters
+    ----------
+    batch_chunk : BatchChunk
+        A chunk containing a subset of documents from the original batch request,
+        including community ID, platform ID, text content, and metadata for each document.
+
+    Notes
+    -----
+    This activity processes a subset of documents from the larger batch,
+    allowing for parallel processing and better resource management.
+    """
+    if batch_chunk.collectionName is None:
+        collection_name = batch_chunk.platformId
+    else:
+        collection_name = batch_chunk.collectionName
+
+    # Initialize the ingestion pipeline.
+    # The collection name will be reconstructed in `CustomIngestionPipeline`
+    # in the format of `[communityId]_[collection_name]`.
+    pipeline = CustomIngestionPipeline(
+        community_id=batch_chunk.communityId,
+        collection_name=collection_name,
+    )
+
+    # Convert all documents in this chunk to Document objects
+    documents = []
+    for doc in batch_chunk.document:
+        document = Document(
+            doc_id=doc.docId,
+            text=doc.text,
+            metadata=doc.metadata,
+            excluded_embed_metadata_keys=doc.excludedEmbedMetadataKeys,
+            excluded_llm_metadata_keys=doc.excludedLlmMetadataKeys,
+        )
+        documents.append(document)
+
+    # Process all documents in this chunk as a batch
+    pipeline.run_pipeline(docs=documents)
```

hivemind_etl/simple_ingestion/schema.py

Lines changed: 24 additions & 0 deletions
```diff
@@ -37,3 +37,27 @@ class IngestionRequest(BaseModel):
     excludedEmbedMetadataKeys: list[str] = []
     excludedLlmMetadataKeys: list[str] = []
     collectionName: str | None = None
+
+
+class BatchDocument(BaseModel):
+    """A model representing a document for batch ingestion."""
+
+    docId: str
+    text: str
+    metadata: dict
+    excludedEmbedMetadataKeys: list[str] = []
+    excludedLlmMetadataKeys: list[str] = []
+
+
+class BatchIngestionRequest(BaseModel):
+    """A model representing a batch of ingestion requests for document processing.
+
+    Parameters
+    ----------
+    communityId : str
+        Community identifier.
+    platformId : str
+        Platform identifier.
+    collectionName : str | None
+        Optional custom collection name.
+    document : list[BatchDocument]
+        The documents to ingest.
+    """
+
+    communityId: str
+    platformId: str
+    collectionName: str | None = None
+    document: list[BatchDocument]
```
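Since these are plain Pydantic models, a batch request can also be built straight from a JSON-style payload; a minimal sketch (payload values are illustrative):

```python
payload = {
    "communityId": "my_community",
    "platformId": "my_platform",
    "document": [
        {
            "docId": "doc_1",
            "text": "First document content...",
            "metadata": {"title": "Document 1"},
        }
    ],
}

batch_request = BatchIngestionRequest(**payload)
assert batch_request.collectionName is None  # optional field defaults to None
assert batch_request.document[0].excludedEmbedMetadataKeys == []  # list default applies
```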
