Skip to content

Commit b528539

Browse files
authored
Merge pull request #68 from Yelp/gcoll_COREML-2570_paasta_spark_run_check_k8s_volumes
[COREML-2570] fix k8s volumes being mounted twice
2 parents e72b2b3 + d7ce285 commit b528539

File tree

3 files changed

+73
-15
lines changed

3 files changed

+73
-15
lines changed

service_configuration_lib/spark_config.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import functools
12
import hashlib
3+
import itertools
24
import json
35
import logging
46
import os
@@ -63,11 +65,6 @@
6365
'spark.kubernetes.executor.label.paasta.yelp.com/cluster',
6466
}
6567
K8S_AUTH_FOLDER = '/etc/pki/spark'
66-
DEFAULT_SPARK_K8S_SECRET_VOLUME = {
67-
'hostPath': K8S_AUTH_FOLDER,
68-
'containerPath': K8S_AUTH_FOLDER,
69-
'mode': 'RO',
70-
}
7168

7269
log = logging.Logger(__name__)
7370

@@ -173,19 +170,37 @@ def _get_k8s_docker_volumes_conf(
173170
volumes: Optional[List[Mapping[str, str]]] = None,
174171
):
175172
env = {}
173+
mounted_volumes = set()
176174
k8s_volumes = volumes or []
177-
k8s_volumes.append(DEFAULT_SPARK_K8S_SECRET_VOLUME)
175+
k8s_volumes.append({'containerPath': K8S_AUTH_FOLDER, 'hostPath': K8S_AUTH_FOLDER, 'mode': 'RO'})
178176
k8s_volumes.append({'containerPath': '/etc/passwd', 'hostPath': '/etc/passwd', 'mode': 'RO'})
179177
k8s_volumes.append({'containerPath': '/etc/group', 'hostPath': '/etc/group', 'mode': 'RO'})
180-
for volume_name, volume in enumerate(k8s_volumes):
181-
env[f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.mount.path'] = volume['containerPath']
182-
env[f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.mount.readOnly'] = (
183-
'true' if volume['mode'].lower() == 'ro' else 'false'
184-
)
185-
env[f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.options.path'] = volume['hostPath']
178+
_get_k8s_volume = functools.partial(_get_k8s_volume_hostpath_dict, count=itertools.count())
179+
180+
for volume in k8s_volumes:
181+
host_path, container_path, mode = volume['hostPath'], volume['containerPath'], volume['mode']
182+
if os.path.exists(host_path) and host_path not in mounted_volumes:
183+
env.update(_get_k8s_volume(host_path, container_path, mode))
184+
mounted_volumes.add(host_path)
185+
else:
186+
log.warning(
187+
f'Path {host_path} does not exist on this host or it has already been mounted.'
188+
' Skipping this binding.',
189+
)
186190
return env
187191

188192

193+
def _get_k8s_volume_hostpath_dict(host_path: str, container_path: str, mode: str, count: itertools.count):
194+
volume_name = next(count)
195+
return {
196+
f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.mount.path': container_path,
197+
f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.options.path': host_path,
198+
f'spark.kubernetes.executor.volumes.hostPath.{volume_name}.mount.readOnly': (
199+
'true' if mode.lower() == 'ro' else 'false'
200+
),
201+
}
202+
203+
189204
def _append_sql_shuffle_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]:
190205
if 'spark.sql.shuffle.partitions' in spark_opts:
191206
return spark_opts

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
setup(
1919
name='service-configuration-lib',
20-
version='2.5.7',
20+
version='2.5.8',
2121
provides=['service_configuration_lib'],
2222
description='Start, stop, and inspect Yelp SOA services',
2323
url='https://github.com/Yelp/service_configuration_lib',

tests/spark_config_test.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import functools
2+
import itertools
13
import json
24
import os
35
from unittest import mock
@@ -165,7 +167,9 @@ def mock_paasta_volumes(self, monkeypatch, tmpdir):
165167

166168
@pytest.fixture
167169
def mock_existed_files(self, mock_paasta_volumes):
168-
existed_files = [v.split(':')[0] for v in mock_paasta_volumes] + ['/host/file1', '/host/file2', '/host/file3']
170+
existed_files = [v.split(':')[0] for v in mock_paasta_volumes] + [
171+
'/host/file1', '/host/file2', '/host/file3', '/etc/pki/spark', '/etc/group', '/etc/passwd',
172+
]
169173
with mock.patch('os.path.exists', side_effect=lambda f: f in existed_files):
170174
yield existed_files
171175

@@ -220,6 +224,45 @@ def test_get_mesos_docker_volumes_conf(
220224
)
221225
assert sorted(output[validate_key].split(',')) == sorted(set(expected_volumes))
222226

227+
def test_get_k8s_volume_hostpath_dict(self):
228+
assert spark_config._get_k8s_volume_hostpath_dict(
229+
'/host/file1', '/container/file1', 'RO', itertools.count(),
230+
) == {
231+
'spark.kubernetes.executor.volumes.hostPath.0.mount.path': '/container/file1',
232+
'spark.kubernetes.executor.volumes.hostPath.0.options.path': '/host/file1',
233+
'spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly': 'true',
234+
}
235+
236+
@pytest.mark.parametrize(
237+
'volumes', [
238+
None,
239+
[
240+
{'hostPath': '/host/file1', 'containerPath': '/containter/file1', 'mode': 'RO'},
241+
{'hostPath': '/host/file2', 'containerPath': '/containter/file2', 'mode': 'RO'},
242+
{'hostPath': '/host/paasta1', 'containerPath': '/container/paasta1', 'mode': 'RO'},
243+
],
244+
],
245+
)
246+
@pytest.mark.usefixtures('mock_existed_files')
247+
def test_get_k8s_docker_volumes_conf(self, volumes):
248+
expected_volumes = {}
249+
250+
_get_k8s_volume = functools.partial(spark_config._get_k8s_volume_hostpath_dict, count=itertools.count())
251+
if volumes:
252+
for volume in volumes:
253+
expected_volumes.update(
254+
_get_k8s_volume(volume['hostPath'], volume['containerPath'], volume['mode']),
255+
)
256+
257+
expected_volumes.update({
258+
**_get_k8s_volume('/etc/pki/spark', '/etc/pki/spark', 'ro'),
259+
**_get_k8s_volume('/etc/passwd', '/etc/passwd', 'ro'),
260+
**_get_k8s_volume('/etc/group', '/etc/group', 'ro'),
261+
})
262+
263+
output = spark_config._get_k8s_docker_volumes_conf(volumes)
264+
assert output == expected_volumes
265+
223266
@pytest.fixture
224267
def mock_account_id(self, tmpdir, monkeypatch):
225268
def get_client(service_name, **kwargs):
@@ -744,7 +787,7 @@ def test_get_spark_conf_mesos(
744787
@pytest.fixture
745788
def assert_kubernetes_conf(self):
746789
expected_output = {
747-
'spark.master': f'k8s://https://k8s.paasta-{self.cluster}.yelp:16443',
790+
'spark.master': f'k8s://https://k8s.{self.cluster}.paasta:6443',
748791
'spark.executorEnv.PAASTA_SERVICE': self.service,
749792
'spark.executorEnv.PAASTA_INSTANCE': self.instance,
750793
'spark.executorEnv.PAASTA_CLUSTER': self.cluster,

0 commit comments

Comments (0)