Commit ecd918b

emit clog logs when --jira-ticket param is not passed to paasta spark-run

1 parent 3872dea commit ecd918b

File tree

5 files changed: +655 -44 lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -4,3 +4,4 @@ pyyaml >= 3.0
 typing-extensions==4.13.2
 # To resolve the error: botocore 1.29.125 has requirement urllib3<1.27,>=1.25.4, but you'll have urllib3 2.0.1 which is incompatible.
 urllib3==1.26.15
+yelp-clog==7.2.0

service_configuration_lib/spark_config.py

Lines changed: 27 additions & 7 deletions
@@ -24,6 +24,7 @@
 from service_configuration_lib.text_colors import TextColors
 from service_configuration_lib.utils import EPHEMERAL_PORT_END
 from service_configuration_lib.utils import EPHEMERAL_PORT_START
+from service_configuration_lib.utils import get_clog_handler
 
 AWS_CREDENTIALS_DIR = '/etc/boto_cfg/'
 AWS_ENV_CREDENTIALS_PROVIDER = 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider'

@@ -440,13 +441,16 @@ def __init__(self, is_driver_on_k8s_tron: bool = False):
         try:
             (
                 self.spark_srv_conf, self.spark_constants, self.default_spark_srv_conf,
-                self.mandatory_default_spark_srv_conf, self.spark_costs,
+                self.mandatory_default_spark_srv_conf, self.spark_costs, _,
             ) = utils.load_spark_srv_conf()
         except Exception as e:
             log.error(f'Failed to load Spark srv configs: {e}')
             # should fail because Spark config calculation depends on values in srv-configs
             raise e
 
+        # Get a MonkHandler instance for JIRA validation warnings.
+        self.jira_monk_handler_ref = get_clog_handler(client_id='service_configuration_lib.spark_config')
+
     def _append_spark_prometheus_conf(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
         spark_opts['spark.ui.prometheus.enabled'] = 'true'
         spark_opts[

@@ -1040,13 +1044,29 @@ def _handle_jira_ticket_validation(
             )
             raise RuntimeError(error_msg)
         else:
-            log.warning(
-                f'Jira ticket check is configured, but ticket is missing or invalid for user "{user}".\n'
-                f'Proceeding with job execution. Original ticket value: "{jira_ticket}".\n'
-                'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 \n'
-                'For more information: http://y/spark-jira-ticket-param \n'
-                'If you have questions, please reach out to #spark on Slack.',
+            warning_message = (
+                f'Jira ticket check is configured, but ticket is missing or invalid for user "{user}". '
+                f'Proceeding with job execution. Original ticket value: "{jira_ticket}". '
+                'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 '
+                'For more information: http://y/spark-jira-ticket-param '
+                'If you have questions, please reach out to #spark on Slack. '
             )
+            if self.jira_monk_handler_ref:
+                log_payload = {
+                    'timestamp': int(time.time()),
+                    'scribe_log': self.jira_monk_handler_ref.stream,
+                    'event': 'jira_ticket_validation_warning',
+                    'level': 'WARNING',
+                    'reason': 'Ticket missing or invalid. See http://y/spark-jira-ticket-param',
+                    'user': user,
+                    'jira_ticket_provided': jira_ticket,
+                }
+                self.jira_monk_handler_ref.logger.log_line(
+                    self.jira_monk_handler_ref.stream, json.dumps(log_payload),
+                )
+            else:
+                # Fallback to default logger if clog handler setup failed or ref is missing/invalid
+                log.warning(warning_message)
         return valid_ticket
 
     def get_spark_conf(
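
For reference, a minimal sketch of the scribe line the new branch emits when the handler is available. It mirrors the log_payload dict above; the stream name 'spark_jira_ticket_warnings' and the user value are illustrative placeholders, not values from this commit.

import json
import time

# Shape of the emitted clog line, mirroring log_payload above.
# 'spark_jira_ticket_warnings' and 'example_user' are made-up example values.
log_payload = {
    'timestamp': int(time.time()),
    'scribe_log': 'spark_jira_ticket_warnings',
    'event': 'jira_ticket_validation_warning',
    'level': 'WARNING',
    'reason': 'Ticket missing or invalid. See http://y/spark-jira-ticket-param',
    'user': 'example_user',
    'jira_ticket_provided': None,
}
print(json.dumps(log_payload))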

service_configuration_lib/utils.py

Lines changed: 175 additions & 7 deletions
@@ -3,6 +3,7 @@
 import errno
 import hashlib
 import logging
+import os
 import random
 import string
 import uuid

@@ -13,12 +14,19 @@
 from socket import SOL_SOCKET
 from typing import Any
 from typing import Dict
+from typing import List
+from typing import Optional
 from typing import Tuple
 
+import clog.config
+import srv_configs
 import yaml
+from clog.config import monk_host
+from clog.config import monk_port
+from clog.handlers import MonkHandler
 from typing_extensions import Literal
 
-DEFAULT_SPARK_RUN_CONFIG = '/nail/srv/configs/spark.yaml'
+DEFAULT_SPARK_RUN_CONFIG = '/nail/home/sids/repos/srv-configs/common/spark.yaml'
 POD_TEMPLATE_PATH = '/nail/tmp/spark-pt-{file_uuid}.yaml'
 SPARK_EXECUTOR_POD_TEMPLATE = '/nail/srv/configs/spark_executor_pod_template.yaml'
 

@@ -37,11 +45,12 @@
 
 
 def load_spark_srv_conf(preset_values=None) -> Tuple[
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Dict[str, float]],
+    Dict[str, Any],  # spark_srv_conf
+    Dict[str, Any],  # spark_constants
+    Dict[str, Any],  # default_spark_srv_conf
+    Dict[str, Any],  # mandatory_default_spark_srv_conf
+    Dict[str, Dict[str, float]],  # spark_costs
+    List[Dict[str, Any]],  # module_configs
 ]:
     if preset_values is None:
         preset_values = dict()

@@ -53,9 +62,10 @@ def load_spark_srv_conf(preset_values=None) -> Tuple[
         default_spark_srv_conf = spark_constants['defaults']
         mandatory_default_spark_srv_conf = spark_constants['mandatory_defaults']
         spark_costs = spark_constants['cost_factor']
+        module_configs = loaded_values['module_config']
         return (
             spark_srv_conf, spark_constants, default_spark_srv_conf,
-            mandatory_default_spark_srv_conf, spark_costs,
+            mandatory_default_spark_srv_conf, spark_costs, module_configs,
         )
     except Exception as e:
         log.warning(f'Failed to load {DEFAULT_SPARK_RUN_CONFIG}: {e}')

@@ -217,3 +227,161 @@ def get_spark_driver_memory_overhead_mb(spark_conf: Dict[str, str]) -> float:
     )
     driver_mem_overhead_mb = driver_mem_mb * driver_mem_overhead_factor
     return round(driver_mem_overhead_mb, 5)
+
+
+def _load_default_service_configurations_for_clog() -> Optional[Dict[str, Any]]:
+    """
+    Loads the external configuration file for the 'clog' namespace if specified in
+    DEFAULT_SPARK_RUN_CONFIG's 'module_config' section.
+    Returns the inline 'config' dictionary for the 'clog' namespace if found,
+    otherwise None.
+    """
+    clog_config_file_path = None
+    clog_inline_config = None
+    found_clog_module_config = False
+
+    try:
+        _, _, _, _, _, module_configs = load_spark_srv_conf()
+
+        for mc_item in module_configs:
+            if isinstance(mc_item, dict) and mc_item.get('namespace') == 'clog':
+                found_clog_module_config = True
+                clog_config_file_path = mc_item.get('file')
+                clog_inline_config = mc_item.get('config')
+                break
+
+        if not found_clog_module_config:
+            log.warning(
+                f"Could not find 'clog' namespace entry in 'module_config' "
+                f'section within {DEFAULT_SPARK_RUN_CONFIG}.',
+            )
+            return None
+
+        if clog_config_file_path:
+            if os.path.exists(clog_config_file_path):
+                try:
+                    srv_configs.use_file(clog_config_file_path, namespace='clog')
+                    log.info(
+                        f'Successfully loaded clog configuration file {clog_config_file_path} '
+                        f"into namespace 'clog'.",
+                    )
+                except Exception as e_use_file:
+                    log.error(
+                        f'Error loading clog configuration file {clog_config_file_path} '
+                        f'using srv_configs.use_file: {e_use_file}',
+                    )
+            else:
+                log.error(
+                    f"Clog configuration file specified in 'module_config' of {DEFAULT_SPARK_RUN_CONFIG} "
+                    f'does not exist: {clog_config_file_path}.',
+                )
+        else:
+            log.info(
+                f"No 'file' specified for 'clog' namespace in 'module_config' of {DEFAULT_SPARK_RUN_CONFIG}. "
+                'Not loading any external file for clog via module_config.',
+            )
+
+        # Return the inline config dictionary, which might be None if not present
+        if isinstance(clog_inline_config, dict):
+            return clog_inline_config
+        elif clog_inline_config is not None:
+            log.warning(f"Inline 'config' for 'clog' namespace in {DEFAULT_SPARK_RUN_CONFIG} is not a dictionary.")
+            return None
+        else:
+            return None
+
+    except FileNotFoundError:
+        log.error(
+            f'Error: Main Spark run config file {DEFAULT_SPARK_RUN_CONFIG} not found. '
+            'Cannot process clog configurations.',
+        )
+        return None
+    except yaml.YAMLError as e_yaml:
+        log.error(f'Error parsing YAML from {DEFAULT_SPARK_RUN_CONFIG}: {e_yaml}')
+        return None
+    except Exception as e_main:
+        log.error(
+            f'An unexpected error occurred in _load_default_service_configurations_for_clog: {e_main}',
+        )
+        return None
+
+
+def get_clog_handler(
+    client_id: Optional[str] = None,
+    stream_name_override: Optional[str] = None,
+) -> Optional[MonkHandler]:
+    """
+    Configures and returns a clog MonkHandler for logging.
+
+    This utility helps in setting up a MonkHandler. It ensures the external
+    clog configuration file (if specified in DEFAULT_SPARK_RUN_CONFIG) is loaded
+    into srv_configs. It then determines the log_stream_name with the following
+    priority:
+    1. `stream_name_override` argument.
+    2. `log_stream_name` from the inline 'config' of the 'clog' module_config
+       in DEFAULT_SPARK_RUN_CONFIG.
+    3. `log_stream_name` from the 'clog' namespace in srv_configs (typically
+       loaded from the external file like /nail/srv/configs/clog.yaml).
+
+    Args:
+        client_id: Optional client identifier for the log messages.
+            Defaults to the current OS user or 'unknown_spark_user'.
+        stream_name_override: Optional explicit clog stream name to use,
+            overriding any configured values.
+
+    Returns:
+        A configured MonkHandler instance if successful, otherwise None.
+    """
+    # Load external file (if any) and get inline config from spark.yaml's module_config
+    inline_clog_config = _load_default_service_configurations_for_clog()
+
+    actual_client_id = client_id or os.getenv('USER') or 'unknown_spark_user'
+    final_stream_name = stream_name_override
+
+    if not final_stream_name:
+        if inline_clog_config and isinstance(inline_clog_config.get('log_stream_name'), str):
+            final_stream_name = inline_clog_config['log_stream_name']
+            log.info(
+                f"Using log_stream_name '{final_stream_name}' from inline module_config in "
+                f'{DEFAULT_SPARK_RUN_CONFIG}.',
+            )
+        else:
+            try:
+                # Fallback to srv_configs (which should have data from the external file)
+                clog_srv_configs_dict = srv_configs.get_namespace_as_dict('clog')
+                final_stream_name = clog_srv_configs_dict.get('log_stream_name')
+                if final_stream_name:
+                    log.info(
+                        f"Using log_stream_name '{final_stream_name}' from srv_configs 'clog' namespace "
+                        f'(likely from external file).',
+                    )
+            except Exception as e:
+                log.warning(
+                    f"Could not get 'clog' namespace from srv_configs or 'log_stream_name' key missing. "
+                    f'This may be okay if stream_name_override or inline config provides it. Error: {e}',
+                )
+
+    if not final_stream_name:
+        log.error(
+            'Clog stream_name could not be determined. It was not provided as an argument, '
+            'not found in the inline module_config for "clog", and not found in the '
+            '"clog" srv_configs namespace. Clog handler cannot be configured.',
+        )
+        return None
+
+    # Ensure that clog is configured to enable Monk logging.
+    # The default in clog.config.monk_disable is True.
+    clog.config.configure_from_dict({'monk_disable': False})
+    log.info('Clog has been configured to enable Monk logging (monk_disable=False).')
+
+    try:
+        handler = MonkHandler(
+            client_id=actual_client_id,
+            host=monk_host,
+            port=monk_port,
+            stream=final_stream_name,
+        )
+        return handler
+    except Exception as e:
+        log.error(f"Failed to create MonkHandler for clog with stream '{final_stream_name}'. Error: {e}")
+        return None
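
For context, the 'clog' entry that _load_default_service_configurations_for_clog() searches for in spark.yaml's module_config section would look roughly like the sketch below. Only the 'namespace', 'file', and nested 'config'/'log_stream_name' keys are implied by the code; the concrete path and stream name are illustrative assumptions.

import yaml

# Hedged sketch of the module_config shape consumed above; the path and
# stream name are assumptions, not values taken from this commit.
example_spark_yaml = yaml.safe_load("""
module_config:
  - namespace: clog
    file: /nail/srv/configs/clog.yaml
    config:
      log_stream_name: spark_jira_ticket_warnings
""")

clog_entry = next(
    mc for mc in example_spark_yaml['module_config']
    if isinstance(mc, dict) and mc.get('namespace') == 'clog'
)
print(clog_entry.get('file'))                   # external file loaded via srv_configs.use_file
print(clog_entry['config']['log_stream_name'])  # inline value, takes priority over the file

Callers then obtain a handler the way SparkConfBuilder.__init__ does above, e.g. get_clog_handler(client_id='service_configuration_lib.spark_config'), and fall back to the standard logger whenever it returns None.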
