|
25 | 25 | QueryRequest, |
26 | 26 | QueryStatus, |
27 | 27 | QueryStatusResponse, |
| 28 | + RefreshType, |
28 | 29 | ) |
29 | 30 |
|
| 31 | +from posthog.hogql import ast |
30 | 32 | from posthog.hogql.constants import LimitContext |
31 | 33 | from posthog.hogql.errors import ExposedHogQLError, ResolutionError |
| 34 | +from posthog.hogql.property import property_to_expr |
32 | 35 |
|
33 | 36 | from posthog.api.documentation import extend_schema |
34 | 37 | from posthog.api.mixins import PydanticModelMixin |
@@ -219,9 +222,15 @@ def create(self, request: Request, *args, **kwargs) -> Response: |
219 | 222 |
|
220 | 223 | return Response(self._serialize_endpoint(endpoint), status=status.HTTP_201_CREATED) |
221 | 224 |
|
222 | | - # We should expose if the query name is duplicate |
223 | 225 | except Exception as e: |
224 | | - capture_exception(e) |
| 226 | + capture_exception( |
| 227 | + e, |
| 228 | + { |
| 229 | + "product": Product.ENDPOINTS, |
| 230 | + "team_id": self.team_id, |
| 231 | + "endpoint_name": data.name, |
| 232 | + }, |
| 233 | + ) |
225 | 234 | raise ValidationError("Failed to create endpoint.") |
226 | 235 |
|
227 | 236 | def validate_update_request( |
@@ -316,7 +325,15 @@ def update(self, request: Request, name: str | None = None, *args, **kwargs) -> |
316 | 325 | return Response(self._serialize_endpoint(endpoint)) |
317 | 326 |
|
318 | 327 | except Exception as e: |
319 | | - capture_exception(e) |
| 328 | + capture_exception( |
| 329 | + e, |
| 330 | + { |
| 331 | + "product": Product.ENDPOINTS, |
| 332 | + "team_id": self.team_id, |
| 333 | + "endpoint_id": endpoint.id, |
| 334 | + "saved_query_id": endpoint.saved_query.id if endpoint.saved_query else None, |
| 335 | + }, |
| 336 | + ) |
320 | 337 | raise ValidationError("Failed to update endpoint.") |
321 | 338 |
|
322 | 339 | def _enable_materialization( |
@@ -451,49 +468,61 @@ def _execute_materialized_endpoint( |
451 | 468 | self, endpoint: Endpoint, data: EndpointRunRequest, request: Request |
452 | 469 | ) -> Response: |
453 | 470 | """Execute against a materialized table in S3.""" |
454 | | - from posthog.schema import RefreshType |
455 | | - |
456 | | - from posthog.hogql import ast |
457 | | - from posthog.hogql.property import property_to_expr |
| 471 | + try: |
| 472 | + saved_query = endpoint.saved_query |
| 473 | + if not saved_query: |
| 474 | + raise ValidationError("No materialized query found for this endpoint") |
458 | 475 |
|
459 | | - saved_query = endpoint.saved_query |
460 | | - if not saved_query: |
461 | | - raise ValidationError("No materialized query found for this endpoint") |
| 476 | + select_query = ast.SelectQuery( |
| 477 | + select=[ast.Field(chain=["*"])], |
| 478 | + select_from=ast.JoinExpr(table=ast.Field(chain=[saved_query.name])), |
| 479 | + ) |
462 | 480 |
|
463 | | - # Build AST for SELECT * FROM table |
464 | | - select_query = ast.SelectQuery( |
465 | | - select=[ast.Field(chain=["*"])], |
466 | | - select_from=ast.JoinExpr(table=ast.Field(chain=[saved_query.name])), |
467 | | - ) |
| 481 | + if data.filters_override and data.filters_override.properties: |
| 482 | + try: |
| 483 | + property_expr = property_to_expr(data.filters_override.properties, self.team) |
| 484 | + select_query.where = property_expr |
| 485 | + except Exception: |
| 486 | + raise ValidationError("Failed to apply property filters.") |
468 | 487 |
|
469 | | - if data.filters_override and data.filters_override.properties: |
470 | | - try: |
471 | | - property_expr = property_to_expr(data.filters_override.properties, self.team) |
472 | | - select_query.where = property_expr |
473 | | - except Exception as e: |
474 | | - capture_exception(e) |
475 | | - raise ValidationError(f"Failed to apply property filters.") |
476 | | - |
477 | | - materialized_hogql_query = HogQLQuery( |
478 | | - query=select_query.to_hogql(), modifiers=HogQLQueryModifiers(useMaterializedViews=True) |
479 | | - ) |
| 488 | + materialized_hogql_query = HogQLQuery( |
| 489 | + query=select_query.to_hogql(), modifiers=HogQLQueryModifiers(useMaterializedViews=True) |
| 490 | + ) |
480 | 491 |
|
481 | | - query_request_data = { |
482 | | - "client_query_id": data.client_query_id, |
483 | | - "name": f"{endpoint.name}_materialized", |
484 | | - "refresh": data.refresh or RefreshType.BLOCKING, |
485 | | - "query": materialized_hogql_query.model_dump(), |
486 | | - } |
| 492 | + query_request_data = { |
| 493 | + "client_query_id": data.client_query_id, |
| 494 | + "name": f"{endpoint.name}_materialized", |
| 495 | + "refresh": data.refresh or RefreshType.BLOCKING, |
| 496 | + "query": materialized_hogql_query.model_dump(), |
| 497 | + } |
487 | 498 |
|
488 | | - extra_fields = { |
489 | | - "endpoint_materialized": True, |
490 | | - "endpoint_materialized_at": saved_query.last_run_at.isoformat() if saved_query.last_run_at else None, |
491 | | - } |
492 | | - tag_queries(workload=Workload.ENDPOINTS, warehouse_query=True) |
| 499 | + extra_fields = { |
| 500 | + "endpoint_materialized": True, |
| 501 | + "endpoint_materialized_at": saved_query.last_run_at.isoformat() if saved_query.last_run_at else None, |
| 502 | + } |
| 503 | + tag_queries(workload=Workload.ENDPOINTS, warehouse_query=True) |
493 | 504 |
|
494 | | - return self._execute_query_and_respond( |
495 | | - query_request_data, data.client_query_id, request, extra_result_fields=extra_fields |
496 | | - ) |
| 505 | + return self._execute_query_and_respond( |
| 506 | + query_request_data, data.client_query_id, request, extra_result_fields=extra_fields |
| 507 | + ) |
| 508 | + except (ExposedHogQLError, ExposedCHQueryError, HogVMException) as e: |
| 509 | + raise ValidationError(str(e), getattr(e, "code_name", None)) |
| 510 | + except ResolutionError as e: |
| 511 | + raise ValidationError(str(e)) |
| 512 | + except ConcurrencyLimitExceeded as c: |
| 513 | + raise Throttled(detail=str(c)) |
| 514 | + except Exception as e: |
| 515 | + capture_exception( |
| 516 | + e, |
| 517 | + { |
| 518 | + "product": Product.ENDPOINTS, |
| 519 | + "team_id": self.team_id, |
| 520 | + "endpoint_name": endpoint.name, |
| 521 | + "materialized": True, |
| 522 | + "saved_query_id": saved_query.id if saved_query else None, |
| 523 | + }, |
| 524 | + ) |
| 525 | + raise |
497 | 526 |
|
498 | 527 | def _parse_variables(self, query: dict[str, dict], variables: dict[str, str]) -> dict[str, dict] | None: |
499 | 528 | query_variables = query.get("variables", None) |
@@ -550,7 +579,14 @@ def _execute_inline_endpoint( |
550 | 579 | raise Throttled(detail=str(c)) |
551 | 580 | except Exception as e: |
552 | 581 | self.handle_column_ch_error(e) |
553 | | - capture_exception(e) |
| 582 | + capture_exception( |
| 583 | + e, |
| 584 | + { |
| 585 | + "product": Product.ENDPOINTS, |
| 586 | + "team_id": self.team_id, |
| 587 | + "endpoint_name": endpoint.name, |
| 588 | + }, |
| 589 | + ) |
554 | 590 | raise |
555 | 591 |
|
556 | 592 | @extend_schema( |
@@ -655,7 +691,7 @@ def get_endpoints_last_execution_times(self, request: Request, *args, **kwargs) |
655 | 691 | except ConcurrencyLimitExceeded as c: |
656 | 692 | raise Throttled(detail=str(c)) |
657 | 693 | except Exception as e: |
658 | | - capture_exception(e) |
| 694 | + capture_exception(e, {"product": Product.ENDPOINTS, "team_id": self.team_id}) |
659 | 695 | raise |
660 | 696 |
|
661 | 697 | def handle_column_ch_error(self, error): |
|
0 commit comments