3636CLUSTERMAN_METRICS_YAML_FILE_PATH = '/nail/srv/configs/clusterman_metrics.yaml'
3737CLUSTERMAN_YAML_FILE_PATH = '/nail/srv/configs/clusterman.yaml'
3838SPARK_TRON_JOB_USER = 'TRON'
39+ JIRA_TICKET_PATTERN = re .compile (r'^[A-Z]+-[0-9]+$' )
3940
4041NON_CONFIGURABLE_SPARK_OPTS = {
4142 'spark.master' ,
@@ -305,6 +306,7 @@ def _get_k8s_spark_env(
305306 include_self_managed_configs : bool = True ,
306307 k8s_server_address : Optional [str ] = None ,
307308 user : Optional [str ] = None ,
309+ jira_ticket : Optional [str ] = None ,
308310) -> Dict [str , str ]:
309311 # RFC 1123: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
310312 # technically only paasta instance can be longer than 63 chars. But we apply the normalization regardless.
@@ -357,6 +359,9 @@ def _get_k8s_spark_env(
357359 'spark.master' : f'k8s://{ k8s_server_address } ' ,
358360 })
359361
362+ if jira_ticket is not None :
363+ spark_env ['spark.kubernetes.executor.label.spark.yelp.com/jira_ticket' ] = jira_ticket
364+
360365 return spark_env
361366
362367
@@ -979,6 +984,15 @@ def compute_approx_hourly_cost_dollars(
979984 )
980985 return min_dollars , max_dollars
981986
987+ def _get_valid_jira_ticket (self , jira_ticket : Optional [str ]) -> Optional [str ]:
988+ """Checks for and validates the 'jira_ticket' format."""
989+ ticket = jira_ticket
990+ if ticket and JIRA_TICKET_PATTERN .match (ticket ):
991+ log .info (f'Valid Jira ticket provided: { ticket } ' )
992+ return ticket
993+ log .warning (f'Jira ticket missing or invalid format: { ticket } ' )
994+ return None
995+
982996 def get_spark_conf (
983997 self ,
984998 cluster_manager : str ,
@@ -996,6 +1010,7 @@ def get_spark_conf(
9961010 spark_opts_from_env : Optional [Mapping [str , str ]] = None ,
9971011 aws_region : Optional [str ] = None ,
9981012 service_account_name : Optional [str ] = None ,
1013+ jira_ticket : Optional [str ] = None ,
9991014 force_spark_resource_configs : bool = True ,
10001015 user : Optional [str ] = None ,
10011016 ) -> Dict [str , str ]:
@@ -1036,6 +1051,21 @@ def get_spark_conf(
10361051 # is str type.
10371052 user_spark_opts = _convert_user_spark_opts_value_to_str (user_spark_opts )
10381053
1054+ if self .mandatory_default_spark_srv_conf .get ('spark.yelp.jira_ticket.enabled' ) == 'true' :
1055+ needs_jira_check = os .environ .get ('USER' , '' ) not in ['batch' , 'TRON' , '' ]
1056+ if needs_jira_check :
1057+ valid_ticket = self ._get_valid_jira_ticket (jira_ticket )
1058+ if valid_ticket is None :
1059+ error_msg = (
1060+ 'Job requires a valid Jira ticket (format PROJ-1234).\n '
1061+ 'Please pass the parameter as: paasta spark-run --jira-ticket=PROJ-1234 \n '
1062+ 'For more information: https://yelpwiki.yelpcorp.com/spaces/AML/pages/402885641 \n '
1063+ 'If you have questions, please reach out to #spark on Slack.\n '
1064+ )
1065+ raise RuntimeError (error_msg )
1066+ else :
1067+ log .debug ('Jira ticket check not required for this job configuration.' )
1068+
10391069 app_base_name = (
10401070 user_spark_opts .get ('spark.app.name' ) or
10411071 spark_app_base_name
@@ -1123,6 +1153,7 @@ def get_spark_conf(
11231153 include_self_managed_configs = not use_eks ,
11241154 k8s_server_address = k8s_server_address ,
11251155 user = user ,
1156+ jira_ticket = jira_ticket ,
11261157 ))
11271158 elif cluster_manager == 'local' :
11281159 spark_conf .update (_get_local_spark_env (
0 commit comments