Skip to content

Commit 6731d49

Browse files
Rachel Chen
authored and committed
squash
mypy uuid max bytes to read tests move to util query id query id combine recommendations remove prchanges? move get_max_bytes_to_read AllocationPolicyViolations allocation policy fixes and tests remove prchanges
1 parent f90c230 commit 6731d49

File tree

12 files changed

+563
-153
lines changed

12 files changed

+563
-153
lines changed

snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_local
110110
dist_table_name: eap_items_1_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_512_local
109109
dist_table_name: eap_items_1_downsample_512_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_downsample_64_local
110110
dist_table_name: eap_items_1_downsample_64_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_8_local
109109
dist_table_name: eap_items_1_downsample_8_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor

snuba/query/allocation_policies/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,21 @@
3535
MAX_THRESHOLD = int(1e12)
3636

3737

38+
def get_max_bytes_to_read(quota_allowances: dict[str, QuotaAllowance]) -> int:
    """Return the tightest ``max_bytes_to_read`` across the given quota allowances.

    A value of 0 is ranked last when picking the minimum, so it is only
    returned when no allowance carries a non-zero value (callers appear to
    treat 0 as "no limit to apply").

    Args:
        quota_allowances: Mapping of policy name to the allowance that policy
            produced. May be empty.

    Returns:
        The smallest non-zero ``max_bytes_to_read`` among the allowances, or 0
        if the mapping is empty or every allowance reports 0.
    """
    # Drop zeros up front so they can never win the comparison; `default=0`
    # covers both "all zeros" and the empty mapping, which would otherwise
    # make `min()` raise ValueError.
    non_zero_limits = [
        qa.max_bytes_to_read
        for qa in quota_allowances.values()
        if qa.max_bytes_to_read != 0
    ]
    return min(non_zero_limits, default=0)
51+
52+
3853
@dataclass(frozen=True)
3954
class QueryResultOrError:
4055
"""When a query executes, even if it errors, we still want the stats associated

snuba/web/db_query.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
AllocationPolicyViolations,
3333
QueryResultOrError,
3434
QuotaAllowance,
35+
get_max_bytes_to_read,
3536
)
3637
from snuba.query.composite import CompositeQuery
3738
from snuba.query.data_source.join import IndividualNode, JoinClause, JoinVisitor
@@ -808,7 +809,12 @@ def _apply_allocation_policies_quota(
808809
Sets the resource quota in the query_settings object to the minimum of all available
809810
quota allowances from the given allocation policies.
810811
"""
811-
quota_allowances: dict[str, Any] = {}
812+
813+
# if it's an EAP query, then allocation policies were already applied in the routing strategy layer, so we bypass it here
814+
if attribution_info.app_id.key == "eap":
815+
return
816+
817+
quota_allowances: dict[str, QuotaAllowance] = {}
812818
can_run = True
813819
rejection_quota_and_policy = None
814820
throttle_quota_and_policy = None
@@ -855,10 +861,7 @@ def _apply_allocation_policies_quota(
855861
summary: dict[str, Any] = {}
856862
summary["threads_used"] = min_threads_across_policies
857863

858-
max_bytes_to_read = min(
859-
[qa.max_bytes_to_read for qa in quota_allowances.values()],
860-
key=lambda mb: float("inf") if mb == 0 else mb,
861-
)
864+
max_bytes_to_read = get_max_bytes_to_read(quota_allowances)
862865
if max_bytes_to_read != 0:
863866
query_settings.push_clickhouse_setting("max_bytes_to_read", max_bytes_to_read)
864867
summary["max_bytes_to_read"] = max_bytes_to_read

snuba/web/rpc/__init__.py

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
RPCRequestException,
3232
convert_rpc_exception_to_proto,
3333
)
34-
from snuba.web.rpc.storage_routing.load_retriever import get_cluster_loadinfo
3534
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
3635
RoutingContext,
3736
RoutingDecision,
37+
get_stats_dict,
3838
)
3939
from snuba.web.rpc.storage_routing.routing_strategy_selector import (
4040
RoutingStrategySelector,
@@ -191,7 +191,9 @@ def execute(self, in_msg: Tin) -> Tout:
191191
span = scope.span
192192
if span is not None:
193193
span.description = self.config_key()
194-
self.routing_context = RoutingContext(timer=self._timer, in_msg=in_msg)
194+
self.routing_context = RoutingContext(
195+
timer=self._timer, in_msg=in_msg, query_id=uuid.uuid4().hex
196+
)
195197

196198
self.__before_execute(in_msg)
197199
error: Exception | None = None
@@ -202,7 +204,29 @@ def execute(self, in_msg: Tin) -> Tout:
202204
span.set_data("selected_tier", self.routing_decision.tier)
203205
out = self._execute(in_msg)
204206
else:
205-
raise AllocationPolicyViolations
207+
self.metrics.increment(
208+
"request_rate_limited",
209+
tags=self._timer.tags,
210+
)
211+
error = QueryException.from_args(
212+
AllocationPolicyViolations.__name__,
213+
"Query cannot be run due to routing strategy deciding it cannot run, most likely due to allocation policies",
214+
extra={
215+
"stats": get_stats_dict(self.routing_decision),
216+
"sql": "no sql run",
217+
"experiments": {},
218+
},
219+
)
220+
error.__cause__ = AllocationPolicyViolations.from_args(
221+
{
222+
"summary": {},
223+
"details": {
224+
key: quota_allowance.to_dict()
225+
for key, quota_allowance in self.routing_decision.routing_context.allocation_policies_recommendations.items()
226+
},
227+
}
228+
)
229+
raise error
206230
except QueryException as e:
207231
out = self.response_class()()
208232
if (
@@ -253,16 +277,10 @@ def __before_execute(self, in_msg: Tin) -> None:
253277
meta = getattr(in_msg, "meta", None)
254278
if meta is not None:
255279
if not hasattr(meta, "request_id") or not meta.request_id:
256-
meta.request_id = str(uuid.uuid4())
280+
meta.request_id = self.routing_context.query_id
257281

258282
self._timer.update_tags(self.__extract_request_tags(in_msg))
259283

260-
# we're calling this function to get the cluster load info to emit metrics and to prevent dead code
261-
# the result is currently not used in storage routing
262-
# can turn off on Snuba Admin
263-
if state.get_config("storage_routing.enable_get_cluster_loadinfo", True):
264-
get_cluster_loadinfo()
265-
266284
selected_strategy = RoutingStrategySelector().select_routing_strategy(self.routing_context)
267285
self.routing_decision = selected_strategy.get_routing_decision(self.routing_context)
268286
self._timer.mark("rpc_start")
@@ -325,6 +343,10 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
325343
output_metrics_error = None
326344
try:
327345
self.routing_decision.strategy.output_metrics(self.routing_decision)
346+
if self.routing_decision.routing_context.query_result is not None or isinstance(
347+
error, QueryException
348+
):
349+
self.routing_decision.strategy.update_allocation_policies_balances(self.routing_decision, error) # type: ignore
328350
except Exception as e:
329351
output_metrics_error = e
330352

@@ -336,13 +358,11 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
336358
"request_invalid",
337359
tags=self._timer.tags,
338360
)
339-
elif isinstance(error, AllocationPolicyViolations):
340-
sentry_sdk.capture_exception(error)
341-
self.metrics.increment(
342-
"request_rate_limited",
343-
tags=self._timer.tags,
344-
)
345-
else:
361+
# AllocationPolicyViolations is not a request_error
362+
elif not (
363+
isinstance(error, QueryException)
364+
and error.exception_type == AllocationPolicyViolations.__name__
365+
):
346366
sentry_sdk.capture_exception(error)
347367
self.metrics.increment(
348368
"request_error",

snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,26 @@ def _get_min_timerange_to_query_outcomes(self) -> int:
149149
or default
150150
)
151151

152-
def _get_routing_decision(self, routing_context: RoutingContext) -> RoutingDecision:
152+
def _get_routing_decision(
153+
self,
154+
routing_context: RoutingContext,
155+
) -> RoutingDecision:
156+
combined_allocation_policies_recommendations = (
157+
self._get_combined_allocation_policies_recommendations(routing_context)
158+
)
159+
153160
routing_decision = RoutingDecision(
154161
routing_context=routing_context,
155162
strategy=self,
156163
tier=Tier.TIER_1,
157-
clickhouse_settings={},
158-
can_run=True,
164+
clickhouse_settings=combined_allocation_policies_recommendations["settings"],
165+
can_run=combined_allocation_policies_recommendations["can_run"],
166+
is_throttled=combined_allocation_policies_recommendations["is_throttled"],
159167
)
168+
169+
if not routing_decision.can_run:
170+
return routing_decision
171+
160172
in_msg_meta = extract_message_meta(routing_decision.routing_context.in_msg)
161173
sentry_sdk.update_current_span(
162174
attributes={

0 commit comments

Comments
 (0)