@@ -918,15 +918,17 @@ def _get_k8s_spark_env(
     volumes: Optional[List[Mapping[str, str]]],
     paasta_pool: str,
     service_account_name: Optional[str] = None,
+    include_self_managed_configs: bool = True,
+    k8s_server_address: Optional[str] = None,
 ) -> Dict[str, str]:
     # RFC 1123: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
     # technically only paasta instance can be longer than 63 chars. But we apply the normalization regardless.
     # NOTE: this affects only k8s labels, not the pod names.
     _paasta_cluster = _get_k8s_resource_name_limit_size_with_hash(paasta_cluster)
     _paasta_service = _get_k8s_resource_name_limit_size_with_hash(paasta_service)
     _paasta_instance = _get_k8s_resource_name_limit_size_with_hash(paasta_instance)
+
     spark_env = {
-        'spark.master': f'k8s://https://k8s.{paasta_cluster}.paasta:6443',
         'spark.executorEnv.PAASTA_SERVICE': paasta_service,
         'spark.executorEnv.PAASTA_INSTANCE': paasta_instance,
         'spark.executorEnv.PAASTA_CLUSTER': paasta_cluster,
@@ -935,7 +937,6 @@ def _get_k8s_spark_env(
         'spark.kubernetes.pyspark.pythonVersion': '3',
         'spark.kubernetes.container.image': docker_img,
         'spark.kubernetes.namespace': 'paasta-spark',
-        'spark.kubernetes.container.image.pullPolicy': 'Always',
         'spark.kubernetes.executor.label.yelp.com/paasta_service': _paasta_service,
         'spark.kubernetes.executor.label.yelp.com/paasta_instance': _paasta_instance,
         'spark.kubernetes.executor.label.yelp.com/paasta_cluster': _paasta_cluster,
@@ -954,14 +955,20 @@ def _get_k8s_spark_env(
                 'spark.kubernetes.authenticate.serviceAccountName': service_account_name,
             },
         )
-    else:
+    elif not include_self_managed_configs:
+        spark_env.update({
+            'spark.master': f'k8s://{k8s_server_address}',
+        })
+    elif include_self_managed_configs:
         spark_env.update(
             {
+                'spark.master': f'k8s://https://k8s.{paasta_cluster}.paasta:6443',
                 'spark.kubernetes.authenticate.caCertFile': f'{K8S_AUTH_FOLDER}/{paasta_cluster}-ca.crt',
                 'spark.kubernetes.authenticate.clientKeyFile': f'{K8S_AUTH_FOLDER}/{paasta_cluster}-client.key',
                 'spark.kubernetes.authenticate.clientCertFile': f'{K8S_AUTH_FOLDER}/{paasta_cluster}-client.crt',
             },
         )
+
     return spark_env
 
 
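Net effect of the hunks above: `spark.master` is no longer hard-coded when the `spark_env` dict is built; it is chosen per branch. When `include_self_managed_configs` is false (the EKS case), the caller-supplied `k8s_server_address` is used verbatim; otherwise the master URL is derived from the cluster name as before, alongside the on-host client-certificate settings. A minimal illustrative sketch of that selection, assuming a hypothetical function name and made-up sample values:

from typing import Dict, Optional

def _choose_spark_master(
    paasta_cluster: str,
    include_self_managed_configs: bool,
    k8s_server_address: Optional[str] = None,
) -> Dict[str, str]:
    """Mirrors the branching in the diff above (illustrative only)."""
    if not include_self_managed_configs:
        # EKS path: the caller supplies the API server address directly.
        return {'spark.master': f'k8s://{k8s_server_address}'}
    # Self-managed path: the master URL is derived from the cluster name;
    # the real function also configures client certs alongside it.
    return {'spark.master': f'k8s://https://k8s.{paasta_cluster}.paasta:6443'}

# 'pnw-devc' is a placeholder cluster name; use_eks=True maps to
# include_self_managed_configs=False.
assert _choose_spark_master('pnw-devc', False, 'https://10.0.0.1:6443') == {
    'spark.master': 'k8s://https://10.0.0.1:6443',
}
assert _choose_spark_master('pnw-devc', True) == {
    'spark.master': 'k8s://https://k8s.pnw-devc.paasta:6443',
}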
@@ -1094,6 +1101,8 @@ def get_spark_conf(
     docker_img: str,
     aws_creds: Tuple[Optional[str], Optional[str], Optional[str]],
     extra_volumes: Optional[List[Mapping[str, str]]] = None,
+    use_eks: bool = False,
+    k8s_server_address: Optional[str] = None,
     # the follow arguments only being used for mesos
     extra_docker_params: Optional[MutableMapping[str, str]] = None,
     with_secret: bool = True,
@@ -1201,6 +1210,8 @@ def get_spark_conf(
             extra_volumes,
             paasta_pool,
             service_account_name=service_account_name,
+            include_self_managed_configs=not use_eks,
+            k8s_server_address=k8s_server_address,
         ))
     elif cluster_manager == 'local':
         spark_conf.update(_get_local_spark_env(
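At the call site the change is opt-in: `use_eks` defaults to `False`, so existing callers keep the self-managed behavior, and `include_self_managed_configs=not use_eks` inverts the flag when forwarding it to `_get_k8s_spark_env`. A hedged usage sketch; the endpoint value is a made-up placeholder and the remaining `get_spark_conf` arguments (not shown in this diff) are omitted:

# Hypothetical call-site delta for an EKS caller; every other
# get_spark_conf argument stays exactly as before.
eks_kwargs = {
    'use_eks': True,
    # Forwarded into _get_k8s_spark_env and used verbatim as
    # spark.master = f'k8s://{k8s_server_address}'.
    'k8s_server_address': 'https://example-eks-endpoint.us-west-2.eks.amazonaws.com',
}
# spark_conf = get_spark_conf(..., cluster_manager='kubernetes', **eks_kwargs)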