Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions frontend/src/products.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export const productScenes: Record<string, () => Promise<any>> = {
EndpointsScene: () => import('../../products/endpoints/frontend/EndpointsScene'),
EndpointsUsage: () => import('../../products/endpoints/frontend/EndpointsUsage'),
EndpointScene: () => import('../../products/endpoints/frontend/EndpointScene'),
EndpointNew: () => import('../../products/endpoints/frontend/EndpointScene'),
ErrorTracking: () => import('../../products/error_tracking/frontend/scenes/ErrorTrackingScene/ErrorTrackingScene'),
ErrorTrackingIssue: () =>
import('../../products/error_tracking/frontend/scenes/ErrorTrackingIssueScene/ErrorTrackingIssueScene'),
Expand Down Expand Up @@ -108,7 +107,6 @@ export const productRoutes: Record<string, [string, string]> = {
'/endpoints': ['EndpointsScene', 'endpoints'],
'/endpoints/usage': ['EndpointsScene', 'endpointsUsage'],
'/endpoints/:name': ['EndpointScene', 'endpoint'],
'/endpoints/new': ['EndpointNew', 'endpointNew'],
'/error_tracking': ['ErrorTracking', 'errorTracking'],
'/error_tracking/configuration': ['ErrorTrackingConfiguration', 'errorTrackingConfiguration'],
'/error_tracking/:id': ['ErrorTrackingIssue', 'errorTrackingIssue'],
Expand Down Expand Up @@ -255,7 +253,6 @@ export const productConfiguration: Record<string, any> = {
iconType: 'endpoints',
},
EndpointScene: { projectBased: true, name: 'Endpoint', activityScope: 'Endpoint' },
EndpointNew: { projectBased: true, name: 'EndpointNew', activityScope: 'Endpoint' },
ErrorTracking: {
projectBased: true,
name: 'Error tracking',
Expand Down Expand Up @@ -976,7 +973,7 @@ export const getTreeItemsProducts = (): FileSystemImport[] => [
iconType: 'endpoints',
iconColor: ['var(--color-product-endpoints-light)'] as FileSystemIconColor,
sceneKey: 'EndpointsScene',
sceneKeys: ['EndpointsScene', 'EndpointsUsage', 'EndpointScene', 'EndpointNew'],
sceneKeys: ['EndpointsScene', 'EndpointsUsage', 'EndpointScene'],
},
{
path: 'Error tracking',
Expand Down Expand Up @@ -1257,7 +1254,7 @@ export const getTreeItemsMetadata = (): FileSystemImport[] => [
sceneKey: 'EndpointsScene',
flag: FEATURE_FLAGS.ENDPOINTS,
tags: ['alpha'],
sceneKeys: ['EndpointsScene', 'EndpointsUsage', 'EndpointScene', 'EndpointNew'],
sceneKeys: ['EndpointsScene', 'EndpointsUsage', 'EndpointScene'],
},
{
path: 'Event definitions',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import {
QueryBasedInsightModel,
} from '~/types'

import { endpointLogic } from 'products/endpoints/frontend/endpointLogic'

import { dataWarehouseViewsLogic } from '../saved_queries/dataWarehouseViewsLogic'
import { ViewEmptyState } from './ViewLoadingState'
import { draftsLogic } from './draftsLogic'
Expand Down Expand Up @@ -157,6 +159,8 @@ export const multitabEditorLogic = kea<multitabEditorLogicType>([
['fixErrors', 'fixErrorsSuccess', 'fixErrorsFailure'],
draftsLogic,
['saveAsDraft', 'deleteDraft', 'saveAsDraftSuccess', 'deleteDraftSuccess'],
endpointLogic,
['setIsUpdateMode', 'setSelectedEndpointName'],
],
})),
actions(() => ({
Expand Down Expand Up @@ -1113,6 +1117,10 @@ export const multitabEditorLogic = kea<multitabEditorLogicType>([
if (searchParams.output_tab) {
actions.setActiveTab(searchParams.output_tab as OutputTab)
}
if (searchParams.endpoint_name) {
actions.setIsUpdateMode(true)
actions.setSelectedEndpointName(searchParams.endpoint_name)
}
if (searchParams.open_draft || (hashParams.draft && values.queryInput === null)) {
const draftId = searchParams.open_draft || hashParams.draft
const draft = values.drafts.find((draft) => {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/scenes/insights/InsightPageHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: In
/>
) : null}

{featureFlags[FEATURE_FLAGS.ENDPOINTS] ? (
{hasDashboardItemId && featureFlags[FEATURE_FLAGS.ENDPOINTS] ? (
<ButtonPrimitive onClick={() => setEndpointModalOpen(true)} menuItem>
<IconCode2 />
Create endpoint
Expand Down
1 change: 0 additions & 1 deletion frontend/src/scenes/sceneTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ export enum Scene {
WebAnalyticsWebVitals = 'WebAnalyticsWebVitals',
Endpoints = 'Endpoints',
Endpoint = 'Endpoint',
EndpointNew = 'EndpointNew',
Workflow = 'Workflow',
Workflows = 'Workflows',
Wizard = 'Wizard',
Expand Down
7 changes: 6 additions & 1 deletion frontend/src/scenes/urls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ export const urls = {
view_id?: string,
insightShortId?: string,
draftId?: string,
outputTab?: OutputTab
outputTab?: OutputTab,
endpointName?: string
): string => {
const params = new URLSearchParams()

Expand All @@ -73,6 +74,10 @@ export const urls = {
params.set('output_tab', outputTab)
}

if (endpointName) {
params.set('endpoint_name', endpointName)
}

const queryString = params.toString()
return `/sql${queryString ? `?${queryString}` : ''}`
},
Expand Down
154 changes: 96 additions & 58 deletions products/endpoints/backend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from django.core.cache import cache
from django.shortcuts import get_object_or_404
from django.utils import timezone

from django_filters.rest_framework import DjangoFilterBackend
from loginas.utils import is_impersonated_session
Expand All @@ -24,10 +25,13 @@
QueryRequest,
QueryStatus,
QueryStatusResponse,
RefreshType,
)

from posthog.hogql import ast
from posthog.hogql.constants import LimitContext
from posthog.hogql.errors import ExposedHogQLError, ResolutionError
from posthog.hogql.property import property_to_expr

from posthog.api.documentation import extend_schema
from posthog.api.mixins import PydanticModelMixin
Expand Down Expand Up @@ -218,9 +222,15 @@

return Response(self._serialize_endpoint(endpoint), status=status.HTTP_201_CREATED)

# We should expose if the query name is duplicate
except Exception as e:
capture_exception(e)
capture_exception(
e,
{
"product": Product.ENDPOINTS,
"team_id": self.team_id,
"endpoint_name": data.name,
},
)
raise ValidationError("Failed to create endpoint.")

def validate_update_request(
Expand Down Expand Up @@ -315,7 +325,15 @@
return Response(self._serialize_endpoint(endpoint))

except Exception as e:
capture_exception(e)
capture_exception(
e,
{
"product": Product.ENDPOINTS,
"team_id": self.team_id,
"endpoint_id": endpoint.id,
"saved_query_id": endpoint.saved_query.id if endpoint.saved_query else None,
},
)
raise ValidationError("Failed to update endpoint.")

def _enable_materialization(
Expand Down Expand Up @@ -372,6 +390,7 @@
Returns False if:
- Not materialized
- Materialization incomplete/failed
- Materialized data is stale (older than sync frequency)
- User overrides present (variables, filters, query)
- Force refresh requested
"""
Expand All @@ -385,6 +404,12 @@
if not saved_query.table:
return False

# Check if materialized data is stale
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@orian this 🎀 is for you

if saved_query.last_run_at and saved_query.sync_frequency_interval:
next_refresh_due = saved_query.last_run_at + saved_query.sync_frequency_interval
if timezone.now() >= next_refresh_due:
return False

if data.variables:
return False

Expand Down Expand Up @@ -443,49 +468,55 @@
self, endpoint: Endpoint, data: EndpointRunRequest, request: Request
) -> Response:
"""Execute against a materialized table in S3."""
from posthog.schema import RefreshType

from posthog.hogql import ast
from posthog.hogql.property import property_to_expr
try:
saved_query = endpoint.saved_query
if not saved_query:
raise ValidationError("No materialized query found for this endpoint")

saved_query = endpoint.saved_query
if not saved_query:
raise ValidationError("No materialized query found for this endpoint")
select_query = ast.SelectQuery(
select=[ast.Field(chain=["*"])],
select_from=ast.JoinExpr(table=ast.Field(chain=[saved_query.name])),
)

# Build AST for SELECT * FROM table
select_query = ast.SelectQuery(
select=[ast.Field(chain=["*"])],
select_from=ast.JoinExpr(table=ast.Field(chain=[saved_query.name])),
)
if data.filters_override and data.filters_override.properties:
try:
property_expr = property_to_expr(data.filters_override.properties, self.team)
select_query.where = property_expr
except Exception:
raise ValidationError("Failed to apply property filters.")

if data.filters_override and data.filters_override.properties:
try:
property_expr = property_to_expr(data.filters_override.properties, self.team)
select_query.where = property_expr
except Exception as e:
capture_exception(e)
raise ValidationError(f"Failed to apply property filters.")

materialized_hogql_query = HogQLQuery(
query=select_query.to_hogql(), modifiers=HogQLQueryModifiers(useMaterializedViews=True)
)
materialized_hogql_query = HogQLQuery(
query=select_query.to_hogql(), modifiers=HogQLQueryModifiers(useMaterializedViews=True)
)

query_request_data = {
"client_query_id": data.client_query_id,
"name": f"{endpoint.name}_materialized",
"refresh": data.refresh or RefreshType.BLOCKING,
"query": materialized_hogql_query.model_dump(),
}
query_request_data = {
"client_query_id": data.client_query_id,
"name": f"{endpoint.name}_materialized",
"refresh": data.refresh or RefreshType.BLOCKING,
"query": materialized_hogql_query.model_dump(),
}

extra_fields = {
"_materialized": True,
"_materialized_at": saved_query.last_run_at.isoformat() if saved_query.last_run_at else None,
}
tag_queries(workload=Workload.ENDPOINTS, warehouse_query=True)
extra_fields = {
"endpoint_materialized": True,
"endpoint_materialized_at": saved_query.last_run_at.isoformat() if saved_query.last_run_at else None,
}
tag_queries(workload=Workload.ENDPOINTS, warehouse_query=True)

return self._execute_query_and_respond(
query_request_data, data.client_query_id, request, extra_result_fields=extra_fields
)
return self._execute_query_and_respond(
query_request_data, data.client_query_id, request, extra_result_fields=extra_fields
)
except Exception as e:
capture_exception(
e,
{
"product": Product.ENDPOINTS,
"team_id": self.team_id,
"endpoint_name": endpoint.name,
"materialized": True,
"saved_query_id": saved_query.id if saved_query else None,
},
)
raise

def _parse_variables(self, query: dict[str, dict], variables: dict[str, str]) -> dict[str, dict] | None:
query_variables = query.get("variables", None)
Expand All @@ -507,7 +538,6 @@
variableId=variable_id,
code_name=variable_code_name,
value=variable_value,
# TODO: this needs more attention!
isNull=True if variable_value is None else None,
).model_dump()
return variables_override
Expand Down Expand Up @@ -535,15 +565,17 @@
query_request_data, data.client_query_id, request, cache_age_seconds=endpoint.cache_age_seconds
)

except (ExposedHogQLError, ExposedCHQueryError, HogVMException) as e:
raise ValidationError(str(e), getattr(e, "code_name", None))
except ResolutionError as e:
raise ValidationError(str(e))
except ConcurrencyLimitExceeded as c:
raise Throttled(detail=str(c))
except Exception as e:
self.handle_column_ch_error(e)
capture_exception(e)
capture_exception(
e,
{
"product": Product.ENDPOINTS,
"team_id": self.team_id,
"materialized": False,
"endpoint_name": endpoint.name,
},
)
raise

@extend_schema(
Expand Down Expand Up @@ -588,16 +620,22 @@
# Only the latest version is materialized
use_materialized = version_number is None and self._should_use_materialized_table(endpoint, data)

if use_materialized:
result = self._execute_materialized_endpoint(endpoint, data, request)
else:
# Use version's query if available, otherwise use endpoint.query
query_to_use = version_obj.query if version_obj else endpoint.query.copy()
result = self._execute_inline_endpoint(endpoint, data, request, query_to_use)

try:
if use_materialized:
result = self._execute_materialized_endpoint(endpoint, data, request)
else:
# Use version's query if available, otherwise use endpoint.query
query_to_use = version_obj.query if version_obj else endpoint.query.copy()
result = self._execute_inline_endpoint(endpoint, data, request, query_to_use)
except (ExposedHogQLError, ExposedCHQueryError, HogVMException) as e:
raise ValidationError(str(e), getattr(e, "code_name", None))
except ResolutionError as e:
raise ValidationError(str(e))
except ConcurrencyLimitExceeded:
raise Throttled(detail="Too many concurrent requests. Please try again later.")
if version_obj and isinstance(result.data, dict):
result.data["_version"] = version_obj.version
result.data["_version_created_at"] = version_obj.created_at.isoformat()
result.data["endpoint_version"] = version_obj.version
result.data["endpoint_version_created_at"] = version_obj.created_at.isoformat()

return result

Expand Down Expand Up @@ -648,7 +686,7 @@
except ConcurrencyLimitExceeded as c:
raise Throttled(detail=str(c))
except Exception as e:
capture_exception(e)
capture_exception(e, {"product": Product.ENDPOINTS, "team_id": self.team_id})
raise

def handle_column_ch_error(self, error):
Expand Down
Loading
Loading