Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4d1c7b0
wip
volokluev Sep 12, 2025
b2c3a9a
running test that does ... something?
volokluev Sep 12, 2025
75169d2
update routing strategy to use flextime
volokluev Sep 12, 2025
6a9fd6c
scaffolding test and trace item table integration
volokluev Sep 12, 2025
5ccf286
pull out outcomes things
volokluev Sep 15, 2025
9960db2
basic linear model
volokluev Sep 16, 2025
9ddf45d
prototype of an end to end test
volokluev Sep 16, 2025
db183df
working test with pagination
volokluev Sep 17, 2025
ed57fba
properly slice things
volokluev Sep 18, 2025
e04dd4f
Fix some unit tests and break the one I wrote :D
volokluev Sep 18, 2025
0e227b4
add some comments, fix pagination
volokluev Sep 18, 2025
4cb222c
kruft
volokluev Sep 18, 2025
d7e8dc3
remove copypasta
volokluev Sep 18, 2025
ae3f048
Merge branch 'master' into volo/time_window_routing
volokluev Sep 19, 2025
38ccf1b
cleanup
volokluev Sep 19, 2025
25ccc5d
mypy fixes
volokluev Sep 19, 2025
5bc4663
test fix
volokluev Sep 19, 2025
1ddb242
clear db for outcomes MVs
volokluev Sep 19, 2025
3ebfb4d
remove erroneous test
volokluev Sep 19, 2025
be3caa8
remove redundant condition
volokluev Sep 22, 2025
e9bb32c
redundant condition
volokluev Sep 22, 2025
ed6bbf9
put pagination terms into constants
volokluev Sep 22, 2025
31b14ba
Merge branch 'master' into volo/time_window_routing
volokluev Sep 22, 2025
007d414
[getsentry/action-github-commit] Auto commit
getsantry[bot] Sep 22, 2025
61907e9
put ALL into constants
volokluev Sep 22, 2025
c80c4e3
factor out page token encoding/decoding into its own module
volokluev Sep 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"mypy-type-checker.args": [
"--strict"
],
"makefile.configureOnOpen": false
"makefile.configureOnOpen": false,
"cursorpyright.analysis.autoImportCompletions": true

}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"redis>=4.5.4",
"sentry-arroyo>=2.29.1",
"sentry-kafka-schemas>=2.0.4",
"sentry-protos>=0.3.4",
"sentry-protos>=0.3.6",
"sentry-redis-tools>=0.5.0",
"sentry-relay>=0.9.5",
"sentry-sdk>=2.35.0",
Expand Down
61 changes: 17 additions & 44 deletions snuba/web/rpc/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def truncate_request_meta_to_day(meta: RequestMeta) -> None:
start_timestamp = start_timestamp.replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=1)
end_timestamp = end_timestamp.replace(
hour=0, minute=0, second=0, microsecond=0
) + timedelta(days=1)
end_timestamp = end_timestamp.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(
days=1
)

meta.start_timestamp.seconds = int(start_timestamp.timestamp())
meta.end_timestamp.seconds = int(end_timestamp.timestamp())
Expand Down Expand Up @@ -142,51 +142,32 @@ def trace_item_filters_to_expression(
if len(filters) == 0:
return literal(True)
elif len(filters) == 1:
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression
)
return trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
return and_cond(
*(
trace_item_filters_to_expression(x, attribute_key_to_expression)
for x in filters
)
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
)

if item_filter.HasField("or_filter"):
filters = item_filter.or_filter.filters
if len(filters) == 0:
raise BadSnubaRPCRequestException(
"Invalid trace item filter, empty 'or' clause"
)
raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause")
elif len(filters) == 1:
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression
)
return trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
return or_cond(
*(
trace_item_filters_to_expression(x, attribute_key_to_expression)
for x in filters
)
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
)

if item_filter.HasField("not_filter"):
filters = item_filter.not_filter.filters
if len(filters) == 0:
raise BadSnubaRPCRequestException(
"Invalid trace item filter, empty 'not' clause"
)
raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'not' clause")
elif len(filters) == 1:
return not_cond(
trace_item_filters_to_expression(
filters[0], attribute_key_to_expression
)
trace_item_filters_to_expression(filters[0], attribute_key_to_expression)
)
return not_cond(
and_cond(
*(
trace_item_filters_to_expression(x, attribute_key_to_expression)
for x in filters
)
*(trace_item_filters_to_expression(x, attribute_key_to_expression) for x in filters)
)
)

Expand All @@ -198,9 +179,7 @@ def trace_item_filters_to_expression(

value_type = v.WhichOneof("value")
if value_type is None:
raise BadSnubaRPCRequestException(
"comparison does not have a right hand side"
)
raise BadSnubaRPCRequestException("comparison does not have a right hand side")

if v.is_null:
v_expression: Expression = literal(None)
Expand Down Expand Up @@ -246,9 +225,7 @@ def trace_item_filters_to_expression(
)
# we redefine the way equals works for nulls
# now null=null is true
expr_with_null = or_cond(
expr, and_cond(f.isNull(k_expression), f.isNull(v_expression))
)
expr_with_null = or_cond(expr, and_cond(f.isNull(k_expression), f.isNull(v_expression)))
return expr_with_null
if op == ComparisonFilter.OP_NOT_EQUALS:
_check_non_string_values_cannot_ignore_case(item_filter.comparison_filter)
Expand All @@ -259,9 +236,7 @@ def trace_item_filters_to_expression(
)
# we redefine the way not equals works for nulls
# now null!=null is true
expr_with_null = or_cond(
expr, f.xor(f.isNull(k_expression), f.isNull(v_expression))
)
expr_with_null = or_cond(expr, f.xor(f.isNull(k_expression), f.isNull(v_expression)))
return expr_with_null
if op == ComparisonFilter.OP_LIKE:
if k.type != AttributeKey.Type.TYPE_STRING:
Expand Down Expand Up @@ -354,11 +329,11 @@ def timestamp_in_range_condition(start_ts: int, end_ts: int) -> Expression:
return and_cond(
f.less(
column("timestamp"),
f.toDateTime(end_ts),
f.toDateTime(datetime.fromtimestamp(end_ts).isoformat()),
),
f.greaterOrEquals(
column("timestamp"),
f.toDateTime(start_ts),
f.toDateTime(datetime.fromtimestamp(start_ts).isoformat()),
),
)

Expand All @@ -372,9 +347,7 @@ def base_conditions_and(meta: RequestMeta, *other_exprs: Expression) -> Expressi
"""
return and_cond(
project_id_and_org_conditions(meta),
timestamp_in_range_condition(
meta.start_timestamp.seconds, meta.end_timestamp.seconds
),
timestamp_in_range_condition(meta.start_timestamp.seconds, meta.end_timestamp.seconds),
*other_exprs,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import math
import uuid
from typing import cast

import sentry_sdk
from google.protobuf.json_format import MessageToDict
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.timestamp_pb2 import Timestamp as TimestampProto
from sentry_protos.snuba.v1.request_common_pb2 import (
PageToken,
RequestMeta,
TraceItemType,
)

from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.clickhouse.query import Expression
from snuba.configs.configuration import Configuration
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.pluggable_dataset import PluggableDataset
from snuba.downsampled_storage_tiers import Tier
from snuba.query import SelectedExpression
from snuba.query.data_source.simple import Entity
from snuba.query.dsl import Functions as f
from snuba.query.dsl import and_cond, column, in_cond, literal, literals_array
from snuba.query.logical import Query
from snuba.query.query_settings import HTTPQuerySettings
from snuba.request import Request as SnubaRequest
from snuba.web.query import run_query
from snuba.web.rpc.common.common import (
timestamp_in_range_condition,
treeify_or_and_conditions,
)
from snuba.web.rpc.storage_routing.common import extract_message_meta
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
BaseRoutingStrategy,
RoutingContext,
RoutingDecision,
TimeWindow,
)


# TODO import these from sentry-relay
class OutcomeCategory:
SPAN_INDEXED = 16
LOG_ITEM = 23
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy pasta, remove



class Outcome:
ACCEPTED = 0


_ITEM_TYPE_TO_OUTCOME = {
TraceItemType.TRACE_ITEM_TYPE_SPAN: OutcomeCategory.SPAN_INDEXED,
TraceItemType.TRACE_ITEM_TYPE_LOG: OutcomeCategory.LOG_ITEM,
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy pasted, remove this



def project_id_and_org_conditions(meta: RequestMeta) -> Expression:
return and_cond(
in_cond(
column("project_id"),
literals_array(
alias=None,
literals=[literal(pid) for pid in meta.project_ids],
),
),
f.equals(column("org_id"), meta.organization_id),
)


def _get_request_time_window(routing_context: RoutingContext) -> TimeWindow:
"""Gets the time window of the request, if there is a page token with a start and end timestamp,
it gets it from there, otherwise, it gets it from the message meta
"""
meta = extract_message_meta(routing_context.in_msg)
if routing_context.in_msg.HasField("page_token"):
page_token: PageToken = getattr(routing_context.in_msg, "page_token")
if page_token.HasField("filter_offset"):
time_window = TimeWindow(
start_timestamp=meta.start_timestamp, end_timestamp=meta.end_timestamp
)
if page_token.filter_offset.HasField("and_filter"):
for filter in page_token.filter_offset.and_filter.filters:
if (
filter.HasField("comparison_filter")
and filter.comparison_filter.key.name == "start_timestamp"
):
time_window.start_timestamp = Timestamp(
seconds=filter.comparison_filter.value.val_int
)
if (
filter.HasField("comparison_filter")
and filter.comparison_filter.key.name == "end_timestamp"
):
time_window.end_timestamp = Timestamp(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this page token logic wont work for nested conditions. maybe its fine to leave out for the first pass but we should be aware of this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is correct, it is very specific to the logic in this window. I am encoding assumptions about how pagination works here

seconds=filter.comparison_filter.value.val_int
)
return time_window
return TimeWindow(start_timestamp=meta.start_timestamp, end_timestamp=meta.end_timestamp)


class OutcomesFlexTimeRoutingStrategy(BaseRoutingStrategy):
def _additional_config_definitions(self) -> list[Configuration]:
return [
Configuration(
name="max_items_to_query",
description="Maximum number of items to query before adjusting time window",
value_type=int,
default=100_000_000,
),
]

def get_ingested_items_for_timerange(
self, routing_context: RoutingContext, time_window: TimeWindow
) -> int:
entity = Entity(
key=EntityKey("outcomes"),
schema=get_entity(EntityKey("outcomes")).get_data_model(),
sample=None,
)
in_msg_meta = extract_message_meta(routing_context.in_msg)
query = Query(
from_clause=entity,
selected_columns=[
SelectedExpression(
name="num_items",
expression=f.sum(column("quantity"), alias="num_items"),
)
],
condition=and_cond(
project_id_and_org_conditions(in_msg_meta),
timestamp_in_range_condition(
time_window.start_timestamp.seconds,
time_window.end_timestamp.seconds,
),
f.equals(column("outcome"), Outcome.ACCEPTED),
f.equals(
column("category"),
_ITEM_TYPE_TO_OUTCOME.get(
in_msg_meta.trace_item_type,
OutcomeCategory.SPAN_INDEXED,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is wrong, the default should not be span indexed

),
),
),
)
snuba_request = SnubaRequest(
id=uuid.uuid4(),
original_body=MessageToDict(routing_context.in_msg),
query=query,
query_settings=HTTPQuerySettings(),
attribution_info=AttributionInfo(
referrer=in_msg_meta.referrer,
team="eap",
feature="eap",
tenant_ids={
"organization_id": in_msg_meta.organization_id,
"referrer": "eap.route_outcomes",
},
app_id=AppID("eap"),
parent_api="eap.route_outcomes",
),
)
treeify_or_and_conditions(query)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
timer=routing_context.timer,
)
routing_context.extra_info["estimation_sql"] = res.extra.get("sql", "")
return cast(int, res.result.get("data", [{}])[0].get("num_items", 0))

def _adjust_time_window(self, routing_context: RoutingContext) -> TimeWindow | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when does this function return None?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is no adjustment to be made

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's no adjustments, then original_time_window is returned right? And it seems like _get_request_time_window never returns None?

I don't think this affects how the code runs so this is more just a nit, feel free to ignore

"""Adjust the time window to ensure we don't exceed MAX_ITEMS_TO_QUERY."""
original_time_window = _get_request_time_window(routing_context)
original_end_ts = original_time_window.end_timestamp.seconds
original_start_ts = original_time_window.start_timestamp.seconds

max_items = self.get_config_value("max_items_to_query")

ingested_items = self.get_ingested_items_for_timerange(
routing_context, original_time_window
)
factor = ingested_items / max_items
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how we actually shrink the time window

if factor > 1:
window_length = original_end_ts - original_start_ts

start_timestamp_proto = TimestampProto(
seconds=original_end_ts - math.floor((window_length / factor))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this so we prioritize more recent data? and the user will paginate forwards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct

)
end_timestamp_proto = TimestampProto(seconds=original_end_ts)
return TimeWindow(start_timestamp_proto, end_timestamp_proto)

return original_time_window

def _get_routing_decision(self, routing_context: RoutingContext) -> RoutingDecision:
routing_decision = RoutingDecision(
routing_context=routing_context,
strategy=self,
tier=Tier.TIER_1, # Always TIER_1
clickhouse_settings={},
can_run=True,
)

in_msg_meta = extract_message_meta(routing_decision.routing_context.in_msg)

# Check if we need to handle time window adjustment for unknown item types
if (
in_msg_meta.trace_item_type != TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, sorry

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

and in_msg_meta.trace_item_type not in _ITEM_TYPE_TO_OUTCOME
):
routing_decision.routing_context.extra_info["unknown_item_type"] = True
sentry_sdk.capture_message(
f"Trace Item {in_msg_meta.trace_item_type} does not have an associated outcome"
)
return routing_decision

# Adjust time window based on outcomes
adjusted_time_window = self._adjust_time_window(routing_context)
if adjusted_time_window:
routing_decision.time_window = adjusted_time_window
routing_decision.routing_context.extra_info["time_window_adjusted"] = True

return routing_decision
Loading