Skip to content

Commit 48bf012

Browse files
Rachel ChenRachel Chen
authored andcommitted
squash
1 parent f90c230 commit 48bf012

File tree

13 files changed

+634
-161
lines changed

13 files changed

+634
-161
lines changed

snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_local
110110
dist_table_name: eap_items_1_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_512_local
109109
dist_table_name: eap_items_1_downsample_512_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_downsample_64_local
110110
dist_table_name: eap_items_1_downsample_64_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_8_local
109109
dist_table_name: eap_items_1_downsample_8_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from snuba.query.allocation_policies import QuotaAllowance
2+
3+
4+
def get_max_bytes_to_read(quota_allowances: dict[str, QuotaAllowance]) -> int:
5+
max_bytes_to_read = min(
6+
[qa.max_bytes_to_read for qa in quota_allowances.values()],
7+
key=lambda mb: float("inf") if mb == 0 else mb,
8+
)
9+
if max_bytes_to_read != 0:
10+
return max_bytes_to_read
11+
return 0

snuba/web/db_query.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
QueryResultOrError,
3434
QuotaAllowance,
3535
)
36+
from snuba.query.allocation_policies.utils import get_max_bytes_to_read
3637
from snuba.query.composite import CompositeQuery
3738
from snuba.query.data_source.join import IndividualNode, JoinClause, JoinVisitor
3839
from snuba.query.data_source.simple import Table
@@ -808,7 +809,11 @@ def _apply_allocation_policies_quota(
808809
Sets the resource quota in the query_settings object to the minimum of all available
809810
quota allowances from the given allocation policies.
810811
"""
811-
quota_allowances: dict[str, Any] = {}
812+
813+
if len(allocation_policies) == 0:
814+
return
815+
816+
quota_allowances: dict[str, QuotaAllowance] = {}
812817
can_run = True
813818
rejection_quota_and_policy = None
814819
throttle_quota_and_policy = None
@@ -855,10 +860,7 @@ def _apply_allocation_policies_quota(
855860
summary: dict[str, Any] = {}
856861
summary["threads_used"] = min_threads_across_policies
857862

858-
max_bytes_to_read = min(
859-
[qa.max_bytes_to_read for qa in quota_allowances.values()],
860-
key=lambda mb: float("inf") if mb == 0 else mb,
861-
)
863+
max_bytes_to_read = get_max_bytes_to_read(quota_allowances)
862864
if max_bytes_to_read != 0:
863865
query_settings.push_clickhouse_setting("max_bytes_to_read", max_bytes_to_read)
864866
summary["max_bytes_to_read"] = max_bytes_to_read

snuba/web/rpc/__init__.py

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@
3131
RPCRequestException,
3232
convert_rpc_exception_to_proto,
3333
)
34-
from snuba.web.rpc.storage_routing.load_retriever import get_cluster_loadinfo
34+
from snuba.web.rpc.storage_routing.common import extract_message_meta
3535
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
3636
RoutingContext,
3737
RoutingDecision,
38+
get_stats_dict,
3839
)
3940
from snuba.web.rpc.storage_routing.routing_strategy_selector import (
4041
RoutingStrategySelector,
@@ -191,7 +192,21 @@ def execute(self, in_msg: Tin) -> Tout:
191192
span = scope.span
192193
if span is not None:
193194
span.description = self.config_key()
194-
self.routing_context = RoutingContext(timer=self._timer, in_msg=in_msg)
195+
request_meta = extract_message_meta(in_msg)
196+
self.routing_context = RoutingContext(
197+
timer=self._timer,
198+
in_msg=in_msg,
199+
query_id=uuid.uuid4().hex,
200+
tenant_ids={
201+
"organization_id": request_meta.organization_id,
202+
"referrer": request_meta.referrer,
203+
**(
204+
{"project_id": request_meta.project_ids[0]}
205+
if hasattr(request_meta, "project_ids") and len(request_meta.project_ids) == 1
206+
else {}
207+
),
208+
},
209+
)
195210

196211
self.__before_execute(in_msg)
197212
error: Exception | None = None
@@ -202,7 +217,29 @@ def execute(self, in_msg: Tin) -> Tout:
202217
span.set_data("selected_tier", self.routing_decision.tier)
203218
out = self._execute(in_msg)
204219
else:
205-
raise AllocationPolicyViolations
220+
self.metrics.increment(
221+
"request_rate_limited",
222+
tags=self._timer.tags,
223+
)
224+
error = QueryException.from_args(
225+
AllocationPolicyViolations.__name__,
226+
"Query cannot be run due to routing strategy deciding it cannot run, most likely due to allocation policies",
227+
extra={
228+
"stats": get_stats_dict(self.routing_decision),
229+
"sql": "no sql run",
230+
"experiments": {},
231+
},
232+
)
233+
error.__cause__ = AllocationPolicyViolations.from_args(
234+
{
235+
"summary": {},
236+
"details": {
237+
key: quota_allowance.to_dict()
238+
for key, quota_allowance in self.routing_decision.routing_context.allocation_policies_recommendations.items()
239+
},
240+
}
241+
)
242+
raise error
206243
except QueryException as e:
207244
out = self.response_class()()
208245
if (
@@ -253,16 +290,10 @@ def __before_execute(self, in_msg: Tin) -> None:
253290
meta = getattr(in_msg, "meta", None)
254291
if meta is not None:
255292
if not hasattr(meta, "request_id") or not meta.request_id:
256-
meta.request_id = str(uuid.uuid4())
293+
meta.request_id = self.routing_context.query_id
257294

258295
self._timer.update_tags(self.__extract_request_tags(in_msg))
259296

260-
# we're calling this function to get the cluster load info to emit metrics and to prevent dead code
261-
# the result is currently not used in storage routing
262-
# can turn off on Snuba Admin
263-
if state.get_config("storage_routing.enable_get_cluster_loadinfo", True):
264-
get_cluster_loadinfo()
265-
266297
selected_strategy = RoutingStrategySelector().select_routing_strategy(self.routing_context)
267298
self.routing_decision = selected_strategy.get_routing_decision(self.routing_context)
268299
self._timer.mark("rpc_start")
@@ -322,11 +353,11 @@ def _execute(self, in_msg: Tin) -> Tout:
322353

323354
def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -> Tout:
324355
res = self._after_execute(in_msg, out_msg, error)
325-
output_metrics_error = None
356+
routing_strategy_after_execute_error = None
326357
try:
327-
self.routing_decision.strategy.output_metrics(self.routing_decision)
358+
self.routing_decision.strategy.after_execute(self.routing_decision, error)
328359
except Exception as e:
329-
output_metrics_error = e
360+
routing_strategy_after_execute_error = e
330361

331362
self._timer.mark("rpc_end")
332363
self._timer.send_metrics_to(self.metrics)
@@ -336,13 +367,11 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
336367
"request_invalid",
337368
tags=self._timer.tags,
338369
)
339-
elif isinstance(error, AllocationPolicyViolations):
340-
sentry_sdk.capture_exception(error)
341-
self.metrics.increment(
342-
"request_rate_limited",
343-
tags=self._timer.tags,
344-
)
345-
else:
370+
# AllocationPolicyViolations is not a request_error
371+
elif not (
372+
isinstance(error, QueryException)
373+
and error.exception_type == AllocationPolicyViolations.__name__
374+
):
346375
sentry_sdk.capture_exception(error)
347376
self.metrics.increment(
348377
"request_error",
@@ -355,13 +384,13 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
355384
tags=self._timer.tags,
356385
)
357386

358-
if output_metrics_error is not None:
359-
self.metrics.increment("metrics_failure")
387+
if routing_strategy_after_execute_error is not None:
388+
self.metrics.increment("after_execute_failure")
360389
sentry_sdk.capture_message(
361-
f"Error in routing strategy output metrics: {output_metrics_error}"
390+
f"Error in routing strategy after execute: {routing_strategy_after_execute_error}"
362391
)
363392
if settings.RAISE_ON_ROUTING_STRATEGY_FAILURES:
364-
raise output_metrics_error
393+
raise routing_strategy_after_execute_error
365394

366395
return res
367396

snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,26 @@ def _get_min_timerange_to_query_outcomes(self) -> int:
149149
or default
150150
)
151151

152-
def _get_routing_decision(self, routing_context: RoutingContext) -> RoutingDecision:
152+
def _get_routing_decision(
153+
self,
154+
routing_context: RoutingContext,
155+
) -> RoutingDecision:
156+
combined_allocation_policies_recommendations = (
157+
self._get_combined_allocation_policies_recommendations(routing_context)
158+
)
159+
153160
routing_decision = RoutingDecision(
154161
routing_context=routing_context,
155162
strategy=self,
156163
tier=Tier.TIER_1,
157-
clickhouse_settings={},
158-
can_run=True,
164+
clickhouse_settings=combined_allocation_policies_recommendations["settings"],
165+
can_run=combined_allocation_policies_recommendations["can_run"],
166+
is_throttled=combined_allocation_policies_recommendations["is_throttled"],
159167
)
168+
169+
if not routing_decision.can_run:
170+
return routing_decision
171+
160172
in_msg_meta = extract_message_meta(routing_decision.routing_context.in_msg)
161173
sentry_sdk.update_current_span(
162174
attributes={

0 commit comments

Comments
 (0)