@@ -48,7 +48,8 @@ class DeleteQueryMessage(TypedDict):
48
48
49
49
PRODUCER_MAP : MutableMapping [str , Producer ] = {}
50
50
STORAGE_TOPIC : Mapping [str , Topic ] = {
51
- StorageKey .SEARCH_ISSUES .value : Topic .LW_DELETIONS_GENERIC_EVENTS
51
+ StorageKey .SEARCH_ISSUES .value : Topic .LW_DELETIONS_GENERIC_EVENTS ,
52
+ StorageKey .EAP_ITEMS .value : Topic .LW_DELETIONS_EAP_ITEMS ,
52
53
}
53
54
54
55
@@ -80,17 +81,13 @@ def _flush_producers() -> None:
80
81
for storage , producer in PRODUCER_MAP .items ():
81
82
messages_remaining = producer .flush (5.0 )
82
83
if messages_remaining :
83
- logger .debug (
84
- f"{ messages_remaining } { storage } messages pending delivery"
85
- )
84
+ logger .debug (f"{ messages_remaining } { storage } messages pending delivery" )
86
85
time .sleep (1 )
87
86
88
87
Thread (target = _flush_producers , name = "flush_producers" , daemon = True ).start ()
89
88
90
89
91
- def _delete_query_delivery_callback (
92
- error : Optional [KafkaError ], message : KafkaMessage
93
- ) -> None :
90
+ def _delete_query_delivery_callback (error : Optional [KafkaError ], message : KafkaMessage ) -> None :
94
91
metrics .increment (
95
92
"delete_query.delivery_callback" ,
96
93
tags = {"status" : "failure" if error else "success" },
@@ -147,9 +144,7 @@ def delete_from_storage(
147
144
148
145
delete_settings = storage .get_deletion_settings ()
149
146
if not delete_settings .is_enabled :
150
- raise DeletesNotEnabledError (
151
- f"Deletes not enabled for { storage .get_storage_key ().value } "
152
- )
147
+ raise DeletesNotEnabledError (f"Deletes not enabled for { storage .get_storage_key ().value } " )
153
148
154
149
columns_diff = set (conditions .keys ()) - set (delete_settings .allowed_columns )
155
150
if columns_diff != set ():
@@ -170,9 +165,7 @@ def delete_from_storage(
170
165
return delete_from_tables (storage , delete_settings .tables , conditions , attr_info )
171
166
172
167
173
- def construct_query (
174
- storage : WritableTableStorage , table : str , condition : Expression
175
- ) -> Query :
168
+ def construct_query (storage : WritableTableStorage , table : str , condition : Expression ) -> Query :
176
169
cluster_name = storage .get_cluster ().get_clickhouse_cluster_name ()
177
170
on_cluster = literal (cluster_name ) if cluster_name else None
178
171
return Query (
@@ -233,7 +226,5 @@ def construct_or_conditions(conditions: Sequence[ConditionsType]) -> Expression:
233
226
234
227
235
228
def should_use_killswitch (storage_name : str , project_id : str ) -> bool :
236
- killswitch_config = get_str_config (
237
- f"lw_deletes_killswitch_{ storage_name } " , default = ""
238
- )
229
+ killswitch_config = get_str_config (f"lw_deletes_killswitch_{ storage_name } " , default = "" )
239
230
return project_id in killswitch_config if killswitch_config else False
0 commit comments