diff --git a/.vscode/settings.json b/.vscode/settings.json index 065c62aab84..2ba3223ef97 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -69,6 +69,7 @@ "mypy-type-checker.args": [ "--strict" ], - "makefile.configureOnOpen": false + "makefile.configureOnOpen": false, + "cursorpyright.analysis.autoImportCompletions": true } diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index ad17cd0e7de..aa6596c1ed6 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Callable, TypeVar, cast from google.protobuf.message import Message as ProtobufMessage @@ -56,9 +56,9 @@ def truncate_request_meta_to_day(meta: RequestMeta) -> None: start_timestamp = start_timestamp.replace( hour=0, minute=0, second=0, microsecond=0 ) - timedelta(days=1) - end_timestamp = end_timestamp.replace( - hour=0, minute=0, second=0, microsecond=0 - ) + timedelta(days=1) + end_timestamp = end_timestamp.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta( + days=1 + ) meta.start_timestamp.seconds = int(start_timestamp.timestamp()) meta.end_timestamp.seconds = int(end_timestamp.timestamp()) @@ -142,51 +142,32 @@ def trace_item_filters_to_expression( if len(filters) == 0: return literal(True) elif len(filters) == 1: - return trace_item_filters_to_expression( - filters[0], attribute_key_to_expression - ) + return trace_item_filters_to_expression(filters[0], attribute_key_to_expression) return and_cond( - *( - trace_item_filters_to_expression(x, attribute_key_to_expression) - for x in filters - ) + *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) ) if item_filter.HasField("or_filter"): filters = item_filter.or_filter.filters if len(filters) == 0: - raise BadSnubaRPCRequestException( - "Invalid trace item filter, empty 'or' clause" - ) + raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause") elif len(filters) == 1: - return trace_item_filters_to_expression( - filters[0], attribute_key_to_expression - ) + return trace_item_filters_to_expression(filters[0], attribute_key_to_expression) return or_cond( - *( - trace_item_filters_to_expression(x, attribute_key_to_expression) - for x in filters - ) + *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) ) if item_filter.HasField("not_filter"): filters = item_filter.not_filter.filters if len(filters) == 0: - raise BadSnubaRPCRequestException( - "Invalid trace item filter, empty 'not' clause" - ) + raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'not' clause") elif len(filters) == 1: return not_cond( - trace_item_filters_to_expression( - filters[0], attribute_key_to_expression - ) + trace_item_filters_to_expression(filters[0], attribute_key_to_expression) ) return not_cond( and_cond( - *( - trace_item_filters_to_expression(x, attribute_key_to_expression) - for x in filters - ) + *(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters) ) ) @@ -198,9 +179,7 @@ def trace_item_filters_to_expression( value_type = v.WhichOneof("value") if value_type is None: - raise BadSnubaRPCRequestException( - "comparison does not have a right hand side" - ) + raise BadSnubaRPCRequestException("comparison does not have a right hand side") if v.is_null: v_expression: Expression = literal(None) @@ -246,9 +225,7 @@ def trace_item_filters_to_expression( ) # we redefine 
the way equals works for nulls # now null=null is true - expr_with_null = or_cond( - expr, and_cond(f.isNull(k_expression), f.isNull(v_expression)) - ) + expr_with_null = or_cond(expr, and_cond(f.isNull(k_expression), f.isNull(v_expression))) return expr_with_null if op == ComparisonFilter.OP_NOT_EQUALS: _check_non_string_values_cannot_ignore_case(item_filter.comparison_filter) @@ -259,9 +236,7 @@ def trace_item_filters_to_expression( ) # we redefine the way not equals works for nulls # now null!=null is true - expr_with_null = or_cond( - expr, f.xor(f.isNull(k_expression), f.isNull(v_expression)) - ) + expr_with_null = or_cond(expr, f.xor(f.isNull(k_expression), f.isNull(v_expression))) return expr_with_null if op == ComparisonFilter.OP_LIKE: if k.type != AttributeKey.Type.TYPE_STRING: @@ -354,11 +329,15 @@ def timestamp_in_range_condition(start_ts: int, end_ts: int) -> Expression: return and_cond( f.less( column("timestamp"), - f.toDateTime(end_ts), + f.toDateTime( + datetime.fromtimestamp(end_ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + ), ), f.greaterOrEquals( column("timestamp"), - f.toDateTime(start_ts), + f.toDateTime( + datetime.fromtimestamp(start_ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + ), ), ) @@ -372,9 +351,7 @@ def base_conditions_and(meta: RequestMeta, *other_exprs: Expression) -> Expressi """ return and_cond( project_id_and_org_conditions(meta), - timestamp_in_range_condition( - meta.start_timestamp.seconds, meta.end_timestamp.seconds - ), + timestamp_in_range_condition(meta.start_timestamp.seconds, meta.end_timestamp.seconds), *other_exprs, ) diff --git a/snuba/web/rpc/storage_routing/routing_strategies/common.py b/snuba/web/rpc/storage_routing/routing_strategies/common.py new file mode 100644 index 00000000000..d4b219785f5 --- /dev/null +++ b/snuba/web/rpc/storage_routing/routing_strategies/common.py @@ -0,0 +1,17 @@ +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType + + +class OutcomeCategory: + SPAN_INDEXED = 16 + LOG_ITEM = 23 + + +class Outcome: + ACCEPTED = 0 + + +ITEM_TYPE_TO_OUTCOME_CATEGORY = { + TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED: OutcomeCategory.SPAN_INDEXED, + TraceItemType.TRACE_ITEM_TYPE_SPAN: OutcomeCategory.SPAN_INDEXED, + TraceItemType.TRACE_ITEM_TYPE_LOG: OutcomeCategory.LOG_ITEM, +} diff --git a/snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py b/snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py index 457db5e1a77..30ec1e52e23 100644 --- a/snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py +++ b/snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py @@ -28,6 +28,10 @@ treeify_or_and_conditions, ) from snuba.web.rpc.storage_routing.common import extract_message_meta +from snuba.web.rpc.storage_routing.routing_strategies.common import ( + ITEM_TYPE_TO_OUTCOME_CATEGORY, + Outcome, +) from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( BaseRoutingStrategy, RoutingContext, @@ -35,22 +39,6 @@ ) -# TODO import these from sentry-relay -class OutcomeCategory: - SPAN_INDEXED = 16 - LOG_ITEM = 23 - - -class Outcome: - ACCEPTED = 0 - - -_ITEM_TYPE_TO_OUTCOME = { - TraceItemType.TRACE_ITEM_TYPE_SPAN: OutcomeCategory.SPAN_INDEXED, - TraceItemType.TRACE_ITEM_TYPE_LOG: OutcomeCategory.LOG_ITEM, -} - - def project_id_and_org_conditions(meta: RequestMeta) -> Expression: return and_cond( in_cond( @@ -111,11 +99,7 @@ def get_ingested_items_for_timerange(self, routing_context: RoutingContext) -> i ), f.equals(column("outcome"), 
Outcome.ACCEPTED), f.equals( - column("category"), - _ITEM_TYPE_TO_OUTCOME.get( - in_msg_meta.trace_item_type, - OutcomeCategory.SPAN_INDEXED, - ), + column("category"), ITEM_TYPE_TO_OUTCOME_CATEGORY[in_msg_meta.trace_item_type] ), ), ) @@ -187,7 +171,7 @@ def _get_routing_decision(self, routing_context: RoutingContext) -> RoutingDecis # that is necessary for traces anyways # if the type is specified and we don't know its outcome, route to Tier_1 in_msg_meta.trace_item_type != TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED - and in_msg_meta.trace_item_type not in _ITEM_TYPE_TO_OUTCOME + and in_msg_meta.trace_item_type not in ITEM_TYPE_TO_OUTCOME_CATEGORY ): return routing_decision # if we're querying a short enough timeframe, don't bother estimating, route to tier 1 and call it a day diff --git a/snuba/web/rpc/storage_routing/routing_strategies/outcomes_flex_time.py b/snuba/web/rpc/storage_routing/routing_strategies/outcomes_flex_time.py new file mode 100644 index 00000000000..329439aefdb --- /dev/null +++ b/snuba/web/rpc/storage_routing/routing_strategies/outcomes_flex_time.py @@ -0,0 +1,209 @@ +import math +import uuid +from typing import cast + +import sentry_sdk +from google.protobuf.json_format import MessageToDict +from google.protobuf.timestamp_pb2 import Timestamp +from google.protobuf.timestamp_pb2 import Timestamp as TimestampProto +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + TraceItemType, +) + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.clickhouse.query import Expression +from snuba.configs.configuration import Configuration +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.downsampled_storage_tiers import Tier +from snuba.query import SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import Functions as f +from snuba.query.dsl import and_cond, column, in_cond, literal, literals_array +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.web.query import run_query +from snuba.web.rpc.common.common import ( + timestamp_in_range_condition, + treeify_or_and_conditions, +) +from snuba.web.rpc.storage_routing.common import extract_message_meta +from snuba.web.rpc.storage_routing.routing_strategies.common import ( + ITEM_TYPE_TO_OUTCOME_CATEGORY, + Outcome, +) +from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( + BaseRoutingStrategy, + RoutingContext, + RoutingDecision, + TimeWindow, +) + + +def project_id_and_org_conditions(meta: RequestMeta) -> Expression: + return and_cond( + in_cond( + column("project_id"), + literals_array( + alias=None, + literals=[literal(pid) for pid in meta.project_ids], + ), + ), + f.equals(column("org_id"), meta.organization_id), + ) + + +def _get_request_time_window(routing_context: RoutingContext) -> TimeWindow: + """Gets the time window of the request, if there is a page token with a start and end timestamp, + it gets it from there, otherwise, it gets it from the message meta + """ + meta = extract_message_meta(routing_context.in_msg) + if routing_context.in_msg.HasField("page_token"): + page_token: PageToken = getattr(routing_context.in_msg, "page_token") + if page_token.HasField("filter_offset"): + time_window = TimeWindow( + 
start_timestamp=meta.start_timestamp, end_timestamp=meta.end_timestamp + ) + if page_token.filter_offset.HasField("and_filter"): + for filter in page_token.filter_offset.and_filter.filters: + if ( + filter.HasField("comparison_filter") + and filter.comparison_filter.key.name == "start_timestamp" + ): + time_window.start_timestamp = Timestamp( + seconds=filter.comparison_filter.value.val_int + ) + if ( + filter.HasField("comparison_filter") + and filter.comparison_filter.key.name == "end_timestamp" + ): + time_window.end_timestamp = Timestamp( + seconds=filter.comparison_filter.value.val_int + ) + return time_window + return TimeWindow(start_timestamp=meta.start_timestamp, end_timestamp=meta.end_timestamp) + + +class OutcomesFlexTimeRoutingStrategy(BaseRoutingStrategy): + def _additional_config_definitions(self) -> list[Configuration]: + return [ + Configuration( + name="max_items_to_query", + description="Maximum number of items to query before adjusting time window", + value_type=int, + default=100_000_000, + ), + ] + + def get_ingested_items_for_timerange( + self, routing_context: RoutingContext, time_window: TimeWindow + ) -> int: + entity = Entity( + key=EntityKey("outcomes"), + schema=get_entity(EntityKey("outcomes")).get_data_model(), + sample=None, + ) + in_msg_meta = extract_message_meta(routing_context.in_msg) + query = Query( + from_clause=entity, + selected_columns=[ + SelectedExpression( + name="num_items", + expression=f.sum(column("quantity"), alias="num_items"), + ) + ], + condition=and_cond( + project_id_and_org_conditions(in_msg_meta), + timestamp_in_range_condition( + time_window.start_timestamp.seconds, + time_window.end_timestamp.seconds, + ), + f.equals(column("outcome"), Outcome.ACCEPTED), + f.equals( + column("category"), ITEM_TYPE_TO_OUTCOME_CATEGORY[in_msg_meta.trace_item_type] + ), + ), + ) + snuba_request = SnubaRequest( + id=uuid.uuid4(), + original_body=MessageToDict(routing_context.in_msg), + query=query, + query_settings=HTTPQuerySettings(), + attribution_info=AttributionInfo( + referrer=in_msg_meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": in_msg_meta.organization_id, + "referrer": "eap.route_outcomes", + }, + app_id=AppID("eap"), + parent_api="eap.route_outcomes", + ), + ) + treeify_or_and_conditions(query) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=routing_context.timer, + ) + routing_context.extra_info["estimation_sql"] = res.extra.get("sql", "") + return cast(int, res.result.get("data", [{}])[0].get("num_items", 0)) + + def _adjust_time_window(self, routing_context: RoutingContext) -> TimeWindow: + """Adjust the time window to ensure we don't exceed MAX_ITEMS_TO_QUERY.""" + original_time_window = _get_request_time_window(routing_context) + original_end_ts = original_time_window.end_timestamp.seconds + original_start_ts = original_time_window.start_timestamp.seconds + + max_items = self.get_config_value("max_items_to_query") + + ingested_items = self.get_ingested_items_for_timerange( + routing_context, original_time_window + ) + factor = ingested_items / max_items + if factor > 1: + window_length = original_end_ts - original_start_ts + + start_timestamp_proto = TimestampProto( + seconds=original_end_ts - math.floor((window_length / factor)) + ) + end_timestamp_proto = TimestampProto(seconds=original_end_ts) + return TimeWindow(start_timestamp_proto, end_timestamp_proto) + + return original_time_window + + def _get_routing_decision(self, routing_context: 
RoutingContext) -> RoutingDecision: + routing_decision = RoutingDecision( + routing_context=routing_context, + strategy=self, + tier=Tier.TIER_1, # Always TIER_1 + clickhouse_settings={}, + can_run=True, + ) + + in_msg_meta = extract_message_meta(routing_decision.routing_context.in_msg) + + # if type is unknown, just route to tier 1, no adjustment + if ( + in_msg_meta.trace_item_type != TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED + and in_msg_meta.trace_item_type not in ITEM_TYPE_TO_OUTCOME_CATEGORY + ): + routing_decision.routing_context.extra_info["unknown_item_type"] = True + sentry_sdk.capture_message( + f"Trace Item {in_msg_meta.trace_item_type} does not have an associated outcome" + ) + return routing_decision + + # Adjust time window based on outcomes + adjusted_time_window = self._adjust_time_window(routing_context) + if adjusted_time_window: + routing_decision.time_window = adjusted_time_window + routing_decision.routing_context.extra_info["time_window_adjusted"] = True + + return routing_decision diff --git a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py index e0c0f55baa7..218285a3e28 100644 --- a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py +++ b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py @@ -9,6 +9,7 @@ import sentry_sdk from google.protobuf.json_format import MessageToDict from google.protobuf.message import Message as ProtobufMessage +from google.protobuf.timestamp_pb2 import Timestamp as TimestampProto from sentry_kafka_schemas.schema_types import snuba_queries_v1 from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeriesRequest @@ -60,6 +61,15 @@ class RoutingContext: extra_info: dict[str, Any] = field(default_factory=dict) +@dataclass +class TimeWindow: + start_timestamp: TimestampProto + end_timestamp: TimestampProto + + def length_hours(self) -> float: + return (self.end_timestamp.seconds - self.start_timestamp.seconds) / 3600 + + @dataclass class RoutingDecision: routing_context: RoutingContext @@ -67,6 +77,7 @@ class RoutingDecision: tier: Tier = Tier.TIER_1 clickhouse_settings: dict[str, str] = field(default_factory=dict) can_run: bool | None = None + time_window: TimeWindow | None = None def to_log_dict(self) -> dict[str, Any]: assert self.routing_context is not None diff --git a/snuba/web/rpc/storage_routing/routing_strategy_selector.py b/snuba/web/rpc/storage_routing/routing_strategy_selector.py index 11b88c86814..d3d20c75035 100644 --- a/snuba/web/rpc/storage_routing/routing_strategy_selector.py +++ b/snuba/web/rpc/storage_routing/routing_strategy_selector.py @@ -5,6 +5,7 @@ import sentry_sdk from google.protobuf.message import Message as ProtobufMessage +from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig from snuba import settings from snuba.state import get_config @@ -12,6 +13,9 @@ from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import ( OutcomesBasedRoutingStrategy, ) +from snuba.web.rpc.storage_routing.routing_strategies.outcomes_flex_time import ( + OutcomesFlexTimeRoutingStrategy, +) from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( BaseRoutingStrategy, RoutingContext, @@ -36,13 +40,9 @@ def from_json(cls, config_dict: dict[str, Any]) -> "StorageRoutingConfig": if config_dict == {}: return _DEFAULT_STORAGE_ROUTING_CONFIG try: - if "version" not in 
config_dict or not isinstance( - config_dict["version"], int - ): + if "version" not in config_dict or not isinstance(config_dict["version"], int): raise ValueError("please specify version as an integer") - if "config" not in config_dict or not isinstance( - config_dict["config"], dict - ): + if "config" not in config_dict or not isinstance(config_dict["config"], dict): raise ValueError("please specify config as a dict") version = config_dict["version"] @@ -59,9 +59,7 @@ def from_json(cls, config_dict: dict[str, Any]) -> "StorageRoutingConfig": try: BaseRoutingStrategy.get_from_name(strategy_name)() except Exception: - raise ValueError( - f"{strategy_name} does not inherit from BaseRoutingStrategy" - ) + raise ValueError(f"{strategy_name} does not inherit from BaseRoutingStrategy") routing_strategy_and_percentage_routed[strategy_name] = percentage total_percentage += percentage @@ -87,15 +85,11 @@ def from_json(cls, config_dict: dict[str, Any]) -> "StorageRoutingConfig": class RoutingStrategySelector: - def get_storage_routing_config( - self, in_msg: ProtobufMessage - ) -> StorageRoutingConfig: + def get_storage_routing_config(self, in_msg: ProtobufMessage) -> StorageRoutingConfig: in_msg_meta = extract_message_meta(in_msg) organization_id = str(in_msg_meta.organization_id) try: - overrides = json.loads( - str(get_config(_STORAGE_ROUTING_CONFIG_OVERRIDE_KEY, "{}")) - ) + overrides = json.loads(str(get_config(_STORAGE_ROUTING_CONFIG_OVERRIDE_KEY, "{}"))) if organization_id in overrides.keys(): return StorageRoutingConfig.from_json(overrides[organization_id]) @@ -105,11 +99,17 @@ def get_storage_routing_config( sentry_sdk.capture_message(f"Error getting storage routing config: {e}") return _DEFAULT_STORAGE_ROUTING_CONFIG - def select_routing_strategy( - self, routing_context: RoutingContext - ) -> BaseRoutingStrategy: + def select_routing_strategy(self, routing_context: RoutingContext) -> BaseRoutingStrategy: try: in_msg_meta = extract_message_meta(routing_context.in_msg) + + if ( + in_msg_meta.HasField("downsampled_storage_config") + and in_msg_meta.downsampled_storage_config.mode + == DownsampledStorageConfig.Mode.MODE_HIGHEST_ACCURACY_FLEXTIME + ): + return OutcomesFlexTimeRoutingStrategy() + combined_org_and_project_ids = f"{in_msg_meta.organization_id}:{'.'.join(str(pid) for pid in sorted(in_msg_meta.project_ids))}" bucket = ( int(hashlib.md5(combined_org_and_project_ids.encode()).hexdigest(), 16) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py index 52cfcd53bbf..c584393de30 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py @@ -17,7 +17,16 @@ RequestMeta, TraceItemType, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + AttributeValue, + ExtrapolationMode, +) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + AndFilter, + ComparisonFilter, + TraceItemFilter, +) from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo @@ -41,6 +50,7 @@ from snuba.web.rpc.common.common import ( add_existence_check_to_subscriptable_references, base_conditions_and, + timestamp_in_range_condition, trace_item_filters_to_expression, treeify_or_and_conditions, use_sampling_factor, @@ -52,6 +62,7 @@ from snuba.web.rpc.common.exceptions 
import BadSnubaRPCRequestException from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( RoutingDecision, + TimeWindow, ) from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable from snuba.web.rpc.v1.resolvers.common.aggregation import ( @@ -325,7 +336,33 @@ def _column_to_expression(column: Column, request_meta: RequestMeta) -> Expressi ) -def build_query(request: TraceItemTableRequest, timer: Optional[Timer] = None) -> Query: +def _get_offset_from_page_token(page_token: PageToken | None) -> int: + if page_token is None: + return 0 + if page_token.HasField("offset"): + return page_token.offset + elif page_token.HasField("filter_offset"): + # iterate through the and_filter filters and find the offset comparison filter + if page_token.filter_offset.HasField("and_filter"): + for filter in page_token.filter_offset.and_filter.filters: + if ( + filter.HasField("comparison_filter") + and filter.comparison_filter.key.name == "offset" + ): + return filter.comparison_filter.value.val_int + return 0 + # raise BadSnubaRPCRequestException("offset filter not found in filter_offset", page_token) + else: + raise BadSnubaRPCRequestException("filter_offset must be an and_filter") + else: + return 0 + + +def build_query( + request: TraceItemTableRequest, + time_window: TimeWindow | None = None, + timer: Optional[Timer] = None, +) -> Query: entity = Entity( key=EntityKey("eap_items"), schema=get_entity(EntityKey("eap_items")).get_data_model(), @@ -348,7 +385,7 @@ def build_query(request: TraceItemTableRequest, timer: Optional[Timer] = None) - item_type_conds = [f.equals(snuba_column("item_type"), request.meta.trace_item_type)] # Handle cross item queries by first getting trace IDs - additional_conditions = [] + additional_conditions: List[Expression] = [] if request.trace_filters and timer is not None: trace_ids = get_trace_ids_for_cross_item_query( request, request.meta, list(request.trace_filters), timer @@ -359,8 +396,15 @@ def build_query(request: TraceItemTableRequest, timer: Optional[Timer] = None) - literals_array(None, [literal(trace_id) for trace_id in trace_ids]), ) ) - + if time_window is not None: + additional_conditions.append( + timestamp_in_range_condition( + time_window.start_timestamp.seconds, + time_window.end_timestamp.seconds, + ) + ) groupby = [attribute_key_to_expression(attr_key) for attr_key in request.group_by] + res = Query( from_clause=entity, selected_columns=selected_columns, @@ -380,10 +424,11 @@ def build_query(request: TraceItemTableRequest, timer: Optional[Timer] = None) - ), groupby=groupby, # Only support offset page tokens for now - offset=request.page_token.offset, + offset=_get_offset_from_page_token(request.page_token), # protobuf sets limit to 0 by default if it is not set, # give it a default value that will actually return data - limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, + # we add 1 to the limit to know if there are more rows to fetch + limit=(request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT) + 1, having=( aggregation_filter_to_expression(request.aggregation_filter, request.meta) if request.HasField("aggregation_filter") @@ -397,17 +442,93 @@ def build_query(request: TraceItemTableRequest, timer: Optional[Timer] = None) - def _get_page_token( - request: TraceItemTableRequest, response: list[TraceItemColumnValues] + request: TraceItemTableRequest, + response: list[TraceItemColumnValues], + # amount of rows returned in the DB request (which can be one more than the limit) + num_rows_returned: int, + # 
time window of the original request without any adjustments by routing strategies + original_time_window: TimeWindow, + # time window of the current request after any adjustments by routing strategies + time_window: TimeWindow | None, ) -> PageToken: if not response: return PageToken(offset=0) - num_rows = len(response[0].results) - return PageToken(offset=request.page_token.offset + num_rows) + current_offset = _get_offset_from_page_token(request.page_token) + num_rows_in_response = len(response[0].results) + if time_window is not None: + if num_rows_returned > request.limit: + # there are more rows in this window so we maintain the same time window and advance the offset + return PageToken( + filter_offset=TraceItemFilter( + and_filter=AndFilter( + filters=[ + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="start_timestamp"), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue( + val_int=time_window.start_timestamp.seconds + ), + ) + ), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="end_timestamp"), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_int=time_window.end_timestamp.seconds), + ) + ), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="offset"), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue( + val_int=current_offset + num_rows_in_response + ), + ) + ), + ] + ) + ) + ) + else: + # there are no more rows in this window so we return the next window + # return the next window where the end timestamp is the start timestamp and the start timestamp is the original start timestamp + # the routing strategy will properly truncate the time window of the next request + return PageToken( + filter_offset=TraceItemFilter( + and_filter=AndFilter( + filters=[ + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="start_timestamp"), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue( + val_int=original_time_window.start_timestamp.seconds + ), + ) + ), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="end_timestamp"), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue( + val_int=time_window.start_timestamp.seconds + ), + ) + ), + ] + ) + ) + ) + else: + return PageToken(offset=request.page_token.offset + num_rows_in_response) def _build_snuba_request( request: TraceItemTableRequest, query_settings: HTTPQuerySettings, + time_window: TimeWindow | None = None, timer: Optional[Timer] = None, ) -> SnubaRequest: if request.meta.trace_item_type == TraceItemType.TRACE_ITEM_TYPE_LOG: @@ -422,7 +543,7 @@ def _build_snuba_request( return SnubaRequest( id=uuid.UUID(request.meta.request_id), original_body=MessageToDict(request), - query=build_query(request, timer), + query=build_query(request, time_window, timer), query_settings=query_settings, attribution_info=AttributionInfo( referrer=request.meta.referrer, @@ -454,15 +575,25 @@ def resolve( query_settings.set_sampling_tier(routing_decision.tier) except Exception as e: sentry_sdk.capture_message(f"Error merging clickhouse settings: {e}") - - snuba_request = _build_snuba_request(in_msg, query_settings, self._timer) + original_time_window = TimeWindow( + start_timestamp=in_msg.meta.start_timestamp, end_timestamp=in_msg.meta.end_timestamp + ) + snuba_request = _build_snuba_request( + in_msg, query_settings, routing_decision.time_window, self._timer + ) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), 
request=snuba_request, timer=self._timer, ) routing_decision.routing_context.query_result = res - column_values = convert_results(in_msg, res.result.get("data", [])) + # we added 1 to the limit to know if there are more rows to fetch + # so we need to remove the last row + # TODO maybe use islice instead + data = res.result.get("data", []) + if in_msg.limit > 0 and len(data) > in_msg.limit: + data = data[:-1] + column_values = convert_results(in_msg, data) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, @@ -471,6 +602,12 @@ def resolve( ) return TraceItemTableResponse( column_values=column_values, - page_token=_get_page_token(in_msg, column_values), + page_token=_get_page_token( + in_msg, + column_values, + len(res.result.get("data", [])), + original_time_window, + routing_decision.time_window, + ), meta=response_meta, ) diff --git a/tests/conftest.py b/tests/conftest.py index 1c13daa9a4f..65f4b1fc4b5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,9 +27,7 @@ def pytest_configure() -> None: Set up the Sentry SDK to avoid errors hidden by configuration. Ensure the snuba_test database exists """ - assert ( - settings.TESTING - ), "settings.TESTING is False, try `SNUBA_SETTINGS=test` or `make test`" + assert settings.TESTING, "settings.TESTING is False, try `SNUBA_SETTINGS=test` or `make test`" initialize_snuba() setup_sentry() @@ -52,9 +50,11 @@ def create_databases() -> None: storage_sets=cluster["storage_sets"], single_node=cluster["single_node"], cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None, - distributed_cluster_name=cluster["distributed_cluster_name"] - if "distributed_cluster_name" in cluster - else None, + distributed_cluster_name=( + cluster["distributed_cluster_name"] + if "distributed_cluster_name" in cluster + else None + ), ) database_name = cluster["database"] @@ -161,9 +161,7 @@ def _build_migrations_cache() -> None: nodes = [*cluster.get_local_nodes(), *cluster.get_distributed_nodes()] for node in nodes: if (cluster, node) not in MIGRATIONS_CACHE: - connection = cluster.get_node_connection( - ClickhouseClientSettings.MIGRATE, node - ) + connection = cluster.get_node_connection(ClickhouseClientSettings.MIGRATE, node) rows = connection.execute( f"SELECT name, create_table_query FROM system.tables WHERE database='{database}'" ) @@ -195,14 +193,14 @@ def _clear_db() -> None: or storage_key == StorageKey.EAP_ITEMS_DOWNSAMPLE_8 or storage_key == StorageKey.EAP_ITEMS_DOWNSAMPLE_64 or storage_key == StorageKey.EAP_ITEMS_DOWNSAMPLE_512 + or storage_key == StorageKey.OUTCOMES_HOURLY + or storage_key == StorageKey.OUTCOMES_DAILY ): table_name = schema.get_local_table_name() # type: ignore nodes = [*cluster.get_local_nodes(), *cluster.get_distributed_nodes()] for node in nodes: - connection = cluster.get_node_connection( - ClickhouseClientSettings.MIGRATE, node - ) + connection = cluster.get_node_connection(ClickhouseClientSettings.MIGRATE, node) connection.execute(f"TRUNCATE TABLE IF EXISTS {database}.{table_name}") @@ -240,9 +238,7 @@ def clickhouse_db( # apply migrations from cache applied_nodes = set() for (cluster, node), tables in MIGRATIONS_CACHE.items(): - connection = cluster.get_node_connection( - ClickhouseClientSettings.MIGRATE, node - ) + connection = cluster.get_node_connection(ClickhouseClientSettings.MIGRATE, node) for table_name, create_table_query in tables.items(): if (node.host_name, node.port, table_name) in applied_nodes: continue diff --git a/tests/web/rpc/v1/conftest.py 
b/tests/web/rpc/v1/conftest.py
new file mode 100644
index 00000000000..8ddcbdec009
--- /dev/null
+++ b/tests/web/rpc/v1/conftest.py
@@ -0,0 +1,45 @@
+from typing import Generator
+
+import pytest
+
+from snuba.datasets.factory import reset_dataset_factory
+from snuba.migrations.groups import MigrationGroup
+from snuba.migrations.runner import Runner
+
+
+@pytest.fixture
+def eap(request: pytest.FixtureRequest, create_databases: None) -> Generator[None, None, None]:
+    """
+    A custom ClickHouse fixture that runs only the EAP (Events Analytics Platform) and outcomes migrations.
+    This is much faster than running all migrations for tests that only need EAP tables.
+    Use this with @pytest.mark.eap marker.
+    """
+    if not request.node.get_closest_marker("eap"):
+        pytest.fail("Need to use eap marker if eap fixture is used")
+
+    try:
+        reset_dataset_factory()
+        # Run only the EAP and outcomes migration groups instead of every migration group
+        runner = Runner()
+        runner.run_all(group=MigrationGroup.EVENTS_ANALYTICS_PLATFORM, force=True)
+        runner.run_all(group=MigrationGroup.OUTCOMES, force=True)
+        yield
+    finally:
+        # Import here to avoid circular imports
+        from tests.conftest import _clear_db
+
+        _clear_db()
+
+
+# Hook to modify test collection
+def pytest_runtest_setup(item: pytest.Item) -> None:
+    """Custom setup to handle eap marker."""
+    if item.get_closest_marker("eap"):
+        # Remove block_clickhouse_db if it was added by parent conftest
+        fixturenames = getattr(item, "fixturenames", None)
+        if fixturenames is not None:
+            if "block_clickhouse_db" in fixturenames:
+                fixturenames.remove("block_clickhouse_db")
+            # Add our custom fixture if not already present
+            if "eap" not in fixturenames:
+                fixturenames.append("eap")
diff --git a/tests/web/rpc/v1/routing_strategies/common.py b/tests/web/rpc/v1/routing_strategies/common.py
new file mode 100644
index 00000000000..1609c15c471
--- /dev/null
+++ b/tests/web/rpc/v1/routing_strategies/common.py
@@ -0,0 +1,104 @@
+from datetime import datetime
+from typing import Dict, List, Tuple
+
+from snuba.datasets.storages.factory import get_storage
+from snuba.datasets.storages.storage_key import StorageKey
+from snuba.web.rpc.storage_routing.routing_strategies.common import (
+    Outcome,
+    OutcomeCategory,
+)
+from tests.helpers import write_raw_unprocessed_events
+
+
+def gen_ingest_outcome(
+    time: datetime,
+    num: int,
+    project_id: int = 1,
+    org_id: int = 1,
+    outcome_category: int = OutcomeCategory.SPAN_INDEXED,
+) -> Dict[str, int | str | None]:
+    """Generate a single ingest outcome record.
+
+    Args:
+        time: The timestamp for the outcome
+        num: The number of outcomes/quantity
+        project_id: The project ID (defaults to 1)
+        org_id: The organization ID (defaults to 1)
+        outcome_category: The outcome category (defaults to SPAN_INDEXED)
+
+    Returns:
+        A dictionary representing an outcome record
+    """
+    return {
+        "org_id": org_id,
+        "project_id": project_id,
+        "key_id": None,
+        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+        "outcome": Outcome.ACCEPTED,
+        "reason": None,
+        "event_id": None,
+        "quantity": num,
+        "category": outcome_category,
+    }
+
+
+def store_outcomes_data(
+    outcome_data: List[Tuple[datetime, int]],
+    outcome_category: int = OutcomeCategory.SPAN_INDEXED,
+    org_id: int = 1,
+    project_id: int = 1,
+) -> None:
+    """Store outcomes data to the outcomes storage.
+
+    Args:
+        outcome_data: List of (datetime, number_of_outcomes) tuples
+        outcome_category: The outcome category to use for all records (defaults to SPAN_INDEXED)
+        org_id: The organization ID (defaults to 1)
+        project_id: The project ID (defaults to 1)
+    """
+    outcomes_storage = get_storage(StorageKey("outcomes_raw"))
+    messages = []
+
+    for item in outcome_data:
+        if len(item) == 2:
+            time, num_outcomes = item
+            category = outcome_category
+        else:
+            raise ValueError(f"Invalid tuple length: {len(item)}. Expected 2 elements.")
+
+        messages.append(
+            gen_ingest_outcome(
+                time, num_outcomes, outcome_category=category, org_id=org_id, project_id=project_id
+            )
+        )
+    write_raw_unprocessed_events(outcomes_storage, messages)  # type: ignore
+
+
+# Available outcome categories (from OutcomeCategory class):
+# - OutcomeCategory.SPAN_INDEXED = 16 (for spans)
+# - OutcomeCategory.LOG_ITEM = 23 (for logs)
+
+# Usage examples:
+# 1. Store span outcomes (default):
+#    store_outcomes_data([(datetime1, 1000), (datetime2, 2000)])
+#
+# 2. Store log outcomes:
+#    store_outcomes_data([(datetime1, 1000), (datetime2, 2000)], OutcomeCategory.LOG_ITEM)
+#
+# 3. Store mixed outcomes with one call per category:
+#    store_outcomes_data([(datetime1, 1000)], OutcomeCategory.SPAN_INDEXED)
+#    store_outcomes_data([(datetime2, 2000)], OutcomeCategory.LOG_ITEM)
+#
+#    Note: the helper only accepts (datetime, quantity) tuples; pass the category separately.
+
+# Backward compatibility alias
+gen_span_ingest_outcome = gen_ingest_outcome
+
+# Re-export for convenience
+__all__ = [
+    "store_outcomes_data",
+    "gen_ingest_outcome",
+    "gen_span_ingest_outcome",
+    "OutcomeCategory",
+    "Outcome",
+]
diff --git a/tests/web/rpc/v1/routing_strategies/test_outcomes_based.py b/tests/web/rpc/v1/routing_strategies/test_outcomes_based.py
index cf7674e76fe..796a785e297 100644
--- a/tests/web/rpc/v1/routing_strategies/test_outcomes_based.py
+++ b/tests/web/rpc/v1/routing_strategies/test_outcomes_based.py
@@ -1,5 +1,5 @@
 from datetime import UTC, datetime, timedelta
-from typing import Any, Dict
+from typing import Any
 
 import pytest
 from google.protobuf.timestamp_pb2 import Timestamp
@@ -8,20 +8,16 @@
 from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
 
 from snuba import state
-from snuba.datasets.storages.factory import get_storage
-from snuba.datasets.storages.storage_key import StorageKey
 from snuba.downsampled_storage_tiers import Tier
 from snuba.utils.metrics.timer import Timer
 from snuba.web.rpc.storage_routing.common import extract_message_meta
 from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import (
-    Outcome,
-    OutcomeCategory,
     OutcomesBasedRoutingStrategy,
 )
 from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
     RoutingContext,
 )
-from tests.helpers import write_raw_unprocessed_events
+from tests.web.rpc.v1.routing_strategies.common import store_outcomes_data
 
 BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(
     hours=24
@@ -51,29 +47,15 @@
 )
 
 
-def gen_span_ingest_outcome(time: datetime, num: int) -> Dict[str, int | str | None]:
-    return {
-        "org_id": _PROJECT_ID,
-        "project_id": _ORG_ID,
-        "key_id": None,
-        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
-        "outcome": Outcome.ACCEPTED,
-        "reason": None,
-        "event_id": None,
-        "quantity": num,
-        "category": OutcomeCategory.SPAN_INDEXED,
-    }
-
-
 @pytest.fixture
-def store_outcomes_data() -> None:
-    outcomes_storage = get_storage(StorageKey("outcomes_raw"))
-    messages = []
+def store_outcomes_fixture(clickhouse_db: Any) -> None:
+    # Generate 24 hours of
outcomes data with 1M outcomes per hour + outcome_data = [] for hour in range(24): time = BASE_TIME - timedelta(hours=hour) - messages.append(gen_span_ingest_outcome(time, 1_000_000)) + outcome_data.append((time, 1_000_000)) - write_raw_unprocessed_events(outcomes_storage, messages) # type: ignore + store_outcomes_data(outcome_data) @pytest.mark.clickhouse_db @@ -100,7 +82,7 @@ def test_outcomes_based_routing_queries_daily_table() -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -def test_outcomes_based_routing_normal_mode(store_outcomes_data: Any) -> None: +def test_outcomes_based_routing_normal_mode(store_outcomes_fixture: Any) -> None: strategy = OutcomesBasedRoutingStrategy() request = TraceItemTableRequest(meta=_get_request_meta()) @@ -119,7 +101,7 @@ def test_outcomes_based_routing_normal_mode(store_outcomes_data: Any) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -def test_outcomes_based_routing_downsample(store_outcomes_data: Any) -> None: +def test_outcomes_based_routing_downsample(store_outcomes_fixture: Any) -> None: state.set_config("OutcomesBasedRoutingStrategy.max_items_before_downsampling", 5_000_000) strategy = OutcomesBasedRoutingStrategy() @@ -160,7 +142,7 @@ def test_outcomes_based_routing_downsample(store_outcomes_data: Any) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -def test_outcomes_based_routing_highest_accuracy_mode(store_outcomes_data: Any) -> None: +def test_outcomes_based_routing_highest_accuracy_mode(store_outcomes_fixture: Any) -> None: strategy = OutcomesBasedRoutingStrategy() request = TraceItemTableRequest(meta=_get_request_meta()) @@ -180,7 +162,7 @@ def test_outcomes_based_routing_highest_accuracy_mode(store_outcomes_data: Any) @pytest.mark.clickhouse_db @pytest.mark.redis_db def test_outcomes_based_routing_defaults_to_spans_for_unspecified_item_type( - store_outcomes_data: Any, + store_outcomes_fixture: Any, ) -> None: strategy = OutcomesBasedRoutingStrategy() diff --git a/tests/web/rpc/v1/routing_strategies/test_outcomes_flex_time.py b/tests/web/rpc/v1/routing_strategies/test_outcomes_flex_time.py new file mode 100644 index 00000000000..4dfe2f44668 --- /dev/null +++ b/tests/web/rpc/v1/routing_strategies/test_outcomes_flex_time.py @@ -0,0 +1,155 @@ +from datetime import UTC, datetime, timedelta + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + TraceItemType, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + AndFilter, + ComparisonFilter, + TraceItemFilter, +) + +from snuba.utils.metrics.timer import Timer +from snuba.web.rpc.storage_routing.routing_strategies.outcomes_flex_time import ( + OutcomesFlexTimeRoutingStrategy, +) +from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( + RoutingContext, +) +from tests.web.rpc.v1.routing_strategies.common import ( + OutcomeCategory, + store_outcomes_data, +) + +BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta( + hours=24 +) +_PROJECT_ID = 1 +_ORG_ID = 1 + + +def _get_request_meta( + start: datetime, + end: datetime, + downsampled_storage_config: DownsampledStorageConfig | None = None, +) -> RequestMeta: + + return RequestMeta( + 
project_ids=[_PROJECT_ID], + organization_id=_ORG_ID, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(start.timestamp())), + end_timestamp=Timestamp(seconds=int(end.timestamp())), + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_LOG, + downsampled_storage_config=downsampled_storage_config, + ) + + +def store_outcomes() -> None: + outcome_data = [] + for hour in range(25): + time = BASE_TIME - timedelta(hours=hour) + outcome_data.append((time, 10_000_000)) + + store_outcomes_data( + outcome_data, OutcomeCategory.LOG_ITEM, org_id=_ORG_ID, project_id=_PROJECT_ID + ) + + +@pytest.mark.eap +@pytest.mark.redis_db +def test_outcomes_flex_time_routing_strategy_with_data() -> None: + # store 10 million log items every hour for 24 hours + store_outcomes() + strategy = OutcomesFlexTimeRoutingStrategy() + strategy.set_config_value("max_items_to_query", 120_000_000) + # query the last 24 hours + request = TraceItemTableRequest( + meta=_get_request_meta(BASE_TIME - timedelta(hours=24), BASE_TIME) + ) + request.meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_LOG + request.meta.downsampled_storage_config.mode = DownsampledStorageConfig.MODE_NORMAL + + routing_decision = strategy.get_routing_decision( + RoutingContext( + in_msg=request, + timer=Timer("test"), + ) + ) + assert routing_decision.time_window is not None + # time range should be 12 hours because 120M items / 10M items per hour = 12 hours + assert ( + routing_decision.time_window.start_timestamp.seconds + == (BASE_TIME - timedelta(hours=12)).timestamp() + ) + assert routing_decision.time_window.end_timestamp.seconds == BASE_TIME.timestamp() + + +@pytest.mark.eap +@pytest.mark.redis_db +def test_outcomes_flex_time_routing_strategy_with_data_and_page_token() -> None: + store_outcomes() + strategy = OutcomesFlexTimeRoutingStrategy() + strategy.set_config_value("max_items_to_query", 120_000_000) + # this is the case where the original request time range is being shortened by the page token + # so even though the original request is for the past 24 hours, the page token specifies the request from 12 hours ago to 24 hours ago + + page_token_start_timestamp = BASE_TIME - timedelta(hours=24) + page_token_end_timestamp = BASE_TIME - timedelta(hours=12) + request = TraceItemTableRequest( + meta=_get_request_meta(BASE_TIME - timedelta(hours=24), BASE_TIME), + page_token=PageToken( + filter_offset=TraceItemFilter( + and_filter=AndFilter( + filters=[ + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="start_timestamp", type=AttributeKey.TYPE_INT + ), + op=ComparisonFilter.OP_GREATER_THAN, + value=AttributeValue( + val_int=int(page_token_start_timestamp.timestamp()) + ), + ) + ), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="end_timestamp", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue( + val_int=int(page_token_end_timestamp.timestamp()) + ), + ) + ), + ] + ) + ) + ), + ) + request.meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_LOG + request.meta.downsampled_storage_config.mode = ( + DownsampledStorageConfig.MODE_HIGHEST_ACCURACY_FLEXTIME + ) + routing_decision = strategy.get_routing_decision( + RoutingContext( + in_msg=request, + timer=Timer("test"), + ) + ) + assert routing_decision.time_window is not None + assert ( + routing_decision.time_window.start_timestamp.seconds + == page_token_start_timestamp.timestamp() + ) + assert ( + routing_decision.time_window.end_timestamp.seconds == 
page_token_end_timestamp.timestamp() + ) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_trace_item_table_flex_time.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_trace_item_table_flex_time.py new file mode 100644 index 00000000000..08417b531fb --- /dev/null +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_trace_item_table_flex_time.py @@ -0,0 +1,162 @@ +import random +from datetime import timedelta +from typing import Any + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + Column, + TraceItemTableRequest, + TraceItemTableResponse, +) +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + TraceItemType, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter +from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable +from tests.helpers import write_raw_unprocessed_events +from tests.web.rpc.v1.routing_strategies.common import ( + OutcomeCategory, + store_outcomes_data, +) +from tests.web.rpc.v1.test_utils import BASE_TIME, gen_item_message + +_LOG_COUNT = 120 +_ORG_ID = 1 +_PROJECT_ID = 1 + + +@pytest.fixture(autouse=False) +def setup_teardown(eap: None, redis_db: None) -> None: + items_storage = get_storage(StorageKey("eap_items")) + # generate 120 items every hour + messages = [] + for hour in range(25): + messages.extend( + [ + gen_item_message( + start_timestamp=BASE_TIME - timedelta(hours=hour), + item_id=int("123456781234567d", 16).to_bytes(16, byteorder="little"), + type=TraceItemType.TRACE_ITEM_TYPE_LOG, + attributes={ + "color": AnyValue( + string_value=random.choice( + [ + "red", + "green", + "blue", + ] + ) + ), + "eap.measurement": AnyValue( + int_value=random.choice( + [ + 1, + 100, + 1000, + ] + ) + ), + "location": AnyValue( + string_value=random.choice( + [ + "mobile", + "frontend", + "backend", + ] + ) + ), + "custom_measurement": AnyValue(double_value=420.0), + "custom_tag": AnyValue(string_value="blah"), + }, + project_id=_PROJECT_ID, + organization_id=_ORG_ID, + ) + for i in range(_LOG_COUNT) + ] + ) + write_raw_unprocessed_events(items_storage, messages) # type: ignore + + # pretend we have 10 million log items every hour + outcome_data = [] + for hour in range(25): + time = BASE_TIME - timedelta(hours=hour) + outcome_data.append((time, 10_000_000)) + + store_outcomes_data( + outcome_data, OutcomeCategory.LOG_ITEM, org_id=_ORG_ID, project_id=_PROJECT_ID + ) + + +@pytest.mark.eap +@pytest.mark.redis_db +class TestTraceItemTableFlexTime: + def test_paginate_within_time_window(self, eap: Any, setup_teardown: Any) -> None: + from snuba.web.rpc.storage_routing.routing_strategies.outcomes_flex_time import ( + OutcomesFlexTimeRoutingStrategy, + ) + + num_hours_to_query = 4 + # every hour we store 120 items and in outcomes we pretend it's 10 million items stored + strategy = OutcomesFlexTimeRoutingStrategy() + # we tell the routing strategy that the most items we can query is 20_000_000 + # this means that if we query a four hour time range, it will get split in two + strategy.set_config_value("max_items_to_query", 20_000_000) + + 
start_timestamp = Timestamp(
+            seconds=int((BASE_TIME - timedelta(hours=num_hours_to_query)).timestamp())
+        )
+        end_timestamp = Timestamp(seconds=int(BASE_TIME.timestamp()))
+
+        limit_per_query = 120
+
+        # querying 4 hours of data, split into two windows;
+        # each window has 240 datapoints, queried 120 at a time,
+        # which means we will run the query a total of 4 times
+
+        times_queried = 0
+        # TODO: back down to 4
+        expected_times_queried = 5
+        page_token = PageToken(offset=0)
+        result_size = 1
+        while result_size > 0:
+            times_queried += 1
+            message = TraceItemTableRequest(
+                meta=RequestMeta(
+                    project_ids=[1],
+                    organization_id=1,
+                    cogs_category="something",
+                    referrer="something",
+                    start_timestamp=start_timestamp,
+                    end_timestamp=end_timestamp,
+                    trace_item_type=TraceItemType.TRACE_ITEM_TYPE_LOG,
+                    downsampled_storage_config=DownsampledStorageConfig(
+                        mode=DownsampledStorageConfig.MODE_HIGHEST_ACCURACY_FLEXTIME
+                    ),
+                ),
+                filter=TraceItemFilter(
+                    exists_filter=ExistsFilter(
+                        key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
+                    )
+                ),
+                columns=[Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location"))],
+                limit=limit_per_query,
+                page_token=page_token,
+            )
+            response = EndpointTraceItemTable().execute(message)
+            assert isinstance(response, TraceItemTableResponse)
+            if not response.column_values:
+                break
+            result_size = len(response.column_values[0].results)
+            page_token = response.page_token
+            assert result_size == limit_per_query
+
+        assert times_queried == expected_times_queried
diff --git a/tests/web/rpc/v1/test_utils.py b/tests/web/rpc/v1/test_utils.py
index 68e7b8cd757..29b65f372ee 100644
--- a/tests/web/rpc/v1/test_utils.py
+++ b/tests/web/rpc/v1/test_utils.py
@@ -124,6 +124,8 @@ def gen_item_message(
     end_timestamp: Optional[datetime] = None,
     remove_default_attributes: bool = False,
     item_id: Optional[bytes] = None,
+    project_id: Optional[int] = None,
+    organization_id: Optional[int] = None,
 ) -> bytes:
     item_timestamp = Timestamp()
     item_timestamp.FromDatetime(start_timestamp)
@@ -146,8 +148,8 @@
     if trace_id is None:
         trace_id = uuid.uuid4().hex
     return TraceItem(
-        organization_id=1,
-        project_id=1,
+        organization_id=organization_id or 1,
+        project_id=project_id or 1,
         item_type=type,
         timestamp=item_timestamp,
         trace_id=trace_id,