@@ -100,107 +100,116 @@ def get_cohorts():
100100 cohorts : list [Cohort ] = await get_cohorts ()
101101
102102 cohorts_count = 0
103-
104- # Initialize Kafka producer once before the loop
105103 kafka_producer = KafkaProducer ()
106104
107- # Process each cohort
105+ max_retries = 3
106+ retry_delay_seconds = 5
107+
108+ @database_sync_to_async
109+ def build_query (cohort_obj ):
110+ realtime_query = HogQLRealtimeCohortQuery (cohort = cohort_obj , team = cohort_obj .team )
111+ current_members_query = realtime_query .get_query ()
112+ hogql_context = HogQLContext (team_id = cohort_obj .team_id , enable_select_queries = True )
113+ current_members_sql , _ = prepare_and_print_ast (current_members_query , hogql_context , "clickhouse" )
114+ return current_members_sql , hogql_context .values
115+
108116 for idx , cohort in enumerate (cohorts , 1 ):
109- # Update heartbeat progress every 100 cohorts to minimize overhead
110117 if idx % 100 == 0 or idx == len (cohorts ):
111118 heartbeater .details = (f"Processing cohort { idx } /{ len (cohorts )} " ,)
112-
113- # Log progress periodically
114- if idx % 100 == 0 or idx == len (cohorts ):
115119 logger .info (f"Processed { idx } /{ len (cohorts )} cohorts so far" )
116-
117- try :
118- # Build query in sync context (HogQLRealtimeCohortQuery accesses team properties)
119- @database_sync_to_async
120- def build_query (cohort_obj ):
121- realtime_query = HogQLRealtimeCohortQuery (cohort = cohort_obj , team = cohort_obj .team )
122- current_members_query = realtime_query .get_query ()
123- hogql_context = HogQLContext (team_id = cohort_obj .team_id , enable_select_queries = True )
124- current_members_sql , _ = prepare_and_print_ast (current_members_query , hogql_context , "clickhouse" )
125- return current_members_sql , hogql_context .values
126-
127- current_members_sql , query_params = await build_query (cohort )
128-
129- # Add cohort identifiers to query parameters
130- query_params = {** query_params , "team_id" : cohort .team_id , "cohort_id" : cohort .id }
131-
132- # Wrap with comparison to previous membership to detect changes
133- final_query = f"""
134- SELECT
135- %(team_id)s as team_id,
136- %(cohort_id)s as cohort_id,
137- COALESCE(current_matches.id, previous_members.person_id) as person_id,
138- now64() as last_updated,
139- CASE
140- WHEN previous_members.person_id IS NULL THEN 'entered'
141- WHEN current_matches.id IS NULL THEN 'left'
142- ELSE 'unchanged'
143- END as status
144- FROM
145- (
146- { current_members_sql }
147- ) AS current_matches
148- FULL OUTER JOIN
149- (
150- SELECT team_id, person_id, argMax(status, last_updated) as status
151- FROM cohort_membership
152- WHERE
153- team_id = %(team_id)s
154- AND cohort_id = %(cohort_id)s
155- GROUP BY team_id, person_id
156- HAVING status = 'entered'
157- ) previous_members ON current_matches.id = previous_members.person_id
158- WHERE status != 'unchanged'
159- SETTINGS join_use_nulls = 1
160- FORMAT JSONEachRow
161- """
162-
163- with tags_context (
164- team_id = cohort .team_id ,
165- feature = Feature .BEHAVIORAL_COHORTS ,
166- product = Product .MESSAGING ,
167- query_type = "realtime_cohort_calculation" ,
168- ):
169- async with get_client (team_id = cohort .team_id ) as client :
170- async for row in client .stream_query_as_jsonl (final_query , query_parameters = query_params ):
171- status = row ["status" ]
172- payload = {
173- "team_id" : row ["team_id" ],
174- "cohort_id" : row ["cohort_id" ],
175- "person_id" : str (row ["person_id" ]),
176- "last_updated" : str (row ["last_updated" ]),
177- "status" : status ,
178- }
179- await asyncio .to_thread (
180- kafka_producer .produce ,
181- topic = KAFKA_COHORT_MEMBERSHIP_CHANGED ,
182- key = payload ["person_id" ],
183- data = payload ,
184- )
185-
186- # Track membership change (entered/left)
187- get_membership_changed_metric (status ).add (1 )
188-
189- # Record successful cohort calculation
190- get_cohort_calculation_success_metric ().add (1 )
191- cohorts_count += 1
192-
193- except Exception as e :
194- # Record failed cohort calculation
195- get_cohort_calculation_failure_metric ().add (1 )
196-
197- logger .exception (
198- f"Error calculating cohort { cohort .id } : { type (e ).__name__ } : { str (e )} " ,
199- cohort_id = cohort .id ,
200- error_type = type (e ).__name__ ,
201- error_message = str (e ),
202- )
203- continue
120+ for retry_attempt in range (1 , max_retries + 1 ):
121+ try :
122+ cohort_max_execution_time = 60 * retry_attempt
123+ current_members_sql , query_params = await build_query (cohort )
124+ query_params = {
125+ ** query_params ,
126+ "team_id" : cohort .team_id ,
127+ "cohort_id" : cohort .id ,
128+ "max_execution_time" : cohort_max_execution_time ,
129+ }
130+
131+ final_query = f"""
132+ SELECT
133+ %(team_id)s as team_id,
134+ %(cohort_id)s as cohort_id,
135+ COALESCE(current_matches.id, previous_members.person_id) as person_id,
136+ now64() as last_updated,
137+ CASE
138+ WHEN previous_members.person_id IS NULL THEN 'entered'
139+ WHEN current_matches.id IS NULL THEN 'left'
140+ ELSE 'unchanged'
141+ END as status
142+ FROM
143+ (
144+ { current_members_sql }
145+ ) AS current_matches
146+ FULL OUTER JOIN
147+ (
148+ SELECT team_id, person_id, argMax(status, last_updated) as status
149+ FROM cohort_membership
150+ WHERE
151+ team_id = %(team_id)s
152+ AND cohort_id = %(cohort_id)s
153+ GROUP BY team_id, person_id
154+ HAVING status = 'entered'
155+ ) previous_members ON current_matches.id = previous_members.person_id
156+ WHERE status != 'unchanged'
157+ SETTINGS join_use_nulls = 1, max_execution_time = %(max_execution_time)s
158+ FORMAT JSONEachRow
159+ """
160+
161+ with tags_context (
162+ team_id = cohort .team_id ,
163+ feature = Feature .BEHAVIORAL_COHORTS ,
164+ product = Product .MESSAGING ,
165+ query_type = "realtime_cohort_calculation" ,
166+ ):
167+ async with get_client (team_id = cohort .team_id ) as client :
168+ async for row in client .stream_query_as_jsonl (final_query , query_parameters = query_params ):
169+ status = row ["status" ]
170+ payload = {
171+ "team_id" : row ["team_id" ],
172+ "cohort_id" : row ["cohort_id" ],
173+ "person_id" : str (row ["person_id" ]),
174+ "last_updated" : str (row ["last_updated" ]),
175+ "status" : status ,
176+ }
177+ await asyncio .to_thread (
178+ kafka_producer .produce ,
179+ topic = KAFKA_COHORT_MEMBERSHIP_CHANGED ,
180+ key = payload ["person_id" ],
181+ data = payload ,
182+ )
183+
184+ get_membership_changed_metric (status ).add (1 )
185+
186+ get_cohort_calculation_success_metric ().add (1 )
187+ cohorts_count += 1
188+ break
189+
190+ except Exception as e :
191+ is_last_attempt = retry_attempt == max_retries
192+
193+ if is_last_attempt :
194+ get_cohort_calculation_failure_metric ().add (1 )
195+
196+ logger .exception (
197+ f"Error calculating cohort { cohort .id } after { max_retries } attempts: { type (e ).__name__ } : { str (e )} " ,
198+ cohort_id = cohort .id ,
199+ error_type = type (e ).__name__ ,
200+ error_message = str (e ),
201+ attempts = max_retries ,
202+ )
203+ else :
204+ logger .warning (
205+ f"Error calculating cohort { cohort .id } (attempt { retry_attempt } /{ max_retries } ): { type (e ).__name__ } : { str (e )} . Retrying in { retry_delay_seconds } s with { 60 * (retry_attempt + 1 )} s timeout..." ,
206+ cohort_id = cohort .id ,
207+ error_type = type (e ).__name__ ,
208+ error_message = str (e ),
209+ attempt = retry_attempt ,
210+ next_timeout = 60 * (retry_attempt + 1 ),
211+ )
212+ await asyncio .sleep (retry_delay_seconds )
204213
205214 end_time = time .time ()
206215 duration_seconds = end_time - start_time
0 commit comments