Skip to content

Commit 0330c37

Browse files
authored
feat(cross-item-queries): add cross item support in timeseries endpoint (#7400)
1 parent e52cdab commit 0330c37

File tree

6 files changed

+266
-80
lines changed

6 files changed

+266
-80
lines changed

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ markers =
77
clickhouse_db: Use clickhouse
88
redis_db: Use redis
99
ci_only: Only run in CI
10+
eap: Use clickhouse with EAP migrations only
1011

1112
[flake8]
1213
# tests/state/test_state.py:19:36: E712 comparison to True should be 'if cond is True:' or 'if cond:'

snuba/subscriptions/data.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,12 @@ class _SubscriptionData(ABC, Generic[TRequest]):
121121

122122
def validate(self) -> None:
123123
if self.time_window_sec < 60:
124-
raise InvalidSubscriptionError(
125-
"Time window must be greater than or equal to 1 minute"
126-
)
124+
raise InvalidSubscriptionError("Time window must be greater than or equal to 1 minute")
127125
elif self.time_window_sec > 60 * 60 * 24:
128-
raise InvalidSubscriptionError(
129-
"Time window must be less than or equal to 24 hours"
130-
)
126+
raise InvalidSubscriptionError("Time window must be less than or equal to 24 hours")
131127

132128
if self.resolution_sec < 60:
133-
raise InvalidSubscriptionError(
134-
"Resolution must be greater than or equal to 1 minute"
135-
)
129+
raise InvalidSubscriptionError("Resolution must be greater than or equal to 1 minute")
136130

137131
@abstractmethod
138132
def build_request(
@@ -238,9 +232,7 @@ def build_request(
238232
request_class.ParseFromString(base64.b64decode(self.time_series_request))
239233

240234
start_time_proto = Timestamp()
241-
start_time_proto.FromDatetime(
242-
timestamp - timedelta(seconds=self.time_window_sec)
243-
)
235+
start_time_proto.FromDatetime(timestamp - timedelta(seconds=self.time_window_sec))
244236
end_time_proto = Timestamp()
245237
end_time_proto.FromDatetime(timestamp)
246238
request_class.meta.start_timestamp.CopyFrom(start_time_proto)
@@ -261,28 +253,24 @@ def run_query(
261253
concurrent_queries_gauge: Optional[Gauge] = None,
262254
) -> QueryResult:
263255
response = EndpointTimeSeries().execute(request)
264-
if not response.result_timeseries:
256+
if not response.result_timeseries or not any(
257+
dp.data_present for dp in response.result_timeseries[0].data_points
258+
):
265259
result: Result = {
266260
"meta": [],
267261
"data": [{request.expressions[0].label: None}],
268262
"trace_output": "",
269263
}
270-
return QueryResult(
271-
result=result, extra={"stats": {}, "sql": "", "experiments": {}}
272-
)
264+
return QueryResult(result=result, extra={"stats": {}, "sql": "", "experiments": {}})
273265

274266
timeseries = response.result_timeseries[0]
275267
data = [{timeseries.label: timeseries.data_points[0].data}]
276268

277269
result = {"meta": [], "data": data, "trace_output": ""}
278-
return QueryResult(
279-
result=result, extra={"stats": {}, "sql": "", "experiments": {}}
280-
)
270+
return QueryResult(result=result, extra={"stats": {}, "sql": "", "experiments": {}})
281271

282272
@classmethod
283-
def from_dict(
284-
cls, data: Mapping[str, Any], entity_key: EntityKey
285-
) -> RPCSubscriptionData:
273+
def from_dict(cls, data: Mapping[str, Any], entity_key: EntityKey) -> RPCSubscriptionData:
286274
entity: Entity = get_entity(entity_key)
287275
metadata = {}
288276
for key in data.keys():
@@ -370,9 +358,7 @@ def add_conditions(
370358
elif isinstance(from_clause, EntityDS):
371359
entities = [(None, get_entity(from_clause.key))]
372360
else:
373-
raise InvalidSubscriptionError(
374-
"Only simple queries and join queries are supported"
375-
)
361+
raise InvalidSubscriptionError("Only simple queries and join queries are supported")
376362
for entity_alias, entity in entities:
377363
conditions_to_add: List[Expression] = [
378364
binary_condition(
@@ -406,9 +392,7 @@ def add_conditions(
406392
new_condition = combine_and_conditions(conditions_to_add)
407393
condition = query.get_condition()
408394
if condition:
409-
new_condition = binary_condition(
410-
BooleanFunctions.AND, condition, new_condition
411-
)
395+
new_condition = binary_condition(BooleanFunctions.AND, condition, new_condition)
412396

413397
query.set_ast_condition(new_condition)
414398

@@ -480,9 +464,7 @@ def run_query(
480464
)
481465

482466
@classmethod
483-
def from_dict(
484-
cls, data: Mapping[str, Any], entity_key: EntityKey
485-
) -> SnQLSubscriptionData:
467+
def from_dict(cls, data: Mapping[str, Any], entity_key: EntityKey) -> SnQLSubscriptionData:
486468
entity: Entity = get_entity(entity_key)
487469

488470
metadata = {}

snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from collections import defaultdict
33
from dataclasses import replace
44
from datetime import datetime
5-
from typing import Any, Callable, Dict, Iterable
5+
from typing import Any, Callable, Dict, Iterable, Optional
66

77
import sentry_sdk
88
from google.protobuf.json_format import MessageToDict
@@ -30,13 +30,14 @@
3030
from snuba.query import OrderBy, OrderByDirection, SelectedExpression
3131
from snuba.query.data_source.simple import Entity
3232
from snuba.query.dsl import Functions as f
33-
from snuba.query.dsl import column, literal
33+
from snuba.query.dsl import column, in_cond, literal, literals_array
3434
from snuba.query.expressions import Expression
3535
from snuba.query.logical import Query
3636
from snuba.query.query_settings import HTTPQuerySettings
3737
from snuba.request import Request as SnubaRequest
3838
from snuba.settings import ENABLE_FORMULA_RELIABILITY_DEFAULT
3939
from snuba.state import get_int_config
40+
from snuba.utils.metrics.timer import Timer
4041
from snuba.web.query import run_query
4142
from snuba.web.rpc.common.common import (
4243
add_existence_check_to_subscriptable_references,
@@ -61,6 +62,9 @@
6162
get_confidence_interval_column,
6263
get_count_column,
6364
)
65+
from snuba.web.rpc.v1.resolvers.common.cross_item_queries import (
66+
get_trace_ids_for_cross_item_query,
67+
)
6468
from snuba.web.rpc.v1.resolvers.common.formula_reliability import (
6569
FormulaReliabilityCalculator,
6670
)
@@ -141,13 +145,11 @@ def _convert_result_timeseries(
141145
# time_converted_to_integer_timestamp: row_data_for_that_time_bucket
142146
# }
143147
# }
144-
result_timeseries_timestamp_to_row: defaultdict[
145-
tuple[str, str], dict[int, Dict[str, Any]]
146-
] = defaultdict(dict)
147-
148-
query_duration = (
149-
request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds
148+
result_timeseries_timestamp_to_row: defaultdict[tuple[str, str], dict[int, Dict[str, Any]]] = (
149+
defaultdict(dict)
150150
)
151+
152+
query_duration = request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds
151153
time_buckets = [
152154
Timestamp(seconds=(request.meta.start_timestamp.seconds) + secs)
153155
for secs in range(0, query_duration, request.granularity_secs)
@@ -183,9 +185,7 @@ def _convert_result_timeseries(
183185
if not row_data:
184186
timeseries.data_points.append(DataPoint(data=0, data_present=False))
185187
else:
186-
extrapolation_context = ExtrapolationContext.from_row(
187-
timeseries.label, row_data
188-
)
188+
extrapolation_context = ExtrapolationContext.from_row(timeseries.label, row_data)
189189
if row_data.get(timeseries.label, None) is not None:
190190
timeseries.data_points.append(
191191
DataPoint(
@@ -199,9 +199,7 @@ def _convert_result_timeseries(
199199
else:
200200
timeseries.data_points.append(DataPoint(data=0, data_present=False))
201201

202-
if get_int_config(
203-
"enable_formula_reliability_ts", ENABLE_FORMULA_RELIABILITY_DEFAULT
204-
):
202+
if get_int_config("enable_formula_reliability_ts", ENABLE_FORMULA_RELIABILITY_DEFAULT):
205203
frc = FormulaReliabilityCalculator(request, data, time_buckets)
206204
for timeseries in result_timeseries.values():
207205
if timeseries.label in frc:
@@ -240,10 +238,7 @@ def _get_reliability_context_columns(
240238
which_oneof = expr.WhichOneof("expression")
241239
assert which_oneof in ["conditional_aggregation", "aggregation"]
242240
aggregation = getattr(expr, which_oneof)
243-
if (
244-
aggregation.extrapolation_mode
245-
== ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
246-
):
241+
if aggregation.extrapolation_mode == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED:
247242
confidence_interval_column = get_confidence_interval_column(
248243
aggregation, _get_attribute_key_to_expression_function(request_meta)
249244
)
@@ -271,9 +266,7 @@ def _get_reliability_context_columns(
271266
SelectedExpression(name=count_column.alias, expression=count_column)
272267
)
273268
elif expr.WhichOneof("expression") == "formula":
274-
if not get_int_config(
275-
"enable_formula_reliability_ts", ENABLE_FORMULA_RELIABILITY_DEFAULT
276-
):
269+
if not get_int_config("enable_formula_reliability_ts", ENABLE_FORMULA_RELIABILITY_DEFAULT):
277270
return []
278271
# also query for the left and right parts of the formula separately
279272
# this will be used later to calculate the reliability of the formula
@@ -286,9 +279,7 @@ def _get_reliability_context_columns(
286279
expression=_proto_expression_to_ast_expression(e, request_meta),
287280
)
288281
)
289-
additional_context_columns.extend(
290-
_get_reliability_context_columns(e, request_meta)
291-
)
282+
additional_context_columns.extend(_get_reliability_context_columns(e, request_meta))
292283
return additional_context_columns
293284

294285

@@ -311,13 +302,9 @@ def _proto_expression_to_ast_expression(
311302
case None:
312303
pass
313304
case "default_value_double":
314-
formula_expr = f.coalesce(
315-
formula_expr, expr.formula.default_value_double
316-
)
305+
formula_expr = f.coalesce(formula_expr, expr.formula.default_value_double)
317306
case "default_value_int64":
318-
formula_expr = f.coalesce(
319-
formula_expr, expr.formula.default_value_int64
320-
)
307+
formula_expr = f.coalesce(formula_expr, expr.formula.default_value_int64)
321308
case default:
322309
raise BadSnubaRPCRequestException(
323310
f"Unknown default_value in formula. Expected default_value_double or default_value_int64 but got {default}"
@@ -329,7 +316,7 @@ def _proto_expression_to_ast_expression(
329316
raise ValueError(f"Unknown expression type: {default}")
330317

331318

332-
def build_query(request: TimeSeriesRequest) -> Query:
319+
def build_query(request: TimeSeriesRequest, timer: Optional[Timer] = None) -> Query:
333320
entity = Entity(
334321
key=EntityKey("eap_items"),
335322
schema=get_entity(EntityKey("eap_items")).get_data_model(),
@@ -346,21 +333,30 @@ def build_query(request: TimeSeriesRequest) -> Query:
346333

347334
additional_context_columns = []
348335
for expr in request.expressions:
349-
additional_context_columns.extend(
350-
_get_reliability_context_columns(expr, request.meta)
351-
)
336+
additional_context_columns.extend(_get_reliability_context_columns(expr, request.meta))
352337

353338
groupby_columns = [
354339
SelectedExpression(
355340
name=attr_key.name,
356-
expression=_get_attribute_key_to_expression_function(request.meta)(
357-
attr_key
358-
),
341+
expression=_get_attribute_key_to_expression_function(request.meta)(attr_key),
359342
)
360343
for attr_key in request.group_by
361344
]
362345
item_type_conds = [f.equals(column("item_type"), request.meta.trace_item_type)]
363346

347+
# Handle cross item queries by first getting trace IDs
348+
additional_conditions = []
349+
if request.trace_filters and timer is not None:
350+
trace_ids = get_trace_ids_for_cross_item_query(
351+
request, request.meta, list(request.trace_filters), timer
352+
)
353+
additional_conditions.append(
354+
in_cond(
355+
column("trace_id"),
356+
literals_array(None, [literal(trace_id) for trace_id in trace_ids]),
357+
)
358+
)
359+
364360
res = Query(
365361
from_clause=entity,
366362
selected_columns=[
@@ -402,6 +398,7 @@ def build_query(request: TimeSeriesRequest) -> Query:
402398
request.filter, _get_attribute_key_to_expression_function(request.meta)
403399
),
404400
*item_type_conds,
401+
*additional_conditions,
405402
),
406403
groupby=[
407404
column("time_slot"),
@@ -410,17 +407,15 @@ def build_query(request: TimeSeriesRequest) -> Query:
410407
for attr_key in request.group_by
411408
],
412409
],
413-
order_by=[
414-
OrderBy(expression=column("time_slot"), direction=OrderByDirection.ASC)
415-
],
410+
order_by=[OrderBy(expression=column("time_slot"), direction=OrderByDirection.ASC)],
416411
)
417412
treeify_or_and_conditions(res)
418413
add_existence_check_to_subscriptable_references(res)
419414
return res
420415

421416

422417
def _build_snuba_request(
423-
request: TimeSeriesRequest, query_settings: HTTPQuerySettings
418+
request: TimeSeriesRequest, query_settings: HTTPQuerySettings, timer: Optional[Timer] = None
424419
) -> SnubaRequest:
425420
if request.meta.trace_item_type == TraceItemType.TRACE_ITEM_TYPE_LOG:
426421
team = "ourlogs"
@@ -434,7 +429,7 @@ def _build_snuba_request(
434429
return SnubaRequest(
435430
id=uuid.UUID(request.meta.request_id),
436431
original_body=MessageToDict(request),
437-
query=build_query(request),
432+
query=build_query(request, timer),
438433
query_settings=query_settings,
439434
attribution_info=AttributionInfo(
440435
referrer=request.meta.referrer,
@@ -464,18 +459,14 @@ def resolve(
464459
# if the user passes it in
465460
assert len(in_msg.aggregations) == 0
466461

467-
query_settings = (
468-
setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings()
469-
)
462+
query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings()
470463
try:
471-
routing_decision.strategy.merge_clickhouse_settings(
472-
routing_decision, query_settings
473-
)
464+
routing_decision.strategy.merge_clickhouse_settings(routing_decision, query_settings)
474465
query_settings.set_sampling_tier(routing_decision.tier)
475466
except Exception as e:
476467
sentry_sdk.capture_message(f"Error merging clickhouse settings: {e}")
477468

478-
snuba_request = _build_snuba_request(in_msg, query_settings)
469+
snuba_request = _build_snuba_request(in_msg, query_settings, self._timer)
479470
res = run_query(
480471
dataset=PluggableDataset(name="eap", all_entities=[]),
481472
request=snuba_request,

tests/web/rpc/v1/conftest.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import Generator
2+
3+
import pytest
4+
5+
from snuba.datasets.factory import reset_dataset_factory
6+
from snuba.migrations.groups import MigrationGroup
7+
from snuba.migrations.runner import Runner
8+
9+
10+
@pytest.fixture
def eap(request: pytest.FixtureRequest, create_databases: None) -> Generator[None, None, None]:
    """
    ClickHouse fixture that applies only the EAP (Events Analytics Platform)
    migrations plus the Outcomes migrations (used for storage routing).

    Much faster than running the full migration suite for tests that only
    touch EAP tables. Must be paired with the @pytest.mark.eap marker.
    """
    marker = request.node.get_closest_marker("eap")
    if not marker:
        pytest.fail("Need to use eap marker if eap fixture is used")

    try:
        reset_dataset_factory()
        # Apply just the two migration groups EAP tests depend on.
        migration_runner = Runner()
        migration_runner.run_all(group=MigrationGroup.EVENTS_ANALYTICS_PLATFORM, force=True)
        migration_runner.run_all(group=MigrationGroup.OUTCOMES, force=True)
        yield
    finally:
        # Imported lazily to avoid a circular import with tests.conftest.
        from tests.conftest import _clear_db

        _clear_db()
33+
34+
35+
# Hook to modify test collection
def pytest_runtest_setup(item: pytest.Item) -> None:
    """For tests marked @pytest.mark.eap, swap in the lightweight eap fixture."""
    if not item.get_closest_marker("eap"):
        return

    fixturenames = getattr(item, "fixturenames", None)
    if fixturenames is None:
        return

    # Remove block_clickhouse_db if the parent conftest added it.
    if "block_clickhouse_db" in fixturenames:
        fixturenames.remove("block_clickhouse_db")
    # Make sure our custom eap fixture runs for this test.
    if "eap" not in fixturenames:
        fixturenames.append("eap")

0 commit comments

Comments
 (0)