
Commit 5cbb07b

Sameer Sharma (CaptainSame) authored

MLCOMPUTE-23 | Enable DRA using recommended configs (#81)

* MLCOMPUTE-23 | added logic for enabling dra
* MLCOMPUTE-23 | added unit tests
* MLCOMPUTE-23 | improved logging
* MLCOMPUTE-23 | bump up version

Co-authored-by: Sameer Sharma <[email protected]>
Co-authored-by: Sameer Sharma <[email protected]>

1 parent 0fa1ddb commit 5cbb07b
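For context, the opt-in surface is a single user flag; everything else in this commit layers recommended defaults around it. A hypothetical --spark-args fragment, written as the options dict this library passes around (the key names are taken from the diff below; the invocation style is an assumption):

user_spark_opts = {
    # the only flag a user has to set to opt in to dynamic resource allocation
    'spark.dynamicAllocation.enabled': 'true',
    # optional Yelp-specific knob: the ratio of total executors kept as the minimum
    'spark.yelp.dra.minExecutorRatio': '0.25',
}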

File tree

3 files changed: +168 -83 lines changed

  service_configuration_lib/spark_config.py
  setup.py
  tests/spark_config_test.py


service_configuration_lib/spark_config.py

Lines changed: 86 additions & 15 deletions
@@ -40,6 +40,9 @@
 DEFAULT_RESOURCES_WAITING_TIME_PER_EXECUTOR = 2  # seconds
 DEFAULT_CLUSTERMAN_OBSERVED_SCALING_TIME = 15  # minutes
 DEFAULT_SQL_SHUFFLE_PARTITIONS = 128
+DEFAULT_DRA_EXECUTOR_ALLOCATION_RATIO = 0.8
+DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT = '420s'
+DEFAULT_DRA_MIN_EXECUTOR_RATIO = 0.25


 NON_CONFIGURABLE_SPARK_OPTS = {
@@ -70,6 +73,7 @@
 K8S_AUTH_FOLDER = '/etc/pki/spark'

 log = logging.Logger(__name__)
+log.setLevel(logging.INFO)


 def _load_aws_credentials_from_yaml(yaml_file_path) -> Tuple[str, str, Optional[str]]:
@@ -250,6 +254,73 @@ def _append_sql_shuffle_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:
     return spark_opts


+def _get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
+    if (
+        'spark.dynamicAllocation.enabled' not in spark_opts or
+        str(spark_opts['spark.dynamicAllocation.enabled']) != 'true'
+    ):
+        return spark_opts
+
+    # set defaults if not provided already
+    _append_spark_config(spark_opts, 'spark.dynamicAllocation.shuffleTracking.enabled', 'true')
+    _append_spark_config(
+        spark_opts, 'spark.dynamicAllocation.executorAllocationRatio',
+        str(DEFAULT_DRA_EXECUTOR_ALLOCATION_RATIO),
+    )
+    _append_spark_config(
+        spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout',
+        str(DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT),
+    )
+
+    if 'spark.dynamicAllocation.minExecutors' not in spark_opts:
+        # the ratio of total executors to be used as minExecutors
+        min_executor_ratio = spark_opts.get('spark.yelp.dra.minExecutorRatio', DEFAULT_DRA_MIN_EXECUTOR_RATIO)
+        # set minExecutors default as a ratio of spark.executor.instances
+        min_executors = int(int(spark_opts.get('spark.executor.instances', DEFAULT_EXECUTOR_INSTANCES)) *
+                            float(min_executor_ratio))
+        # minExecutors should not be more than initialExecutors
+        if 'spark.dynamicAllocation.initialExecutors' in spark_opts:
+            min_executors = min(min_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors']))
+        # minExecutors should not be more than maxExecutors
+        if 'spark.dynamicAllocation.maxExecutors' in spark_opts:
+            min_executors = min(
+                min_executors, int(int(spark_opts['spark.dynamicAllocation.maxExecutors']) *
+                                   float(min_executor_ratio)),
+            )
+
+        spark_opts['spark.dynamicAllocation.minExecutors'] = str(min_executors)
+        log.warning(
+            f'\nSetting spark.dynamicAllocation.minExecutors as {min_executors}. If you wish to '
+            f'change the value of minimum executors, please provide the exact value of '
+            f'spark.dynamicAllocation.minExecutors in --spark-args\n',
+        )
+
+        if 'spark.yelp.dra.minExecutorRatio' not in spark_opts:
+            log.debug(
+                f'\nspark.yelp.dra.minExecutorRatio not provided. This specifies the ratio of total executors '
+                f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. Using '
+                f'default ratio: {DEFAULT_DRA_MIN_EXECUTOR_RATIO}. If you wish to change this value, please provide '
+                f'the desired spark.yelp.dra.minExecutorRatio in --spark-args\n',
+            )
+
+    if 'spark.dynamicAllocation.maxExecutors' not in spark_opts:
+        # set maxExecutors default equal to spark.executor.instances
+        max_executors = int(spark_opts.get('spark.executor.instances', DEFAULT_EXECUTOR_INSTANCES))
+        # maxExecutors should not be less than initialExecutors
+        if 'spark.dynamicAllocation.initialExecutors' in spark_opts:
+            max_executors = max(max_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors']))
+
+        spark_opts['spark.dynamicAllocation.maxExecutors'] = str(max_executors)
+        log.warning(
+            f'\nSetting spark.dynamicAllocation.maxExecutors as {max_executors}. If you wish to '
+            f'change the value of maximum executors, please provide the exact value of '
+            f'spark.dynamicAllocation.maxExecutors in --spark-args\n',
+        )
+
+    spark_opts['spark.executor.instances'] = spark_opts['spark.dynamicAllocation.minExecutors']
+    return spark_opts
+
+
 def _append_event_log_conf(
     spark_opts: Dict[str, str],
     access_key: Optional[str],
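Note that _get_dra_configs leans on the pre-existing _append_spark_config helper, which this diff does not touch. A minimal sketch of the behavior the calls above assume (fill in a default only when the user has not supplied the key):

from typing import Dict


def _append_spark_config(spark_opts: Dict[str, str], config_name: str, config_value: str) -> Dict[str, str]:
    # leave user-provided values untouched; only fill in missing defaults
    if config_name not in spark_opts:
        spark_opts[config_name] = config_value
    return spark_opts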
@@ -340,21 +411,18 @@ def _adjust_spark_requested_resources(
     # once mesos is not longer around at Yelp.
     if 'spark.executor.instances' not in user_spark_opts:

-        if (
-            'spark.dynamicAllocation.enabled' not in user_spark_opts or
-            str(user_spark_opts['spark.dynamicAllocation.enabled']) != 'true'
-        ):
-            executor_instances = int(user_spark_opts.get(
-                'spark.cores.max',
-                str(DEFAULT_MAX_CORES),
-            )) // executor_cores
-            user_spark_opts['spark.executor.instances'] = str(executor_instances)
-
-        elif (
-            'spark.dynamicAllocation.enabled' in user_spark_opts and
-            str(user_spark_opts['spark.dynamicAllocation.enabled']) == 'true'
-        ):
-            user_spark_opts['spark.executor.instances'] = str(DEFAULT_EXECUTOR_INSTANCES)
+        executor_instances = int(user_spark_opts.get(
+            'spark.cores.max',
+            str(DEFAULT_MAX_CORES),
+        )) // executor_cores
+        user_spark_opts['spark.executor.instances'] = str(executor_instances)
+
+        if user_spark_opts['spark.executor.instances'] == str(DEFAULT_EXECUTOR_INSTANCES):
+            log.warning(
+                f'spark.executor.instances not provided. Setting spark.executor.instances as '
+                f'{executor_instances}. If you wish to change the number of executors, please '
+                f'provide the exact value of spark.executor.instances in --spark-args',
+            )

     if (
         'spark.mesos.executor.memoryOverhead' in user_spark_opts and
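With this hunk the default executor count is always derived from spark.cores.max, whether or not DRA is enabled; the DRA floor is applied later by _get_dra_configs. A quick worked check of the arithmetic, using the values from the test case this commit removes below (spark.cores.max of 128, 4 executor cores, 32 expected instances); the DEFAULT_MAX_CORES value here is an assumption:

DEFAULT_MAX_CORES = 4  # assumed fallback; unused when spark.cores.max is set
executor_cores = 4
user_spark_opts = {'spark.cores.max': '128'}

executor_instances = int(user_spark_opts.get('spark.cores.max', str(DEFAULT_MAX_CORES))) // executor_cores
assert executor_instances == 32  # matches the old test expectation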
@@ -740,6 +808,9 @@ def get_spark_conf(
     else:
         raise ValueError('Unknown resource_manager, should be either [mesos,kubernetes]')

+    # configure dynamic resource allocation configs
+    spark_conf = _get_dra_configs(spark_conf)
+
     # configure spark_event_log
     spark_conf = _append_event_log_conf(spark_conf, *aws_creds)
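Taken together: once DRA is enabled, this pass rewrites the executor settings before the event-log and remaining configs are appended. A usage sketch of the new function in isolation; the expected values are lifted from the new unit tests further down, which imply DEFAULT_EXECUTOR_INSTANCES is 2:

from service_configuration_lib import spark_config

opts = spark_config._get_dra_configs({
    'spark.dynamicAllocation.enabled': 'true',
    'spark.executor.instances': '821',
})

# minExecutors defaults to int(821 * 0.25) = 205 and maxExecutors to 821;
# spark.executor.instances is then pinned to minExecutors
assert opts['spark.dynamicAllocation.minExecutors'] == '205'
assert opts['spark.dynamicAllocation.maxExecutors'] == '821'
assert opts['spark.executor.instances'] == '205'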

setup.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@

 setup(
     name='service-configuration-lib',
-    version='2.10.6',
+    version='2.10.7',
     provides=['service_configuration_lib'],
     description='Start, stop, and inspect Yelp SOA services',
     url='https://github.com/Yelp/service_configuration_lib',

tests/spark_config_test.py

Lines changed: 81 additions & 67 deletions
@@ -296,73 +296,6 @@ def gpu_pool(self, tmpdir, monkeypatch):

     @pytest.mark.parametrize(
         'cluster_manager,user_spark_opts,expected_output', [
-            # dynamic resource allocation enabled
-            (
-                'kubernetes',
-                {
-                    'spark.dynamicAllocation.enabled': 'true',
-                    'spark.executor.cores': '4',
-                    'spark.cores.max': '128',
-                },
-                {
-                    'spark.executor.memory': '4g',
-                    'spark.executor.cores': '4',
-                    'spark.executor.instances': '2',
-                    'spark.kubernetes.executor.limit.cores': '4',
-                    'spark.kubernetes.allocation.batch.size': '512',
-                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '15min',
-                },
-            ),
-            (
-                'kubernetes',
-                {
-                    'spark.dynamicAllocation.enabled': 'true',
-                    'spark.dynamicAllocation.maxExecutors': '512',
-                    'spark.dynamicAllocation.minExecutors': '128',
-                    'spark.dynamicAllocation.initialExecutors': '128',
-                    'spark.executor.cores': '4',
-                },
-                {
-                    'spark.executor.memory': '4g',
-                    'spark.executor.cores': '4',
-                    'spark.executor.instances': '2',
-                    'spark.kubernetes.executor.limit.cores': '4',
-                    'spark.kubernetes.allocation.batch.size': '512',
-                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '15min',
-                },
-            ),
-            # dynamic resource allocation disabled with instances specified
-            (
-                'kubernetes',
-                {
-                    'spark.dynamicAllocation.enabled': 'false',
-                    'spark.executor.instances': '600',
-                },
-                {
-                    'spark.executor.memory': '4g',
-                    'spark.executor.cores': '2',
-                    'spark.executor.instances': '600',
-                    'spark.kubernetes.executor.limit.cores': '2',
-                    'spark.kubernetes.allocation.batch.size': '512',
-                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '35min',
-                },
-            ),
-            # dynamic resource allocation disabled with instances not specified
-            (
-                'kubernetes',
-                {
-                    'spark.executor.cores': '4',
-                    'spark.cores.max': '128',
-                },
-                {
-                    'spark.executor.memory': '4g',
-                    'spark.executor.cores': '4',
-                    'spark.executor.instances': '32',
-                    'spark.kubernetes.executor.limit.cores': '4',
-                    'spark.kubernetes.allocation.batch.size': '512',
-                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '16min',
-                },
-            ),
             # k8s allocation batch size not specified
             (
                 'kubernetes',
@@ -531,6 +464,87 @@ def test_adjust_spark_requested_resources_error(
         with pytest.raises(ValueError):
             spark_config._adjust_spark_requested_resources(spark_opts, cluster_manager, pool)

+    @pytest.mark.parametrize(
+        'user_spark_opts,expected_output', [
+            # dynamic resource allocation enabled
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                },
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
+                    'spark.dynamicAllocation.executorAllocationRatio': '0.8',
+                    'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                    'spark.dynamicAllocation.minExecutors': '0',
+                    'spark.dynamicAllocation.maxExecutors': '2',
+                    'spark.executor.instances': '0',
+                },
+            ),
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': '512',
+                    'spark.dynamicAllocation.minExecutors': '128',
+                    'spark.dynamicAllocation.initialExecutors': '128',
+                },
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': '512',
+                    'spark.dynamicAllocation.minExecutors': '128',
+                    'spark.dynamicAllocation.initialExecutors': '128',
+                    'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
+                    'spark.dynamicAllocation.executorAllocationRatio': '0.8',
+                    'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                    'spark.executor.instances': '128',
+                },
+            ),
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.executor.instances': '821',
+                },
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': '821',
+                    'spark.dynamicAllocation.minExecutors': '205',
+                    'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
+                    'spark.dynamicAllocation.executorAllocationRatio': '0.8',
+                    'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                    'spark.executor.instances': '205',
+                },
+            ),
+            # dynamic resource allocation disabled explicitly
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'false',
+                    'spark.executor.instances': '600',
+                },
+                {
+                    'spark.dynamicAllocation.enabled': 'false',
+                    'spark.executor.instances': '600',
+                },
+            ),
+            # dynamic resource allocation not specified
+            (
+                {
+                    'spark.executor.instances': '606',
+                },
+                {
+                    'spark.executor.instances': '606',
+                },
+            ),
+        ],
+    )
+    def test_get_dra_configs(
+        self,
+        user_spark_opts,
+        expected_output,
+    ):
+        output = spark_config._get_dra_configs(user_spark_opts)
+        for key in expected_output.keys():
+            assert output[key] == expected_output[key], f'wrong value for {key}'
+
     @pytest.mark.parametrize(
         'user_spark_opts,aws_creds,expected_output', [
             # user specified to disable
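The first parametrized case in the new test is worth calling out: with nothing but spark.dynamicAllocation.enabled set, the defaults compound so that a job starts with zero executors. A worked check, again assuming DEFAULT_EXECUTOR_INSTANCES is 2 as the expected values imply:

min_executors = int(2 * 0.25)  # DEFAULT_EXECUTOR_INSTANCES * DEFAULT_DRA_MIN_EXECUTOR_RATIO
max_executors = 2              # defaults to spark.executor.instances
assert (min_executors, max_executors) == (0, 2)
# hence the expected 'spark.executor.instances': '0' in the first case above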
