44
55from django .core .cache import cache
66from django .shortcuts import get_object_or_404
7+ from django .utils import timezone
78
89from django_filters .rest_framework import DjangoFilterBackend
910from loginas .utils import is_impersonated_session
2425 QueryRequest ,
2526 QueryStatus ,
2627 QueryStatusResponse ,
28+ RefreshType ,
2729)
2830
31+ from posthog .hogql import ast
2932from posthog .hogql .constants import LimitContext
3033from posthog .hogql .errors import ExposedHogQLError , ResolutionError
34+ from posthog .hogql .property import property_to_expr
3135
3236from posthog .api .documentation import extend_schema
3337from posthog .api .mixins import PydanticModelMixin
@@ -218,9 +222,15 @@ def create(self, request: Request, *args, **kwargs) -> Response:
218222
219223 return Response (self ._serialize_endpoint (endpoint ), status = status .HTTP_201_CREATED )
220224
221- # We should expose if the query name is duplicate
222225 except Exception as e :
223- capture_exception (e )
226+ capture_exception (
227+ e ,
228+ {
229+ "product" : Product .ENDPOINTS ,
230+ "team_id" : self .team_id ,
231+ "endpoint_name" : data .name ,
232+ },
233+ )
224234 raise ValidationError ("Failed to create endpoint." )
225235
226236 def validate_update_request (
@@ -315,7 +325,15 @@ def update(self, request: Request, name: str | None = None, *args, **kwargs) ->
315325 return Response (self ._serialize_endpoint (endpoint ))
316326
317327 except Exception as e :
318- capture_exception (e )
328+ capture_exception (
329+ e ,
330+ {
331+ "product" : Product .ENDPOINTS ,
332+ "team_id" : self .team_id ,
333+ "endpoint_id" : endpoint .id ,
334+ "saved_query_id" : endpoint .saved_query .id if endpoint .saved_query else None ,
335+ },
336+ )
319337 raise ValidationError ("Failed to update endpoint." )
320338
321339 def _enable_materialization (
@@ -372,6 +390,7 @@ def _should_use_materialized_table(self, endpoint: Endpoint, data: EndpointRunRe
372390 Returns False if:
373391 - Not materialized
374392 - Materialization incomplete/failed
393+ - Materialized data is stale (older than sync frequency)
375394 - User overrides present (variables, filters, query)
376395 - Force refresh requested
377396 """
@@ -385,6 +404,12 @@ def _should_use_materialized_table(self, endpoint: Endpoint, data: EndpointRunRe
385404 if not saved_query .table :
386405 return False
387406
407+ # Check if materialized data is stale
408+ if saved_query .last_run_at and saved_query .sync_frequency_interval :
409+ next_refresh_due = saved_query .last_run_at + saved_query .sync_frequency_interval
410+ if timezone .now () >= next_refresh_due :
411+ return False
412+
388413 if data .variables :
389414 return False
390415
@@ -443,49 +468,55 @@ def _execute_materialized_endpoint(
443468 self , endpoint : Endpoint , data : EndpointRunRequest , request : Request
444469 ) -> Response :
445470 """Execute against a materialized table in S3."""
446- from posthog . schema import RefreshType
447-
448- from posthog . hogql import ast
449- from posthog . hogql . property import property_to_expr
471+ try :
472+ saved_query = endpoint . saved_query
473+ if not saved_query :
474+ raise ValidationError ( "No materialized query found for this endpoint" )
450475
451- saved_query = endpoint .saved_query
452- if not saved_query :
453- raise ValidationError ("No materialized query found for this endpoint" )
476+ select_query = ast .SelectQuery (
477+ select = [ast .Field (chain = ["*" ])],
478+ select_from = ast .JoinExpr (table = ast .Field (chain = [saved_query .name ])),
479+ )
454480
455- # Build AST for SELECT * FROM table
456- select_query = ast .SelectQuery (
457- select = [ast .Field (chain = ["*" ])],
458- select_from = ast .JoinExpr (table = ast .Field (chain = [saved_query .name ])),
459- )
481+ if data .filters_override and data .filters_override .properties :
482+ try :
483+ property_expr = property_to_expr (data .filters_override .properties , self .team )
484+ select_query .where = property_expr
485+ except Exception :
486+ raise ValidationError ("Failed to apply property filters." )
460487
461- if data .filters_override and data .filters_override .properties :
462- try :
463- property_expr = property_to_expr (data .filters_override .properties , self .team )
464- select_query .where = property_expr
465- except Exception as e :
466- capture_exception (e )
467- raise ValidationError (f"Failed to apply property filters." )
468-
469- materialized_hogql_query = HogQLQuery (
470- query = select_query .to_hogql (), modifiers = HogQLQueryModifiers (useMaterializedViews = True )
471- )
488+ materialized_hogql_query = HogQLQuery (
489+ query = select_query .to_hogql (), modifiers = HogQLQueryModifiers (useMaterializedViews = True )
490+ )
472491
473- query_request_data = {
474- "client_query_id" : data .client_query_id ,
475- "name" : f"{ endpoint .name } _materialized" ,
476- "refresh" : data .refresh or RefreshType .BLOCKING ,
477- "query" : materialized_hogql_query .model_dump (),
478- }
492+ query_request_data = {
493+ "client_query_id" : data .client_query_id ,
494+ "name" : f"{ endpoint .name } _materialized" ,
495+ "refresh" : data .refresh or RefreshType .BLOCKING ,
496+ "query" : materialized_hogql_query .model_dump (),
497+ }
479498
480- extra_fields = {
481- "_materialized " : True ,
482- "_materialized_at " : saved_query .last_run_at .isoformat () if saved_query .last_run_at else None ,
483- }
484- tag_queries (workload = Workload .ENDPOINTS , warehouse_query = True )
499+ extra_fields = {
500+ "endpoint_materialized " : True ,
501+ "endpoint_materialized_at " : saved_query .last_run_at .isoformat () if saved_query .last_run_at else None ,
502+ }
503+ tag_queries (workload = Workload .ENDPOINTS , warehouse_query = True )
485504
486- return self ._execute_query_and_respond (
487- query_request_data , data .client_query_id , request , extra_result_fields = extra_fields
488- )
505+ return self ._execute_query_and_respond (
506+ query_request_data , data .client_query_id , request , extra_result_fields = extra_fields
507+ )
508+ except Exception as e :
509+ capture_exception (
510+ e ,
511+ {
512+ "product" : Product .ENDPOINTS ,
513+ "team_id" : self .team_id ,
514+ "endpoint_name" : endpoint .name ,
515+ "materialized" : True ,
516+ "saved_query_id" : saved_query .id if saved_query else None ,
517+ },
518+ )
519+ raise
489520
490521 def _parse_variables (self , query : dict [str , dict ], variables : dict [str , str ]) -> dict [str , dict ] | None :
491522 query_variables = query .get ("variables" , None )
@@ -507,7 +538,6 @@ def _parse_variables(self, query: dict[str, dict], variables: dict[str, str]) ->
507538 variableId = variable_id ,
508539 code_name = variable_code_name ,
509540 value = variable_value ,
510- # TODO: this needs more attention!
511541 isNull = True if variable_value is None else None ,
512542 ).model_dump ()
513543 return variables_override
@@ -535,15 +565,17 @@ def _execute_inline_endpoint(
535565 query_request_data , data .client_query_id , request , cache_age_seconds = endpoint .cache_age_seconds
536566 )
537567
538- except (ExposedHogQLError , ExposedCHQueryError , HogVMException ) as e :
539- raise ValidationError (str (e ), getattr (e , "code_name" , None ))
540- except ResolutionError as e :
541- raise ValidationError (str (e ))
542- except ConcurrencyLimitExceeded as c :
543- raise Throttled (detail = str (c ))
544568 except Exception as e :
545569 self .handle_column_ch_error (e )
546- capture_exception (e )
570+ capture_exception (
571+ e ,
572+ {
573+ "product" : Product .ENDPOINTS ,
574+ "team_id" : self .team_id ,
575+ "materialized" : False ,
576+ "endpoint_name" : endpoint .name ,
577+ },
578+ )
547579 raise
548580
549581 @extend_schema (
@@ -588,16 +620,22 @@ def run(self, request: Request, name=None, *args, **kwargs) -> Response:
588620 # Only the latest version is materialized
589621 use_materialized = version_number is None and self ._should_use_materialized_table (endpoint , data )
590622
591- if use_materialized :
592- result = self ._execute_materialized_endpoint (endpoint , data , request )
593- else :
594- # Use version's query if available, otherwise use endpoint.query
595- query_to_use = version_obj .query if version_obj else endpoint .query .copy ()
596- result = self ._execute_inline_endpoint (endpoint , data , request , query_to_use )
597-
623+ try :
624+ if use_materialized :
625+ result = self ._execute_materialized_endpoint (endpoint , data , request )
626+ else :
627+ # Use version's query if available, otherwise use endpoint.query
628+ query_to_use = version_obj .query if version_obj else endpoint .query .copy ()
629+ result = self ._execute_inline_endpoint (endpoint , data , request , query_to_use )
630+ except (ExposedHogQLError , ExposedCHQueryError , HogVMException ) as e :
631+ raise ValidationError ("An internal error occurred." , getattr (e , "code_name" , None ))
632+ except ResolutionError :
633+ raise ValidationError ("An internal error occurred while resolving the query." )
634+ except ConcurrencyLimitExceeded :
635+ raise Throttled (detail = "Too many concurrent requests. Please try again later." )
598636 if version_obj and isinstance (result .data , dict ):
599- result .data ["_version " ] = version_obj .version
600- result .data ["_version_created_at " ] = version_obj .created_at .isoformat ()
637+ result .data ["endpoint_version " ] = version_obj .version
638+ result .data ["endpoint_version_created_at " ] = version_obj .created_at .isoformat ()
601639
602640 return result
603641
@@ -648,7 +686,7 @@ def get_endpoints_last_execution_times(self, request: Request, *args, **kwargs)
648686 except ConcurrencyLimitExceeded as c :
649687 raise Throttled (detail = str (c ))
650688 except Exception as e :
651- capture_exception (e )
689+ capture_exception (e , { "product" : Product . ENDPOINTS , "team_id" : self . team_id } )
652690 raise
653691
654692 def handle_column_ch_error (self , error ):
0 commit comments