66from bson import ObjectId
77from tc_hivemind_backend .db .qdrant import QdrantSingleton
88from tc_hivemind_backend .db .mongo import MongoSingleton
9+ from tc_hivemind_backend .ingest_qdrant import CustomIngestionPipeline
910
1011from temporalio import activity , workflow
1112from qdrant_client .models import Filter , FieldCondition , MatchValue
13+ from qdrant_client .http import models
1214
1315with workflow .unsafe .imports_passed_through ():
1416 from hivemind_summarizer .schema import (
@@ -40,7 +42,7 @@ def extract_summary_text(node_content: dict[str, Any]) -> str:
4042
4143
4244@activity .defn
43- async def get_collection_name (input : TelegramGetCollectionNameInput ) -> str :
45+ async def get_platform_name (input : TelegramGetCollectionNameInput ) -> str :
4446 """
4547 Activity that extracts the platform name from MongoDB based on platform_id and community_id.
4648
@@ -52,7 +54,7 @@ async def get_collection_name(input: TelegramGetCollectionNameInput) -> str:
5254 Returns
5355 -------
5456 str
55- The collection name in format [communityId]_[platformName]_summary
57+ The platform name
5658
5759 Raises
5860 ------
@@ -83,11 +85,7 @@ async def get_collection_name(input: TelegramGetCollectionNameInput) -> str:
8385 if not platform_name :
8486 raise Exception (f"Platform name not found for platform_id { platform_id } " )
8587
86- # Construct collection name
87- collection_name = f"{ community_id } _{ platform_name } _summary"
88-
89- logging .info (f"Generated collection name: { collection_name } " )
90- return collection_name
88+ return platform_name
9189
9290 except Exception as e :
9391 logging .error (f"Error getting collection name: { str (e )} " )
@@ -113,11 +111,13 @@ async def fetch_telegram_summaries_by_date(
113111 """
114112 date = input .date
115113 extract_text_only = input .extract_text_only
116- collection_name = input .collection_name
114+ collection_name = f"{ input .community_id } _{ input .platform_name } _summary"
115+ community_id = input .community_id
117116
118117 logging .info ("Started fetch_telegram_summaries_by_date!" )
119- if not collection_name :
120- raise ValueError ("Collection name is required but was not provided" )
118+
119+ if not input .platform_name :
120+ raise ValueError ("Platform name is required but was not provided" )
121121
122122 logging .info (
123123 f"Fetching summaries for date: { date } from collection: { collection_name } "
@@ -128,19 +128,46 @@ async def fetch_telegram_summaries_by_date(
128128 qdrant_client = QdrantSingleton .get_instance ().get_client ()
129129
130130 # Create filter for the specified date
131- filter_conditions = [FieldCondition (key = "date" , match = MatchValue (value = date ))]
132-
133- date_filter = Filter (must = filter_conditions )
134-
135- # Query Qdrant for all summaries matching the date using the provided collection name
136- search_results = qdrant_client .search (
137- collection_name = collection_name ,
138- query_vector = [0 ] * 1024 ,
139- query_filter = date_filter ,
140- limit = 100 ,
141- with_payload = True ,
142- with_vectors = False ,
143- )
131+ if date is not None :
132+ filter_conditions = [
133+ FieldCondition (key = "date" , match = MatchValue (value = date ))
134+ ]
135+ date_filter = Filter (must = filter_conditions )
136+
137+ # Query Qdrant for all summaries matching the date using the provided collection name
138+ search_results = qdrant_client .search (
139+ collection_name = collection_name ,
140+ query_vector = [0 ] * 1024 ,
141+ query_filter = date_filter ,
142+ limit = 100 ,
143+ with_payload = True ,
144+ with_vectors = False ,
145+ )
146+ else :
147+ # pipeline requires a different format for the collection name
148+ pipeline = CustomIngestionPipeline (
149+ community_id = community_id ,
150+ collection_name = f"{ input .platform_name } _summary" ,
151+ )
152+ # get the latest date from the collection
153+ latest_date = pipeline .get_latest_document_date (
154+ field_name = "date" , field_schema = models .PayloadSchemaType .DATETIME
155+ )
156+
157+ filter_conditions = [
158+ FieldCondition (
159+ key = "date" , match = MatchValue (value = latest_date .strftime ("%Y-%m-%d" ))
160+ )
161+ ]
162+ date_filter = Filter (must = filter_conditions )
163+ search_results = qdrant_client .search (
164+ collection_name = collection_name ,
165+ query_vector = [0 ] * 1024 ,
166+ query_filter = date_filter ,
167+ limit = 100 ,
168+ with_payload = True ,
169+ with_vectors = False ,
170+ )
144171
145172 summaries = []
146173 for point in search_results :
@@ -189,7 +216,7 @@ async def fetch_telegram_summaries_by_date_range(
189216 Parameters
190217 ----------
191218 input : TelegramSummariesRangeActivityInput
192- Input object containing start_date, end_date, collection_name and extract_text_only
219+ Input object containing start_date, end_date, extract_text_only, platform_name and community_id
193220
194221 Returns
195222 -------
@@ -199,15 +226,15 @@ async def fetch_telegram_summaries_by_date_range(
199226 Raises
200227 ------
201228 ValueError
202- If end_date is before start_date or collection_name is not provided
229+ If end_date is before start_date or platform_name is not provided
203230 """
204231 start_date = input .start_date
205232 end_date = input .end_date
206233 extract_text_only = input .extract_text_only
207- collection_name = input .collection_name
208-
209- if not collection_name :
210- raise ValueError ("Collection name is required but was not provided" )
234+ platform_name = input .platform_name
235+ community_id = input . community_id
236+ if not platform_name :
237+ raise ValueError ("Platform name is required but was not provided" )
211238
212239 logging .info (
213240 f"Fetching summaries for date range: { start_date } to { end_date } from collection: { collection_name } "
@@ -235,7 +262,8 @@ async def fetch_telegram_summaries_by_date_range(
235262 date_input = TelegramSummariesActivityInput (
236263 date = date ,
237264 extract_text_only = extract_text_only ,
238- collection_name = collection_name ,
265+ platform_name = input .platform_name ,
266+ community_id = community_id ,
239267 )
240268 summaries = await fetch_telegram_summaries_by_date (date_input )
241269 result [date ] = summaries
0 commit comments