Skip to content

Commit 6cb3de5

Browse files
Rachel ChenRachel Chen
authored andcommitted
squash
1 parent e52cdab commit 6cb3de5

File tree

8 files changed

+186
-129
lines changed

8 files changed

+186
-129
lines changed

snuba/datasets/configuration/events_analytics_platform/storages/eap_items.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_local
110110
dist_table_name: eap_items_1_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_512.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_512_local
109109
dist_table_name: eap_items_1_downsample_512_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_64.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,6 @@ schema:
109109
local_table_name: eap_items_1_downsample_64_local
110110
dist_table_name: eap_items_1_downsample_64_dist
111111
partition_format: [date]
112-
allocation_policies:
113-
- name: ConcurrentRateLimitAllocationPolicy
114-
args:
115-
required_tenant_types:
116-
- organization_id
117-
- referrer
118-
- project_id
119-
default_config_overrides:
120-
is_enforced: 0
121-
- name: ReferrerGuardRailPolicy
122-
args:
123-
required_tenant_types:
124-
- referrer
125-
default_config_overrides:
126-
is_enforced: 0
127-
is_active: 0
128-
- name: BytesScannedRejectingPolicy
129-
args:
130-
required_tenant_types:
131-
- organization_id
132-
- project_id
133-
- referrer
134-
default_config_overrides:
135-
is_active: 0
136-
is_enforced: 0
137112

138113
query_processors:
139114
- processor: UniqInSelectAndHavingProcessor

snuba/datasets/configuration/events_analytics_platform/storages/eap_items_downsample_8.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,6 @@ schema:
108108
local_table_name: eap_items_1_downsample_8_local
109109
dist_table_name: eap_items_1_downsample_8_dist
110110
partition_format: [date]
111-
allocation_policies:
112-
- name: ConcurrentRateLimitAllocationPolicy
113-
args:
114-
required_tenant_types:
115-
- organization_id
116-
- referrer
117-
- project_id
118-
default_config_overrides:
119-
is_enforced: 0
120-
- name: ReferrerGuardRailPolicy
121-
args:
122-
required_tenant_types:
123-
- referrer
124-
default_config_overrides:
125-
is_enforced: 0
126-
is_active: 0
127-
- name: BytesScannedRejectingPolicy
128-
args:
129-
required_tenant_types:
130-
- organization_id
131-
- project_id
132-
- referrer
133-
default_config_overrides:
134-
is_active: 0
135-
is_enforced: 0
136111

137112
query_processors:
138113
- processor: UniqInSelectAndHavingProcessor

snuba/web/db_query.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,11 @@ def _apply_allocation_policies_quota(
808808
Sets the resource quota in the query_settings object to the minimum of all available
809809
quota allowances from the given allocation policies.
810810
"""
811+
812+
# if it's an EAP query, then allocation policies were already applied in the routing strategy layer, so we bypass it here
813+
if attribution_info.app_id.key == "eap":
814+
return
815+
811816
quota_allowances: dict[str, Any] = {}
812817
can_run = True
813818
rejection_quota_and_policy = None

snuba/web/rpc/__init__.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
3636
RoutingContext,
3737
RoutingDecision,
38+
get_stats_dict,
3839
)
3940
from snuba.web.rpc.storage_routing.routing_strategy_selector import (
4041
RoutingStrategySelector,
@@ -191,7 +192,7 @@ def execute(self, in_msg: Tin) -> Tout:
191192
span = scope.span
192193
if span is not None:
193194
span.description = self.config_key()
194-
self.routing_context = RoutingContext(timer=self._timer, in_msg=in_msg)
195+
self.routing_context = RoutingContext(timer=self._timer, in_msg=in_msg, query_id=uuid.uuid4().hex)
195196

196197
self.__before_execute(in_msg)
197198
error: Exception | None = None
@@ -202,7 +203,20 @@ def execute(self, in_msg: Tin) -> Tout:
202203
span.set_data("selected_tier", self.routing_decision.tier)
203204
out = self._execute(in_msg)
204205
else:
205-
raise AllocationPolicyViolations
206+
self.metrics.increment(
207+
"request_rate_limited",
208+
tags=self._timer.tags,
209+
)
210+
# thoughts on making AlocationPolicyViolations a subclass of QueryException?
211+
raise QueryException.from_args(
212+
AllocationPolicyViolations.__name__,
213+
"Query cannot be run due to routing strategy deciding it cannot run, most likely due to allocation policies",
214+
extra={
215+
"stats": get_stats_dict(self.routing_decision),
216+
"sql": "no sql run",
217+
"experiments": {},
218+
},
219+
)
206220
except QueryException as e:
207221
out = self.response_class()()
208222
if (
@@ -257,12 +271,6 @@ def __before_execute(self, in_msg: Tin) -> None:
257271

258272
self._timer.update_tags(self.__extract_request_tags(in_msg))
259273

260-
# we're calling this function to get the cluster load info to emit metrics and to prevent dead code
261-
# the result is currently not used in storage routing
262-
# can turn off on Snuba Admin
263-
if state.get_config("storage_routing.enable_get_cluster_loadinfo", True):
264-
get_cluster_loadinfo()
265-
266274
selected_strategy = RoutingStrategySelector().select_routing_strategy(self.routing_context)
267275
self.routing_decision = selected_strategy.get_routing_decision(self.routing_context)
268276
self._timer.mark("rpc_start")
@@ -325,6 +333,10 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
325333
output_metrics_error = None
326334
try:
327335
self.routing_decision.strategy.output_metrics(self.routing_decision)
336+
if self.routing_decision.routing_context.query_result is not None or isinstance(
337+
error, QueryException
338+
):
339+
self.routing_decision.strategy.update_allocation_policies_balances(self.routing_decision, error) # type: ignore
328340
except Exception as e:
329341
output_metrics_error = e
330342

@@ -336,12 +348,6 @@ def __after_execute(self, in_msg: Tin, out_msg: Tout, error: Exception | None) -
336348
"request_invalid",
337349
tags=self._timer.tags,
338350
)
339-
elif isinstance(error, AllocationPolicyViolations):
340-
sentry_sdk.capture_exception(error)
341-
self.metrics.increment(
342-
"request_rate_limited",
343-
tags=self._timer.tags,
344-
)
345351
else:
346352
sentry_sdk.capture_exception(error)
347353
self.metrics.increment(

snuba/web/rpc/storage_routing/routing_strategies/outcomes_based.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from snuba.web.rpc.storage_routing.common import extract_message_meta
3131
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
3232
BaseRoutingStrategy,
33+
CombinedAllocationPoliciesRecommendations,
3334
RoutingContext,
3435
RoutingDecision,
3536
)
@@ -165,14 +166,55 @@ def _get_min_timerange_to_query_outcomes(self) -> int:
165166
or default
166167
)
167168

168-
def _get_routing_decision(self, routing_context: RoutingContext) -> RoutingDecision:
169+
def _get_combined_allocation_policies_recommendations(
170+
self, routing_context: RoutingContext
171+
) -> CombinedAllocationPoliciesRecommendations:
172+
settings = {}
173+
max_bytes_to_read = min(
174+
[
175+
qa.max_bytes_to_read
176+
for qa in routing_context.allocation_policies_recommendations.values()
177+
],
178+
key=lambda mb: float("inf") if mb == 0 else mb,
179+
)
180+
if max_bytes_to_read != 0:
181+
settings["max_bytes_to_read"] = max_bytes_to_read
182+
183+
settings["max_threads"] = min(
184+
[qa.max_threads for qa in routing_context.allocation_policies_recommendations.values()],
185+
)
186+
187+
return CombinedAllocationPoliciesRecommendations(
188+
can_run=all(
189+
qa.can_run for qa in routing_context.allocation_policies_recommendations.values()
190+
),
191+
is_throttled=any(
192+
qa.is_throttled
193+
for qa in routing_context.allocation_policies_recommendations.values()
194+
),
195+
settings=settings,
196+
)
197+
198+
def _get_routing_decision(
199+
self,
200+
routing_context: RoutingContext,
201+
) -> RoutingDecision:
202+
combined_allocation_policies_recommendations = (
203+
self._get_combined_allocation_policies_recommendations(routing_context)
204+
)
205+
169206
routing_decision = RoutingDecision(
170207
routing_context=routing_context,
171208
strategy=self,
172209
tier=Tier.TIER_1,
173-
clickhouse_settings={},
174-
can_run=True,
210+
clickhouse_settings=combined_allocation_policies_recommendations["settings"],
211+
can_run=combined_allocation_policies_recommendations["can_run"],
212+
is_throttled=combined_allocation_policies_recommendations["is_throttled"],
175213
)
214+
215+
if not routing_decision.can_run:
216+
return routing_decision
217+
176218
in_msg_meta = extract_message_meta(routing_decision.routing_context.in_msg)
177219
sentry_sdk.update_current_span(
178220
attributes={

0 commit comments

Comments
 (0)