
Commit e795919

Merge pull request #164 from Yelp/u/sids/CLOUD-816/dry-run-metrics
dry run mode for --jira-ticket parameter in paasta spark-run
2 parents 062a609 + 83ac515, commit e795919

File tree

8 files changed: +289, -44 lines changed


mypy.ini

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ warn_incomplete_stub = True
 follow_imports = silent
 ignore_missing_imports = True
 mypy_path = stubs
+strict_optional = False
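The added `strict_optional = False` relaxes mypy's Optional checking project-wide: None becomes assignable to any type, so existing code that passes possibly-None values into non-Optional annotations keeps typechecking. A minimal sketch of the kind of code this permits (illustrative only, not from this repo):

    from typing import Dict

    def lookup(d: Dict[str, str], key: str) -> str:
        value: str = d.get(key)  # d.get() returns Optional[str]; rejected under strict_optional = True
        return value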

service_configuration_lib/spark_config.py

Lines changed: 78 additions & 16 deletions
@@ -79,6 +79,7 @@
 TICKET_NOT_REQUIRED_USERS = {
     'batch',  # non-human spark-run from batch boxes
     'TRON',  # tronjobs that run commands like paasta mark-for-deployment
+    'jenkins',  # username for jenkins pipeline jobs
     None,  # placeholder for being unable to determine user
 }
 USER_LABEL_UNSPECIFIED = 'UNSPECIFIED'
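Note that `None` is a deliberate member of the set: exemption is tested by membership, so jobs whose user cannot be determined are exempt as well. Roughly, a sketch of how the check reads downstream:

    user = os.environ.get('USER')  # may be None on stripped-down batch hosts
    if user in TICKET_NOT_REQUIRED_USERS:  # True for 'batch', 'TRON', 'jenkins', or None
        ...  # skip the Jira ticket requirement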
@@ -993,9 +994,79 @@ def _get_valid_jira_ticket(self, jira_ticket: Optional[str]) -> Optional[str]:
         if ticket and JIRA_TICKET_PATTERN.match(ticket):
             log.info(f'Valid Jira ticket provided: {ticket}')
             return ticket
-        log.warning(f'Jira ticket missing or invalid format: {ticket}')
+        log.info(f'Jira ticket missing or invalid format: {ticket}')
         return None
 
+    def _handle_jira_ticket_validation(
+        self,
+        cluster_manager: str,
+        user: Optional[str],
+        jira_ticket: Optional[str],
+        paasta_cluster: str,
+        paasta_service: str,
+        paasta_instance: str,
+    ) -> Optional[str]:
+        """
+        This method checks if Jira ticket validation is enabled, if the user needs
+        to provide a ticket, and validates the ticket if needed.
+
+        Returns:
+            The validated Jira ticket string if valid, otherwise None.
+
+        Args:
+            cluster_manager: The cluster manager being used
+            user: The user running the job
+            jira_ticket: The Jira ticket provided by the user
+        """
+        flag_enabled = self.mandatory_default_spark_srv_conf.get('spark.yelp.jira_ticket.enabled', 'false')
+        valid_ticket = self._get_valid_jira_ticket(jira_ticket)
+
+        # Skip validation for local cluster manager or exempt users
+        if cluster_manager == 'local' or user in TICKET_NOT_REQUIRED_USERS:
+            log.debug('Jira ticket check not required for this job configuration.')
+            # If exempt, we still pass through the original ticket if it's valid,
+            # otherwise None. This allows exempt users like tron to still have their valid tickets
+            # (if provided) attached as labels, without forcing validation.
+            return valid_ticket
+
+        if valid_ticket is None:
+            log_payload = {
+                'timestamp': int(time.time()),
+                'event': 'jira_ticket_validation_warning',
+                'level': 'WARNING',
+                'reason': 'Ticket missing or invalid',
+                'user': user,
+                'jira_ticket_provided': jira_ticket,
+                'paasta_cluster': paasta_cluster,
+                'paasta_service': paasta_service,
+                'paasta_instance': paasta_instance,
+            }
+            if flag_enabled == 'true':
+                error_msg = (
+                    f'Job requires a valid Jira ticket (format PROJ-1234).\n'
+                    f'Jira ticket check is enabled, but ticket "{jira_ticket}" is '
+                    f'missing or invalid for user "{user}".\n'
+                    'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 \n'
+                    'For more information: http://y/spark-jira-ticket-param \n'
+                    'If you have questions, please reach out to #spark on Slack.\n'
+                    f'paasta_cluster={paasta_cluster}, paasta_service={paasta_service}\n'
+                    f'paasta_instance={paasta_instance}'
+                )
+                utils.log_to_clog('spark_jira_ticket', log_payload, error_msg, log)
+                raise RuntimeError(error_msg)
+            else:
+                warning_message = (
+                    f'Jira ticket check is configured, but ticket is missing or invalid for user "{user}". '
+                    f'Proceeding with job execution. Original ticket value: "{jira_ticket}". '
+                    'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 '
+                    'For more information: http://y/spark-jira-ticket-param '
+                    'If you have questions, please reach out to #spark on Slack. '
+                    f'paasta_cluster={paasta_cluster}, paasta_service={paasta_service}\n'
+                    f'paasta_instance={paasta_instance}'
+                )
+                utils.log_to_clog('spark_jira_ticket', log_payload, warning_message, log)
+        return valid_ticket
+
     def get_spark_conf(
         self,
         cluster_manager: str,
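Taken together, this hunk implements the dry-run mode from the PR title: with `spark.yelp.jira_ticket.enabled` at 'false' (the default supplied to the `.get()` call), a missing or malformed ticket only emits a warning plus a structured clog event; with 'true', the same condition raises. A rough sketch of the expected behavior, assuming `builder` is an instance of the class these methods live on (the class name is not shown in this diff) and the paasta values are placeholders:

    builder.mandatory_default_spark_srv_conf['spark.yelp.jira_ticket.enabled'] = 'false'
    ticket = builder._handle_jira_ticket_validation(
        cluster_manager='kubernetes',
        user='jdoe',
        jira_ticket='not-a-ticket',
        paasta_cluster='some-cluster',
        paasta_service='some-service',
        paasta_instance='main',
    )
    assert ticket is None  # dry run: warning + clog event, job proceeds

    builder.mandatory_default_spark_srv_conf['spark.yelp.jira_ticket.enabled'] = 'true'
    # The same call now raises RuntimeError with remediation instructions.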
@@ -1040,6 +1111,7 @@ def get_spark_conf(
         to launch the batch, and inside the batch use `spark_tools.paasta` to create
         spark session.
         :param aws_region: The default aws region to use
+        :param jira_ticket: The jira project that this spark job is related to.
         :param service_account_name: The k8s service account to use for spark k8s authentication.
         :param force_spark_resource_configs: skip the resource/instances recalculation.
             This is strongly not recommended.
@@ -1058,20 +1130,10 @@ def get_spark_conf(
         # Get user from environment variables if it's not set
         user = user or os.environ.get('USER', None)
 
-        if self.mandatory_default_spark_srv_conf.get('spark.yelp.jira_ticket.enabled') == 'true':
-            needs_jira_check = cluster_manager != 'local' and user not in TICKET_NOT_REQUIRED_USERS
-            if needs_jira_check:
-                valid_ticket = self._get_valid_jira_ticket(jira_ticket)
-                if valid_ticket is None:
-                    error_msg = (
-                        'Job requires a valid Jira ticket (format PROJ-1234).\n'
-                        'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 \n'
-                        'For more information: https://yelpwiki.yelpcorp.com/spaces/AML/pages/402885641 \n'
-                        f'If you have questions, please reach out to #spark on Slack. (user={user})\n'
-                    )
-                    raise RuntimeError(error_msg)
-            else:
-                log.debug('Jira ticket check not required for this job configuration.')
+        # Handle Jira ticket validation if enabled
+        validated_jira_ticket = self._handle_jira_ticket_validation(
+            cluster_manager, user, jira_ticket, paasta_cluster, paasta_service, paasta_instance,
+        )
 
         app_base_name = (
             user_spark_opts.get('spark.app.name') or
@@ -1160,7 +1222,7 @@ def get_spark_conf(
                 include_self_managed_configs=not use_eks,
                 k8s_server_address=k8s_server_address,
                 user=user,
-                jira_ticket=jira_ticket,
+                jira_ticket=validated_jira_ticket,
             ))
         elif cluster_manager == 'local':
             spark_conf.update(_get_local_spark_env(

service_configuration_lib/utils.py

Lines changed: 44 additions & 6 deletions
@@ -2,6 +2,7 @@
 import contextlib
 import errno
 import hashlib
+import json
 import logging
 import os
 import random
@@ -19,6 +20,12 @@
 import yaml
 from typing_extensions import Literal
 
+# Only works inside yelpy environments
+try:
+    import clog
+except ImportError:
+    clog = None
+
 DEFAULT_SPARK_RUN_CONFIG = '/nail/srv/configs/spark.yaml'
 POD_TEMPLATE_PATH = '/nail/tmp/spark-pt-{file_uuid}.yaml'
 SPARK_EXECUTOR_POD_TEMPLATE = '/nail/srv/configs/spark_executor_pod_template.yaml'
SPARK_EXECUTOR_POD_TEMPLATE = '/nail/srv/configs/spark_executor_pod_template.yaml'
@@ -33,16 +40,16 @@
 SPARK_DRIVER_MEM_OVERHEAD_FACTOR_DEFAULT = 0.1
 
 
-log = logging.Logger(__name__)
+log = logging.getLogger(__name__)
 log.setLevel(logging.INFO)
 
 
 def load_spark_srv_conf(preset_values=None) -> Tuple[
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Any],
-    Dict[str, Dict[str, float]],
+    Dict[str, Any],  # spark_srv_conf
+    Dict[str, Any],  # spark_constants
+    Dict[str, Any],  # default_spark_srv_conf
+    Dict[str, Any],  # mandatory_default_spark_srv_conf
+    Dict[str, Dict[str, float]],  # spark_costs
 ]:
     if preset_values is None:
         preset_values = dict()
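The logging change fixes a real bug: `logging.Logger(__name__)` constructs a detached logger that is never registered in the logging module's hierarchy, so handlers and levels configured via `logging.getLogger()` or `logging.basicConfig()` never apply to it, whereas `logging.getLogger(__name__)` returns the shared, properly parented instance. The new inline comments also pin down the unpacking order of the returned tuple, e.g.:

    (
        spark_srv_conf,
        spark_constants,
        default_spark_srv_conf,
        mandatory_default_spark_srv_conf,
        spark_costs,
    ) = load_spark_srv_conf()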
@@ -220,3 +227,34 @@ def get_spark_driver_memory_overhead_mb(spark_conf: Dict[str, str]) -> float:
     )
     driver_mem_overhead_mb = driver_mem_mb * driver_mem_overhead_factor
     return round(driver_mem_overhead_mb, 5)
+
+
+def log_to_clog(log_stream: str, log_payload: dict, warning_message: str, log_instance=None):
+    """
+    Log a message to clog if available, otherwise log a warning.
+
+    Args:
+        log_stream: The clog stream name to log to
+        log_payload: Dictionary containing the log payload data
+        warning_message: Warning message to log if clog fails or is unavailable
+        log_instance: Logger instance to use for warnings (optional)
+    """
+    if clog:
+        try:
+            clog.config.configure(
+                scribe_host='169.254.255.254',
+                scribe_port='1463',
+                monk_disable=False,
+                scribe_disable=False,
+            )
+            clog.log_line(log_stream, json.dumps(log_payload))
+        except Exception as e:
+            if log_instance:
+                log_instance.warning(f'{warning_message} Clog operation failed with: {e}')
+            else:
+                log.warning(f'{warning_message} Clog operation failed with: {e}')
+    else:
+        if log_instance:
+            log_instance.warning(warning_message)
+        else:
+            log.warning(warning_message)
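Outside Yelp environments `clog` is None, so the function degrades to a plain warning; inside them, a scribe connection is configured on every call before `clog.log_line` emits the JSON payload. A hypothetical call site (the payload fields mirror those built in spark_config.py; the values are placeholders):

    import logging

    log_to_clog(
        log_stream='spark_jira_ticket',
        log_payload={'event': 'jira_ticket_validation_warning', 'user': 'jdoe'},
        warning_message='Jira ticket missing or invalid; proceeding (dry run).',
        log_instance=logging.getLogger(__name__),
    )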

setup.py

Lines changed: 3 additions & 0 deletions
@@ -43,4 +43,7 @@
         'scripts/services_needing_puppet_help',
         'scripts/services_that_run_here',
     ],
+    extras_require={
+        'yelp': ['yelp-clog>=7'],
+    },
 )
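This makes the clog dependency opt-in, matching the guarded `import clog` in utils.py: installing with the extra (for example `pip install service-configuration-lib[yelp]`, assuming the distribution name matches the repo) pulls in `yelp-clog>=7`, while a plain install leaves `clog` as None and falls back to standard logging.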

tests/conftest.py

Lines changed: 18 additions & 0 deletions
@@ -1,3 +1,4 @@
+import logging
 from unittest.mock import MagicMock
 from unittest.mock import Mock
 from unittest.mock import patch
@@ -80,3 +81,20 @@ def yaml_configs_file_watcher(
         configs_suffixes=['.yaml'],
         configs_folder=mock_soa_dir,
     )
+
+
+@pytest.fixture(autouse=True)
+def mock_clog_logging(monkeypatch):
+    """
+    Autouse fixture to prevent clog logging during tests.
+    This mocks the log_to_clog function to avoid actual clog operations.
+    """
+    def mock_log_to_clog(log_stream, log_payload, warning_message, log_instance=None):
+        # During tests, just log the warning message without trying to use clog
+        if log_instance:
+            log_instance.warning(warning_message)
+        else:
+            logger = logging.getLogger(__name__)
+            logger.warning(warning_message)
+
+    monkeypatch.setattr('service_configuration_lib.utils.log_to_clog', mock_log_to_clog)
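Because the fixture is `autouse=True`, every test in the suite gets the patched `log_to_clog` without requesting it, so the new warning paths can be asserted without touching scribe. A hypothetical test along these lines (the fixture and builder names are illustrative, not from this repo):

    def test_invalid_ticket_warns_but_proceeds(caplog, builder):
        ticket = builder._handle_jira_ticket_validation(
            'kubernetes', 'jdoe', 'bad-ticket', 'cluster', 'service', 'instance',
        )
        assert ticket is None
        assert 'missing or invalid' in caplog.text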
