Skip to content

Commit f79b9e0

Browse files
authored
MLCOMPUTE-2001 | Skip UI port selection, pod tpl generation, and logs for driver on k8s tron (#151)
* Skip UI port selection and pod template generation and logs for driver on k8s
* Fix tests
* Add comments
1 parent ef95386 commit f79b9e0

File tree

2 files changed

+53
-34
lines changed

2 files changed

+53
-34
lines changed

service_configuration_lib/spark_config.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
from service_configuration_lib import utils
2424
from service_configuration_lib.text_colors import TextColors
25+
from service_configuration_lib.utils import EPHEMERAL_PORT_END
26+
from service_configuration_lib.utils import EPHEMERAL_PORT_START
2527

2628
AWS_CREDENTIALS_DIR = '/etc/boto_cfg/'
2729
AWS_ENV_CREDENTIALS_PROVIDER = 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider'
@@ -32,6 +34,7 @@
3234
GPUS_HARD_LIMIT = 15
3335
CLUSTERMAN_METRICS_YAML_FILE_PATH = '/nail/srv/configs/clusterman_metrics.yaml'
3436
CLUSTERMAN_YAML_FILE_PATH = '/nail/srv/configs/clusterman.yaml'
37+
SPARK_TRON_JOB_USER = 'TRON'
3538

3639
NON_CONFIGURABLE_SPARK_OPTS = {
3740
'spark.master',
@@ -295,7 +298,7 @@ def _get_k8s_spark_env(
295298
paasta_service: str,
296299
paasta_instance: str,
297300
docker_img: str,
298-
pod_template_path: str,
301+
pod_template_path: Optional[str],
299302
volumes: Optional[List[Mapping[str, str]]],
300303
paasta_pool: str,
301304
driver_ui_port: int,
@@ -335,9 +338,12 @@ def _get_k8s_spark_env(
335338
'spark.kubernetes.executor.label.yelp.com/pool': paasta_pool,
336339
'spark.kubernetes.executor.label.paasta.yelp.com/pool': paasta_pool,
337340
'spark.kubernetes.executor.label.yelp.com/owner': 'core_ml',
338-
'spark.kubernetes.executor.podTemplateFile': pod_template_path,
339341
**_get_k8s_docker_volumes_conf(volumes),
340342
}
343+
344+
if pod_template_path is not None:
345+
spark_env['spark.kubernetes.executor.podTemplateFile'] = pod_template_path
346+
341347
if service_account_name is not None:
342348
spark_env.update(
343349
{
@@ -419,12 +425,13 @@ def get_total_driver_memory_mb(spark_conf: Dict[str, str]) -> int:
419425

420426
class SparkConfBuilder:
421427

422-
def __init__(self):
423-
self.spark_srv_conf = dict()
424-
self.spark_constants = dict()
425-
self.default_spark_srv_conf = dict()
426-
self.mandatory_default_spark_srv_conf = dict()
427-
self.spark_costs = dict()
428+
def __init__(self, is_driver_on_k8s_tron: bool = False):
429+
self.is_driver_on_k8s_tron = is_driver_on_k8s_tron
430+
self.spark_srv_conf: Dict[str, Any] = dict()
431+
self.spark_constants: Dict[str, Any] = dict()
432+
self.default_spark_srv_conf: Dict[str, Any] = dict()
433+
self.mandatory_default_spark_srv_conf: Dict[str, Any] = dict()
434+
self.spark_costs: Dict[str, Dict[str, float]] = dict()
428435

429436
try:
430437
(
@@ -628,7 +635,7 @@ def compute_executor_instances_k8s(self, user_spark_opts: Dict[str, str]) -> int
628635
)
629636

630637
# Deprecation message
631-
if 'spark.cores.max' in user_spark_opts:
638+
if not self.is_driver_on_k8s_tron and 'spark.cores.max' in user_spark_opts:
632639
log.warning(
633640
f'spark.cores.max is DEPRECATED. Replace with '
634641
f'spark.executor.instances={executor_instances} in --spark-args and in your service code '
@@ -1102,23 +1109,27 @@ def get_spark_conf(
11021109
spark_app_base_name
11031110
)
11041111

1105-
# Pick a port from a pre-defined port range, which will then be used by our Jupyter
1106-
# server metric aggregator API. The aggregator API collects Prometheus metrics from multiple
1107-
# Spark sessions and exposes them through a single endpoint.
1108-
try:
1109-
ui_port = int(
1110-
(spark_opts_from_env or {}).get('spark.ui.port') or
1111-
utils.ephemeral_port_reserve_range(
1112-
self.spark_constants.get('preferred_spark_ui_port_start'),
1113-
self.spark_constants.get('preferred_spark_ui_port_end'),
1114-
),
1115-
)
1116-
except Exception as e:
1117-
log.warning(
1118-
f'Could not get an available port using srv-config port range: {e}. '
1119-
'Using default port range to get an available port.',
1120-
)
1121-
ui_port = utils.ephemeral_port_reserve_range()
1112+
if self.is_driver_on_k8s_tron:
1113+
# For Tron-launched driver on k8s, we use a static Spark UI port
1114+
ui_port: int = self.spark_constants.get('preferred_spark_ui_port_start', EPHEMERAL_PORT_START)
1115+
else:
1116+
# Pick a port from a pre-defined port range, which will then be used by our Jupyter
1117+
# server metric aggregator API. The aggregator API collects Prometheus metrics from multiple
1118+
# Spark sessions and exposes them through a single endpoint.
1119+
try:
1120+
ui_port = int(
1121+
(spark_opts_from_env or {}).get('spark.ui.port') or
1122+
utils.ephemeral_port_reserve_range(
1123+
self.spark_constants.get('preferred_spark_ui_port_start', EPHEMERAL_PORT_START),
1124+
self.spark_constants.get('preferred_spark_ui_port_end', EPHEMERAL_PORT_END),
1125+
),
1126+
)
1127+
except Exception as e:
1128+
log.warning(
1129+
f'Could not get an available port using srv-config port range: {e}. '
1130+
'Using default port range to get an available port.',
1131+
)
1132+
ui_port = utils.ephemeral_port_reserve_range()
11221133

11231134
spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}
11241135
random_postfix = utils.get_random_string(4)
@@ -1157,12 +1168,14 @@ def get_spark_conf(
11571168
)
11581169

11591170
# Add pod template file
1160-
pod_template_path = utils.generate_pod_template_path()
1161-
try:
1162-
utils.create_pod_template(pod_template_path, app_base_name)
1163-
except Exception as e:
1164-
log.error(f'Failed to generate Spark executor pod template: {e}')
1165-
pod_template_path = ''
1171+
pod_template_path: Optional[str] = None
1172+
if not self.is_driver_on_k8s_tron:
1173+
pod_template_path = utils.generate_pod_template_path()
1174+
try:
1175+
utils.create_pod_template(pod_template_path, app_base_name)
1176+
except Exception as e:
1177+
log.error(f'Failed to generate Spark executor pod template: {e}')
1178+
pod_template_path = None
11661179

11671180
if cluster_manager == 'kubernetes':
11681181
spark_conf.update(_get_k8s_spark_env(

service_configuration_lib/utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
from socket import SO_REUSEADDR
1212
from socket import socket
1313
from socket import SOL_SOCKET
14+
from typing import Any
1415
from typing import Dict
15-
from typing import Mapping
1616
from typing import Tuple
1717

1818
import yaml
@@ -36,7 +36,13 @@
3636
log.setLevel(logging.INFO)
3737

3838

39-
def load_spark_srv_conf(preset_values=None) -> Tuple[Mapping, Mapping, Mapping, Mapping, Mapping]:
39+
def load_spark_srv_conf(preset_values=None) -> Tuple[
40+
Dict[str, Any],
41+
Dict[str, Any],
42+
Dict[str, Any],
43+
Dict[str, Any],
44+
Dict[str, Dict[str, float]],
45+
]:
4046
if preset_values is None:
4147
preset_values = dict()
4248
try:

0 commit comments

Comments (0)