1
+ import base64
1
2
import functools
2
3
import hashlib
3
4
import itertools
34
35
# Spark executor sizing defaults, applied when the caller does not override them.
DEFAULT_EXECUTOR_CORES = 2
DEFAULT_EXECUTOR_INSTANCES = 2
DEFAULT_EXECUTOR_MEMORY = '4g'
# Maximum length of a Kubernetes label value — the RFC 1123 DNS-label limit.
# https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
DEFAULT_K8S_LABEL_LENGTH = 63
37
39
38
40
39
41
NON_CONFIGURABLE_SPARK_OPTS = {
@@ -446,6 +448,12 @@ def _get_k8s_spark_env(
446
448
volumes : Optional [List [Mapping [str , str ]]],
447
449
paasta_pool : str ,
448
450
) -> Dict [str , str ]:
451
+ # RFC 1123: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
452
+ # technically only paasta instance can be longer than 63 chars. But we apply the normalization regardless.
453
+ # NOTE: this affects only k8s labels, not the pod names.
454
+ _paasta_cluster = _get_k8s_resource_name_limit_size_with_hash (paasta_cluster )
455
+ _paasta_service = _get_k8s_resource_name_limit_size_with_hash (paasta_service )
456
+ _paasta_instance = _get_k8s_resource_name_limit_size_with_hash (paasta_instance )
449
457
spark_env = {
450
458
'spark.master' : f'k8s://https://k8s.{ paasta_cluster } .paasta:6443' ,
451
459
'spark.executorEnv.PAASTA_SERVICE' : paasta_service ,
@@ -460,19 +468,37 @@ def _get_k8s_spark_env(
460
468
'spark.kubernetes.authenticate.clientKeyFile' : f'{ K8S_AUTH_FOLDER } /{ paasta_cluster } -client.key' ,
461
469
'spark.kubernetes.authenticate.clientCertFile' : f'{ K8S_AUTH_FOLDER } /{ paasta_cluster } -client.crt' ,
462
470
'spark.kubernetes.container.image.pullPolicy' : 'Always' ,
463
- 'spark.kubernetes.executor.label.yelp.com/paasta_service' : paasta_service ,
464
- 'spark.kubernetes.executor.label.yelp.com/paasta_instance' : paasta_instance ,
465
- 'spark.kubernetes.executor.label.yelp.com/paasta_cluster' : paasta_cluster ,
466
- 'spark.kubernetes.executor.label.paasta.yelp.com/service' : paasta_service ,
467
- 'spark.kubernetes.executor.label.paasta.yelp.com/instance' : paasta_instance ,
468
- 'spark.kubernetes.executor.label.paasta.yelp.com/cluster' : paasta_cluster ,
471
+ 'spark.kubernetes.executor.label.yelp.com/paasta_service' : _paasta_service ,
472
+ 'spark.kubernetes.executor.label.yelp.com/paasta_instance' : _paasta_instance ,
473
+ 'spark.kubernetes.executor.label.yelp.com/paasta_cluster' : _paasta_cluster ,
474
+ 'spark.kubernetes.executor.label.paasta.yelp.com/service' : _paasta_service ,
475
+ 'spark.kubernetes.executor.label.paasta.yelp.com/instance' : _paasta_instance ,
476
+ 'spark.kubernetes.executor.label.paasta.yelp.com/cluster' : _paasta_cluster ,
469
477
'spark.kubernetes.node.selector.yelp.com/pool' : paasta_pool ,
470
478
'spark.kubernetes.executor.label.yelp.com/pool' : paasta_pool ,
471
479
** _get_k8s_docker_volumes_conf (volumes ),
472
480
}
473
481
return spark_env
474
482
475
483
484
+ def _get_k8s_resource_name_limit_size_with_hash (name : str , limit : int = 63 , suffix : int = 4 ) -> str :
485
+ """ Returns `name` unchanged if it's length does not exceed the `limit`.
486
+ Otherwise, returns truncated `name` with it's hash of size `suffix`
487
+ appended.
488
+
489
+ base32 encoding is chosen as it satisfies the common requirement in
490
+ various k8s names to be alphanumeric.
491
+
492
+ NOTE: This function is the same as paasta/paasta_tools/kubernetes_tools.py
493
+ """
494
+ if len (name ) > limit :
495
+ digest = hashlib .md5 (name .encode ()).digest ()
496
+ hash = base64 .b32encode (digest ).decode ().replace ('=' , '' ).lower ()
497
+ return f'{ name [:(limit - suffix - 1 )]} -{ hash [:suffix ]} '
498
+ else :
499
+ return name
500
+
501
+
476
502
def stringify_spark_env(spark_env: Mapping[str, str]) -> str:
    """Flatten a spark option mapping into a single '--conf key=value ...' CLI string."""
    flags = (f'--conf {opt}={val}' for opt, val in spark_env.items())
    return ' '.join(flags)
478
504
0 commit comments