Commit 1c042a2

Fix jupyter spark memory calculation with DRA (#156)
1 parent 78b5484 commit 1c042a2

File tree

3 files changed: +99 −85 lines changed

service_configuration_lib/spark_config.py

Lines changed: 29 additions & 24 deletions
@@ -1235,12 +1235,28 @@ def parse_memory_string(memory_string: Optional[str]) -> int:
     )
 
 
-def compute_requested_memory_overhead(spark_opts: Mapping[str, str], executor_memory):
-    return max(
+def get_spark_executor_memory_overhead_mb(spark_opts: Mapping[str, str], executor_memory) -> float:
+    """Return memory overhead in MB."""
+    # By default, Spark adds an overhead of 10% of the executor memory, with a
+    # minimum of 384mb
+    min_mem_overhead = 384
+    default_overhead_factor = 0.1
+
+    memory_overhead = max(
         parse_memory_string(spark_opts.get('spark.executor.memoryOverhead')),
         parse_memory_string(spark_opts.get('spark.mesos.executor.memoryOverhead')),
-        float(spark_opts.get('spark.kubernetes.memoryOverheadFactor', 0)) * executor_memory,
     )
+    if memory_overhead:
+        return float(max(memory_overhead, min_mem_overhead))
+    else:
+        memory_overhead_factor = (
+            spark_opts.get('spark.executor.memoryOverheadFactor') or
+            spark_opts.get('spark.kubernetes.memoryOverheadFactor') or
+            spark_opts.get('spark.mesos.executor.memoryOverheadFactor') or
+            default_overhead_factor
+        )
+        calculated_overhead = float(memory_overhead_factor) * executor_memory
+        return float(max(calculated_overhead, min_mem_overhead))
 
 
 def get_grafana_url(spark_conf: Mapping[str, str]) -> str:
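A minimal usage sketch of the renamed helper, assuming the package is importable as service_configuration_lib at a version containing this commit:

    from service_configuration_lib import spark_config

    # An explicit overhead (in MB) takes precedence over any factor:
    # max(3072, 384) -> 3072.0
    print(spark_config.get_spark_executor_memory_overhead_mb(
        {'spark.executor.memoryOverhead': '3072'}, 4096,
    ))

    # No explicit overhead: fall back to the factor chain,
    # max(0.1 * 4096, 384) -> 409.6
    print(spark_config.get_spark_executor_memory_overhead_mb({}, 4096))

    # Small executors hit the 384 MB floor: max(0.1 * 1024, 384) -> 384.0
    print(spark_config.get_spark_executor_memory_overhead_mb({}, 1024))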
@@ -1253,32 +1269,21 @@ def get_grafana_url(spark_conf: Mapping[str, str]) -> str:
 
 
 def get_resources_requested(spark_opts: Mapping[str, str]) -> Mapping[str, int]:
+    dra_enabled = str(spark_opts.get('spark.dynamicAllocation.enabled')).lower() == 'true'
     num_executors = (
-        # spark on k8s directly configure num instances
-        int(spark_opts.get('spark.executor.instances', 0)) or
-        # spark on mesos use cores.max and executor.core to calculate number of
-        # executors.
-        int(spark_opts.get('spark.cores.max', 0)) // int(spark_opts.get('spark.executor.cores', 0))
-    )
-    num_cpus = (
-        # spark on k8s
-        int(spark_opts.get('spark.executor.instances', 0)) * int(spark_opts.get('spark.executor.cores', 0)) or
-        # spark on mesos
-        int(spark_opts.get('spark.cores.max', 0))
+        int(spark_opts.get('spark.dynamicAllocation.maxExecutors', 0)) if dra_enabled
+        else
+        int(spark_opts.get('spark.executor.instances', 0))
     )
+    num_cpus = num_executors * int(spark_opts.get('spark.executor.cores', 0))
     num_gpus = int(spark_opts.get('spark.mesos.gpus.max', 0))
 
     executor_memory = parse_memory_string(spark_opts.get('spark.executor.memory', ''))
-    requested_memory = compute_requested_memory_overhead(spark_opts, executor_memory)
-    # by default, spark adds an overhead of 10% of the executor memory, with a
-    # minimum of 384mb
-    memory_overhead: int = (
-        requested_memory
-        if requested_memory > 0
-        else max(384, int(0.1 * executor_memory))
-    )
-    total_memory = (executor_memory + memory_overhead) * num_executors
-    log.info(f'Requested total memory of {total_memory} MiB')
+    executor_memory_overhead = get_spark_executor_memory_overhead_mb(spark_opts, executor_memory)
+    total_memory = int((executor_memory + executor_memory_overhead) * num_executors)
+    dra_enabled_string = '(DRA enabled)' if dra_enabled else ''
+
+    log.info(f'Requested total memory of {total_memory} MiB {dra_enabled_string}')
     return {
         'cpus': num_cpus,
         'mem': total_memory,
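With dynamic resource allocation (DRA) enabled, the request is now sized from spark.dynamicAllocation.maxExecutors rather than spark.executor.instances. A hedged sketch mirroring the new test case, under the same import assumption as above:

    from service_configuration_lib import spark_config

    # 2 max executors * (4096 MB + 0.5 * 4096 MB overhead) = 12288 MiB,
    # and 2 executors * 5 cores = 10 cpus.
    print(spark_config.get_resources_requested({
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.maxExecutors': '2',
        'spark.executor.instances': '0',
        'spark.executor.cores': '5',
        'spark.executor.memory': '4g',
        'spark.kubernetes.memoryOverheadFactor': '0.5',
    }))
    # expected per the updated tests: {'cpus': 10, 'mem': 12288, 'disk': 12288, 'gpus': 0}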

service_configuration_lib/utils.py

Lines changed: 3 additions & 0 deletions
@@ -163,6 +163,7 @@ def get_runtime_env() -> str:
     return 'unknown'
 
 
+# TODO: merge with spark_config.parse_memory_string
 def get_spark_memory_in_unit(mem: str, unit: Literal['k', 'm', 'g', 't']) -> float:
     """
     Converts Spark memory to the desired unit.
@@ -182,6 +183,7 @@ def get_spark_memory_in_unit(mem: str, unit: Literal['k', 'm', 'g', 't']) -> float:
     return round(memory_unit, 5)
 
 
+# TODO: use spark_config.parse_memory_string
 def get_spark_driver_memory_mb(spark_conf: Dict[str, str]) -> float:
     """
     Returns the Spark driver memory in MB.
@@ -194,6 +196,7 @@ def get_spark_driver_memory_mb(spark_conf: Dict[str, str]) -> float:
     return SPARK_DRIVER_MEM_DEFAULT_MB
 
 
+# TODO: merge with spark_config.compute_requested_memory_overhead
 def get_spark_driver_memory_overhead_mb(spark_conf: Dict[str, str]) -> float:
    """
    Returns the Spark driver memory overhead in bytes.

tests/spark_config_test.py

Lines changed: 67 additions & 61 deletions
@@ -1528,6 +1528,48 @@ def test_parse_memory_string(memory_string, expected_output):
     assert spark_config.parse_memory_string(memory_string) == expected_output
 
 
+@pytest.mark.parametrize(
+    'spark_opts,executor_memory,expected_output',
+    [
+        # min_memory_overhead
+        ({}, 1024, 384),
+        # default_memory_overhead_factor
+        ({}, 4096, 409.6),
+        # executor_memoryOverhead_configured
+        ({'spark.executor.memoryOverhead': '1024'}, 4096, 1024),
+        # mesos_memoryOverhead_configured
+        ({'spark.mesos.executor.memoryOverhead': '2048'}, 4096, 2048),
+        # kubernetes_memoryOverheadFactor_configured
+        ({'spark.kubernetes.memoryOverheadFactor': '0.2'}, 4096, int(4096 * 0.2)),
+        # multiple_configs_highest_selected
+        (
+            {
+                'spark.executor.memoryOverhead': '1024',
+                'spark.mesos.executor.memoryOverhead': '2048',
+                'spark.kubernetes.memoryOverheadFactor': '0.2',
+            },
+            4096,
+            2048,
+        ),
+        # default_memory_overhead_small_executor
+        ({}, 1024, 384),
+    ],
+    ids=[
+        'min_memory_overhead',
+        'default_memory_overhead_factor',
+        'executor_memoryOverhead_configured',
+        'mesos_memoryOverhead_configured',
+        'kubernetes_memoryOverheadFactor_configured',
+        'multiple_configs_highest_selected',
+        'default_memory_overhead_small_executor',
+    ],
+)
+def test_compute_requested_memory_overhead(spark_opts, executor_memory, expected_output):
+    result = spark_config.get_spark_executor_memory_overhead_mb(spark_opts, executor_memory)
+    assert isinstance(result, float)
+    assert int(result) == int(expected_output)
+
+
 def test_get_grafana_url():
     spark_conf = {
         'spark.executorEnv.PAASTA_CLUSTER': 'test-cluster',
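The default-factor expectations in the new test follow directly from the constants in get_spark_executor_memory_overhead_mb: 0.1 * 4096 = 409.6, which exceeds the 384 MB floor, so 409.6 is expected; for a 1024 MB executor, 0.1 * 1024 = 102.4 falls below the floor, so 384 is expected in both the min_memory_overhead and default_memory_overhead_small_executor cases.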
@@ -1544,38 +1586,10 @@ def test_get_grafana_url():
 
 @pytest.mark.parametrize(
     'spark_opts,expected_output', [
-        # mesos ( 2 instances, not configure memory overhead, default: 384m )
-        (
-            {
-                'spark.cores.max': '10',
-                'spark.executor.cores': '5',
-                'spark.executor.memory': '2g',
-            },
-            {
-                'cpus': 10,
-                'mem': (384 + 2048) * 2,
-                'disk': (384 + 2048) * 2,
-                'gpus': 0,
-            },
-        ),
-        # mesos ( 2 instances, not configure memory overhead, default: 409m )
-        (
-            {
-                'spark.cores.max': '10',
-                'spark.executor.cores': '5',
-                'spark.executor.memory': '4g',
-            },
-            {
-                'cpus': 10,
-                'mem': (409 + 4096) * 2,
-                'disk': (409 + 4096) * 2,
-                'gpus': 0,
-            },
-        ),
-        # mesos ( 2 instances, configure memory overhead)
+        # basic_config
         (
             {
-                'spark.cores.max': '10',
+                'spark.executor.instances': '2',
                 'spark.executor.cores': '5',
                 'spark.executor.memory': '4g',
                 'spark.executor.memoryOverhead': '3072',
@@ -1587,26 +1601,25 @@ def test_get_grafana_url():
                 'gpus': 0,
             },
         ),
-        # mesos ( 2 instances, Duplicate config, choose the higher memory overhead)
+        # kubernetes_memory_overhead
         (
             {
-                'spark.cores.max': '10',
+                'spark.executor.instances': '2',
                 'spark.executor.cores': '5',
                 'spark.executor.memory': '4g',
-                'spark.executor.memoryOverhead': '3072',
-                'spark.mesos.executor.memoryOverhead': '4096',
+                'spark.kubernetes.memoryOverheadFactor': '0.5',
             },
             {
                 'cpus': 10,
-                'mem': (4096 + 4096) * 2,
-                'disk': (4096 + 4096) * 2,
+                'mem': (4096 * 0.5 + 4096) * 2,
+                'disk': (4096 * 0.5 + 4096) * 2,
                 'gpus': 0,
             },
         ),
-        # mesos ( 2 instances, configure memory overhead)
+        # mesos_memory_overhead
         (
             {
-                'spark.cores.max': '10',
+                'spark.executor.instances': '2',
                 'spark.executor.cores': '5',
                 'spark.executor.memory': '4g',
                 'spark.mesos.executor.memoryOverhead': '3072',
@@ -1618,10 +1631,11 @@ def test_get_grafana_url():
                 'gpus': 0,
             },
         ),
-        # k8s
+        # gpu_enabled
        (
            {
                 'spark.executor.instances': '2',
+                'spark.mesos.gpus.max': '2',
                 'spark.executor.cores': '5',
                 'spark.executor.memory': '4g',
                 'spark.executor.memoryOverhead': '3072',
@@ -1630,41 +1644,33 @@ def test_get_grafana_url():
                 'cpus': 10,
                 'mem': (3072 + 4096) * 2,
                 'disk': (3072 + 4096) * 2,
-                'gpus': 0,
+                'gpus': 2,
             },
         ),
-        # k8s
+        # dynamic_allocation_enabled
         (
             {
-                'spark.executor.instances': '2',
+                'spark.executor.instances': '0',
+                'spark.dynamicAllocation.enabled': 'true',
+                'spark.dynamicAllocation.maxExecutors': '2',
                 'spark.executor.cores': '5',
                 'spark.executor.memory': '4g',
                 'spark.kubernetes.memoryOverheadFactor': '0.5',
             },
             {
                 'cpus': 10,
-                'mem': (4096 * 0.5 + 4096) * 2,
-                'disk': (4096 * 0.5 + 4096) * 2,
+                'mem': (4096 * 1.5) * 2,
+                'disk': (4096 * 1.5) * 2,
                 'gpus': 0,
             },
         ),
-        # gpu
-        (
-            {
-                'spark.cores.max': '10',
-                'spark.mesos.gpus.max': '2',
-                'spark.executor.cores': '5',
-                'spark.executor.memory': '4g',
-                'spark.executor.memoryOverhead': '3072',
-            },
-            {
-                'cpus': 10,
-                'mem': (3072 + 4096) * 2,
-                'disk': (3072 + 4096) * 2,
-                'gpus': 2,
-            },
-
-        ),
+    ],
+    ids=[
+        'basic_config',
+        'kubernetes_memory_overhead',
+        'mesos_memory_overhead',
+        'gpu_enabled',
+        'dynamic_allocation_enabled',
     ],
 )
 def test_get_resources_requested(spark_opts, expected_output):
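To exercise just the tests touched by this commit (assuming pytest, which the suite already uses), something like:

    pytest tests/spark_config_test.py -k 'memory_overhead or resources_requested'

matches both test_compute_requested_memory_overhead and test_get_resources_requested.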
