Skip to content

Commit 22dd4b2

Browse files
authored
Set AWS creds via Spark Config (#78)
Set AWS creds via Spark Configs
1 parent b89deb4 commit 22dd4b2

File tree

3 files changed

+58
-4
lines changed

3 files changed

+58
-4
lines changed

service_configuration_lib/spark_config.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@
5353
'spark.executorEnv.PAASTA_INSTANCE',
5454
'spark.executorEnv.PAASTA_CLUSTER',
5555
'spark.executorEnv.SPARK_EXECUTOR_DIRS',
56-
'spark.hadoop.fs.s3a.access.key',
57-
'spark.hadoop.fs.s3a.secret.key',
58-
'spark.hadoop.fs.s3a.session.token',
5956
'spark.kubernetes.pyspark.pythonVersion',
6057
'spark.kubernetes.container.image',
6158
'spark.kubernetes.namespace',
@@ -303,6 +300,31 @@ def _append_event_log_conf(
303300
return spark_opts
304301

305302

303+
def _append_aws_credentials_conf(
304+
spark_opts: Dict[str, str],
305+
access_key: Optional[str],
306+
secret_key: Optional[str],
307+
session_token: Optional[str] = None,
308+
) -> Dict[str, str]:
309+
"""It is important that we set the aws creds via the spark configs and not via the AWS
310+
environment variables. See HADOOP-18233 for details
311+
312+
We set both s3a and s3 credentials because s3a is the actual hadoop-aws driver, but our
313+
glue-metatore integration will attempt to use s3 path drivers which are monkeypatched
314+
to use the the s3a driver.
315+
"""
316+
if access_key:
317+
spark_opts['spark.hadoop.fs.s3a.access.key'] = access_key
318+
spark_opts['spark.hadoop.fs.s3.access.key'] = access_key
319+
if secret_key:
320+
spark_opts['spark.hadoop.fs.s3a.secret.key'] = secret_key
321+
spark_opts['spark.hadoop.fs.s3.secret.key'] = secret_key
322+
if session_token:
323+
spark_opts['spark.hadoop.fs.s3a.session.token'] = session_token
324+
spark_opts['spark.hadoop.fs.s3.session.token'] = session_token
325+
return spark_opts
326+
327+
306328
def _adjust_spark_requested_resources(
307329
user_spark_opts: Dict[str, str],
308330
cluster_manager: str,
@@ -727,6 +749,8 @@ def get_spark_conf(
727749

728750
# configure spark Console Progress
729751
spark_conf = _append_spark_config(spark_conf, 'spark.ui.showConsoleProgress', 'true')
752+
753+
spark_conf = _append_aws_credentials_conf(spark_conf, *aws_creds)
730754
return spark_conf
731755

732756

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
setup(
1919
name='service-configuration-lib',
20-
version='2.10.3',
20+
version='2.10.4',
2121
provides=['service_configuration_lib'],
2222
description='Start, stop, and inspect Yelp SOA services',
2323
url='https://github.com/Yelp/service_configuration_lib',

tests/spark_config_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,17 @@ def test_append_console_progress_conf(
643643

644644
assert output[key] == expected_output
645645

646+
def test_append_aws_credentials_conf(self):
    """The helper should map each credential onto its s3a Spark config key."""
    result = spark_config._append_aws_credentials_conf(
        {},
        mock.sentinel.access,
        mock.sentinel.secret,
        mock.sentinel.token,
    )
    expected = {
        'spark.hadoop.fs.s3a.access.key': mock.sentinel.access,
        'spark.hadoop.fs.s3a.secret.key': mock.sentinel.secret,
        'spark.hadoop.fs.s3a.session.token': mock.sentinel.token,
    }
    for conf_key, sentinel in expected.items():
        assert result[conf_key] == sentinel
656+
646657
@pytest.fixture
647658
def mock_append_spark_conf_log(self):
648659
return_value = {'spark.logConf': 'true'}
@@ -682,6 +693,19 @@ def mock_append_event_log_conf(self):
682693
with MockConfigFunction('_append_event_log_conf', return_value) as m:
683694
yield m
684695

696+
@pytest.fixture
def mock_append_aws_credentials_conf(self):
    # Stub credential configs: each value is mirrored under both the s3a
    # and the plain s3 key prefixes, matching what the real helper emits.
    stub_creds = (
        ('access.key', 'my_key'),
        ('secret.key', 'your_key'),
        ('session.token', 'ice_cream'),
    )
    return_value = {
        'spark.hadoop.fs.%s.%s' % (scheme, suffix): value
        for suffix, value in stub_creds
        for scheme in ('s3a', 's3')
    }
    with MockConfigFunction('_append_aws_credentials_conf', return_value) as m:
        yield m
yield m
708+
685709
@pytest.fixture
686710
def mock_adjust_spark_requested_resources_mesos(self):
687711
return_value = {
@@ -909,6 +933,7 @@ def test_get_spark_conf_mesos(
909933
extra_docker_params,
910934
mock_get_mesos_docker_volumes_conf,
911935
mock_append_event_log_conf,
936+
mock_append_aws_credentials_conf,
912937
mock_append_sql_shuffle_partitions_conf,
913938
mock_adjust_spark_requested_resources_mesos,
914939
mock_time,
@@ -964,6 +989,7 @@ def test_get_spark_conf_mesos(
964989
list(mock_get_mesos_docker_volumes_conf.return_value.keys()) +
965990
list(mock_adjust_spark_requested_resources_mesos.return_value.keys()) +
966991
list(mock_append_event_log_conf.return_value.keys()) +
992+
list(mock_append_aws_credentials_conf.return_value.keys()) +
967993
list(mock_append_sql_shuffle_partitions_conf.return_value.keys()) +
968994
list(mock_append_spark_conf_log.return_value.keys()) +
969995
list(mock_append_console_progress_conf.return_value.keys()),
@@ -978,6 +1004,7 @@ def test_get_spark_conf_mesos(
9781004
mock_append_event_log_conf.mocker.assert_called_once_with(
9791005
mock.ANY, *aws_creds,
9801006
)
1007+
mock_append_aws_credentials_conf.mocker.assert_called_once_with(mock.ANY, *aws_creds)
9811008
mock_append_sql_shuffle_partitions_conf.mocker.assert_called_once_with(
9821009
mock.ANY,
9831010
)
@@ -1030,6 +1057,7 @@ def tes_leaderst_get_spark_conf_kubernetes(
10301057
spark_opts_from_env,
10311058
ui_port,
10321059
mock_append_event_log_conf,
1060+
mock_append_aws_credentials_conf,
10331061
mock_append_sql_shuffle_partitions_conf,
10341062
mock_adjust_spark_requested_resources_kubernetes,
10351063
mock_time,
@@ -1067,6 +1095,7 @@ def tes_leaderst_get_spark_conf_kubernetes(
10671095
list(other_spark_opts.keys()) +
10681096
list(mock_adjust_spark_requested_resources_kubernetes.return_value.keys()) +
10691097
list(mock_append_event_log_conf.return_value.keys()) +
1098+
list(mock_append_aws_credentials_conf.return_value.keys()) +
10701099
list(mock_append_sql_shuffle_partitions_conf.return_value.keys()),
10711100
)
10721101
assert len(set(output.keys()) - verified_keys) == 0
@@ -1076,6 +1105,7 @@ def tes_leaderst_get_spark_conf_kubernetes(
10761105
mock_append_event_log_conf.mocker.assert_called_once_with(
10771106
mock.ANY, *aws_creds,
10781107
)
1108+
mock_append_aws_credentials_conf.mocker.assert_called_once_with(mock.ANY, *aws_creds)
10791109
mock_append_sql_shuffle_partitions_conf.mocker.assert_called_once_with(
10801110
mock.ANY,
10811111
)

0 commit comments

Comments
 (0)