
Commit 9a5e841

Sameer Sharma authored
COREML-3095 | Prevent auto calculation of executor instances and accurate num_partitions with DRA (#76)
* COREML-3095 | Prevent auto calculation of spark.executor.instances in case DRA is enabled
* COREML-3023 | calculate spark.sql.shuffle.partitions accurately with DRA enabled
* COREML-3095 | adding unit test cases
* COREML-3023 | address review comments
* COREML-3023 | bump up version

Co-authored-by: Sameer Sharma <[email protected]>
Co-authored-by: Sameer Sharma <[email protected]>
1 parent 3c8659b commit 9a5e841
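
Concretely, drawing on the new unit test cases added below: with dynamic resource allocation (DRA) enabled and no explicit spark.executor.instances, the library now leaves executor scaling to Spark instead of deriving an instance count from spark.cores.max. A hedged illustration; the before/after numbers come straight from the updated tests, nothing else should be read as the library's exact behaviour:

user_spark_opts = {
    'spark.dynamicAllocation.enabled': 'true',
    'spark.executor.cores': '4',
    'spark.cores.max': '128',
}
# Old behaviour: spark.executor.instances auto-set to 128 // 4 = 32.
# New behaviour: with DRA on, the updated tests expect
# spark.executor.instances == '2' (a small static default).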

File tree

3 files changed: +142 -4 lines changed

service_configuration_lib/spark_config.py

Lines changed: 37 additions & 3 deletions
@@ -39,6 +39,7 @@
 DEFAULT_K8S_BATCH_SIZE = 512
 DEFAULT_RESOURCES_WAITING_TIME_PER_EXECUTOR = 2  # seconds
 DEFAULT_CLUSTERMAN_OBSERVED_SCALING_TIME = 15  # minutes
+DEFAULT_SQL_SHUFFLE_PARTITIONS = 128


 NON_CONFIGURABLE_SPARK_OPTS = {
@@ -222,14 +223,32 @@ def _append_sql_shuffle_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:

     num_partitions = 2 * (
         int(spark_opts.get('spark.cores.max', 0)) or
-        int(spark_opts.get('spark.executor.instances', 0)) * int(spark_opts.get('spark.executor.cores', 0))
+        int(spark_opts.get('spark.executor.instances', 0)) *
+        int(spark_opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
     )
+
+    if (
+        'spark.dynamicAllocation.enabled' in spark_opts and
+        str(spark_opts['spark.dynamicAllocation.enabled']) == 'true' and
+        'spark.dynamicAllocation.maxExecutors' in spark_opts and
+        str(spark_opts['spark.dynamicAllocation.maxExecutors']) != 'infinity'
+    ):
+
+        num_partitions_dra = 2 * (
+            int(spark_opts.get('spark.dynamicAllocation.maxExecutors', 0)) *
+            int(spark_opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
+        )
+        num_partitions = max(num_partitions, num_partitions_dra)
+
+    num_partitions = num_partitions or DEFAULT_SQL_SHUFFLE_PARTITIONS
+
     log.warning(
         f'spark.sql.shuffle.partitions has been set to {num_partitions} '
         'to be equal to twice the number of requested cores, but you should '
         'consider setting a higher value if necessary.'
         ' Follow y/spark for help on partition sizing',
     )
+
     spark_opts['spark.sql.shuffle.partitions'] = str(num_partitions)
     return spark_opts

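For readers skimming the diff, here is a standalone sketch of the partition sizing this hunk introduces. The function name shuffle_partitions and the literal constant values are illustrative stand-ins for the library code, not the library API itself; the worked numbers at the end match the new test cases further down.

DEFAULT_EXECUTOR_CORES = 2            # stand-in value; see the library constant
DEFAULT_SQL_SHUFFLE_PARTITIONS = 128  # added by this commit

def shuffle_partitions(opts):
    # Twice the requested cores, as before.
    partitions = 2 * (
        int(opts.get('spark.cores.max', 0)) or
        int(opts.get('spark.executor.instances', 0)) *
        int(opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
    )
    # With DRA on and a finite maxExecutors, size against that ceiling instead.
    if (
        opts.get('spark.dynamicAllocation.enabled') == 'true' and
        opts.get('spark.dynamicAllocation.maxExecutors', 'infinity') != 'infinity'
    ):
        partitions = max(partitions, 2 * (
            int(opts['spark.dynamicAllocation.maxExecutors']) *
            int(opts.get('spark.executor.cores', DEFAULT_EXECUTOR_CORES))
        ))
    # Fall back to the new default when nothing was requested.
    return partitions or DEFAULT_SQL_SHUFFLE_PARTITIONS

# maxExecutors=128, executor.cores=3, cores.max=10 -> max(20, 768) = 768
# no maxExecutors, executor.cores=3, cores.max=10  -> 20
# DRA on, executor.cores=3 only                    -> 128 (default)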

@@ -298,8 +317,23 @@ def _adjust_spark_requested_resources(
     # TODO(gcoll|COREML-2697): Consider cleaning this part of the code up
     # once mesos is not longer around at Yelp.
     if 'spark.executor.instances' not in user_spark_opts:
-        executor_instances = int(user_spark_opts.get('spark.cores.max', str(DEFAULT_MAX_CORES))) // executor_cores
-        user_spark_opts['spark.executor.instances'] = str(executor_instances)
+
+        if (
+            'spark.dynamicAllocation.enabled' not in user_spark_opts or
+            str(user_spark_opts['spark.dynamicAllocation.enabled']) != 'true'
+        ):
+            executor_instances = int(user_spark_opts.get(
+                'spark.cores.max',
+                str(DEFAULT_MAX_CORES),
+            )) // executor_cores
+            user_spark_opts['spark.executor.instances'] = str(executor_instances)
+
+        elif (
+            'spark.dynamicAllocation.enabled' in user_spark_opts and
+            str(user_spark_opts['spark.dynamicAllocation.enabled']) == 'true'
+        ):
+            user_spark_opts['spark.executor.instances'] = str(DEFAULT_EXECUTOR_INSTANCES)
+
     if (
         'spark.mesos.executor.memoryOverhead' in user_spark_opts and
         'spark.executor.memoryOverhead' not in user_spark_opts
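The effect of this hunk: when DRA is enabled and the user has not set spark.executor.instances, the auto-calculation from spark.cores.max is skipped and a small static default (DEFAULT_EXECUTOR_INSTANCES, which the new test expectations suggest resolves to 2) is used, since Spark will scale executors itself. A simplified sketch of that decision, with a made-up helper name and the defaults passed in as parameters rather than taken from the library:

def pick_executor_instances(user_spark_opts, executor_cores, default_max_cores, default_instances):
    # Explicit user setting always wins (unchanged behaviour).
    if 'spark.executor.instances' in user_spark_opts:
        return int(user_spark_opts['spark.executor.instances'])
    # DRA enabled: let Spark scale executors; keep a small static default.
    if user_spark_opts.get('spark.dynamicAllocation.enabled') == 'true':
        return default_instances
    # DRA disabled or absent: derive instances from the core budget, as before.
    return int(user_spark_opts.get('spark.cores.max', str(default_max_cores))) // executor_cores

# From the new tests: cores.max='128', executor_cores=4, DRA off -> 32
#                     DRA on, default_instances=2                -> 2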

setup.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@

 setup(
     name='service-configuration-lib',
-    version='2.10.1',
+    version='2.10.2',
     provides=['service_configuration_lib'],
     description='Start, stop, and inspect Yelp SOA services',
     url='https://github.com/Yelp/service_configuration_lib',

tests/spark_config_test.py

Lines changed: 104 additions & 0 deletions
@@ -296,6 +296,73 @@ def gpu_pool(self, tmpdir, monkeypatch):

     @pytest.mark.parametrize(
         'cluster_manager,user_spark_opts,expected_output', [
+            # dynamic resource allocation enabled
+            (
+                'kubernetes',
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.executor.cores': '4',
+                    'spark.cores.max': '128',
+                },
+                {
+                    'spark.executor.memory': '4g',
+                    'spark.executor.cores': '4',
+                    'spark.executor.instances': '2',
+                    'spark.kubernetes.executor.limit.cores': '4',
+                    'spark.kubernetes.allocation.batch.size': '2',
+                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '15min',
+                },
+            ),
+            (
+                'kubernetes',
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': '512',
+                    'spark.dynamicAllocation.minExecutors': '128',
+                    'spark.dynamicAllocation.initialExecutors': '128',
+                    'spark.executor.cores': '4',
+                },
+                {
+                    'spark.executor.memory': '4g',
+                    'spark.executor.cores': '4',
+                    'spark.executor.instances': '2',
+                    'spark.kubernetes.executor.limit.cores': '4',
+                    'spark.kubernetes.allocation.batch.size': '2',
+                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '15min',
+                },
+            ),
+            # dynamic resource allocation disabled with instances specified
+            (
+                'kubernetes',
+                {
+                    'spark.dynamicAllocation.enabled': 'false',
+                    'spark.executor.instances': '600',
+                },
+                {
+                    'spark.executor.memory': '4g',
+                    'spark.executor.cores': '2',
+                    'spark.executor.instances': '600',
+                    'spark.kubernetes.executor.limit.cores': '2',
+                    'spark.kubernetes.allocation.batch.size': '512',
+                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '35min',
+                },
+            ),
+            # dynamic resource allocation disabled with instances not specified
+            (
+                'kubernetes',
+                {
+                    'spark.executor.cores': '4',
+                    'spark.cores.max': '128',
+                },
+                {
+                    'spark.executor.memory': '4g',
+                    'spark.executor.cores': '4',
+                    'spark.executor.instances': '32',
+                    'spark.kubernetes.executor.limit.cores': '4',
+                    'spark.kubernetes.allocation.batch.size': '32',
+                    'spark.scheduler.maxRegisteredResourcesWaitingTime': '16min',
+                },
+            ),
             # use default k8s settings
             (
                 'kubernetes',
@@ -498,6 +565,43 @@ def test_append_event_log_conf(
             ({'spark.executor.instances': '10', 'spark.executor.cores': '3'}, '60'),
             # user defined
             ({'spark.sql.shuffle.partitions': '300'}, '300'),
+            # dynamic resource allocation enabled, both maxExecutors and max cores defined
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': '128',
+                    'spark.executor.cores': '3',
+                    'spark.cores.max': '10',
+                },
+                '768',  # max (2 * (max cores), 2 * (maxExecutors * executor cores))
+            ),
+            # dynamic resource allocation enabled, maxExecutors not defined, max cores defined
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.executor.cores': '3',
+                    'spark.cores.max': '10',
+                },
+                '20',  # 2 * max cores
+            ),
+            # dynamic resource allocation enabled, maxExecutors not defined, max cores not defined
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.executor.cores': '3',
+                },
+                '128',  # DEFAULT_SQL_SHUFFLE_PARTITIONS
+            ),
+            # dynamic resource allocation enabled, maxExecutors infinity
+            (
+                {
+                    'spark.dynamicAllocation.enabled': 'true',
+                    'spark.dynamicAllocation.maxExecutors': 'infinity',
+                    'spark.executor.cores': '3',
+                    'spark.cores.max': '10',
+                },
+                '20',  # 2 * max cores
+            ),
         ],
     )
     def test_append_sql_shuffle_partitions_conf(
