@@ -454,6 +454,7 @@ def get_dra_configs(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
454454 return spark_opts
455455
456456 spark_app_name = spark_opts .get ('spark.app.name' , '' )
457+ is_jupyterhub = _is_jupyterhub_job (spark_app_name )
457458
458459 log .info (
459460 TextColors .yellow (
@@ -470,7 +471,7 @@ def get_dra_configs(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
470471 )
471472 cached_executor_idle_timeout = self .default_spark_srv_conf ['spark.dynamicAllocation.cachedExecutorIdleTimeout' ]
472473 if 'spark.dynamicAllocation.cachedExecutorIdleTimeout' not in spark_opts :
473- if _is_jupyterhub_job ( spark_app_name ) :
474+ if is_jupyterhub :
474475 # increase cachedExecutorIdleTimeout by 15 minutes in case of Jupyterhub
475476 cached_executor_idle_timeout = str (int (cached_executor_idle_timeout [:- 1 ]) + 900 ) + 's'
476477 log .info (
@@ -486,57 +487,21 @@ def get_dra_configs(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
486487 cached_executor_idle_timeout ,
487488 )
488489
489- min_ratio_executors = None
490- default_dra_min_executor_ratio = self .default_spark_srv_conf ['spark.yelp.dra.minExecutorRatio' ]
491- if 'spark.dynamicAllocation.minExecutors' not in spark_opts :
492- # the ratio of total executors to be used as minExecutors
493- min_executor_ratio = spark_opts .get ('spark.yelp.dra.minExecutorRatio' , default_dra_min_executor_ratio )
494- # set minExecutors default as a ratio of spark.executor.instances
495- num_instances = int (
496- spark_opts .get (
497- 'spark.executor.instances' ,
498- self .default_spark_srv_conf ['spark.executor.instances' ],
499- ),
500- )
501- min_executors = int (num_instances * float (min_executor_ratio ))
502- # minExecutors should not be more than initialExecutors
503- if 'spark.dynamicAllocation.initialExecutors' in spark_opts :
504- min_executors = min (min_executors , int (spark_opts ['spark.dynamicAllocation.initialExecutors' ]))
505- # minExecutors should not be more than maxExecutors
506- if 'spark.dynamicAllocation.maxExecutors' in spark_opts :
507- min_executors = min (
508- min_executors , int (int (spark_opts ['spark.dynamicAllocation.maxExecutors' ]) *
509- float (min_executor_ratio )),
510- )
511-
512- min_ratio_executors = min_executors
490+ # Min executors
491+ min_executors = int (spark_opts .get ('spark.dynamicAllocation.minExecutors' , 0 ))
513492
514- warn_msg = f'\n Setting { TextColors .yellow ("spark.dynamicAllocation.minExecutors" )} as'
515-
516- # set minExecutors equal to 0 for Jupyter Spark sessions
517- if _is_jupyterhub_job (spark_app_name ):
518- min_executors = 0
519- warn_msg = (
520- f'Looks like you are launching Spark session from a Jupyter notebook. '
521- f'{ warn_msg } { min_executors } to save spark costs when any spark action is not running'
522- )
523- else :
524- warn_msg = f'{ warn_msg } { min_executors } '
525-
526- spark_opts ['spark.dynamicAllocation.minExecutors' ] = str (min_executors )
493+ # Initial executors for Jupyter
494+ initial_executors = int (spark_opts .get ('spark.dynamicAllocation.initialExecutors' , min_executors ))
495+ if is_jupyterhub and 'spark.dynamicAllocation.initialExecutors' not in spark_opts :
496+ initial_executors = int (spark_opts .get ('spark.dynamicAllocation.minExecutors' , 0 ))
497+ spark_opts ['spark.dynamicAllocation.initialExecutors' ] = str (initial_executors )
527498 log .info (
528- f'\n { warn_msg } . If you wish to change the value of minimum executors, please provide '
529- f'the exact value of spark.dynamicAllocation.minExecutors in your spark args\n ' ,
499+ f'\n Setting { TextColors .yellow ("spark.dynamicAllocation.initialExecutors" )} as { initial_executors } . '
500+ f'If you wish to change the value of initial executors, please provide the exact value of '
501+ f'spark.dynamicAllocation.initialExecutors in your spark args\n ' ,
530502 )
531503
532- if not _is_jupyterhub_job (spark_app_name ) and 'spark.yelp.dra.minExecutorRatio' not in spark_opts :
533- log .debug (
534- f'\n spark.yelp.dra.minExecutorRatio not provided. This specifies the ratio of total executors '
535- f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. '
536- f'Using default ratio: { default_dra_min_executor_ratio } . If you wish to change this value, '
537- f'please provide the desired spark.yelp.dra.minExecutorRatio in your spark args\n ' ,
538- )
539-
504+ # Max executors
540505 if 'spark.dynamicAllocation.maxExecutors' not in spark_opts :
541506 # set maxExecutors default equal to spark.executor.instances
542507 max_executors = int (
@@ -546,34 +511,18 @@ def get_dra_configs(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
546511 ),
547512 )
548513 # maxExecutors should not be less than initialExecutors
549- if 'spark.dynamicAllocation.initialExecutors' in spark_opts :
550- max_executors = max (max_executors , int (spark_opts ['spark.dynamicAllocation.initialExecutors' ]))
551-
514+ max_executors = max (max_executors , initial_executors )
552515 spark_opts ['spark.dynamicAllocation.maxExecutors' ] = str (max_executors )
553516 log .info (
554517 f'\n Setting { TextColors .yellow ("spark.dynamicAllocation.maxExecutors" )} as { max_executors } . '
555518 f'If you wish to change the value of maximum executors, please provide the exact value of '
556519 f'spark.dynamicAllocation.maxExecutors in your spark args\n ' ,
557520 )
558521
559- # TODO: add regex to better match Jupyterhub Spark session app name
560- if 'jupyterhub' in spark_app_name and 'spark.dynamicAllocation.initialExecutors' not in spark_opts :
561- if min_ratio_executors is not None :
562- # set initialExecutors default equal to minimum executors calculated above using
563- # 'spark.yelp.dra.minExecutorRatio' and `default_dra_min_executor_ratio` for Jupyter Spark sessions
564- initial_executors = min_ratio_executors
565- else :
566- # otherwise set initial executors equal to minimum executors
567- initial_executors = int (spark_opts ['spark.dynamicAllocation.minExecutors' ])
568-
569- spark_opts ['spark.dynamicAllocation.initialExecutors' ] = str (initial_executors )
570- log .info (
571- f'\n Setting { TextColors .yellow ("spark.dynamicAllocation.initialExecutors" )} as { initial_executors } . '
572- f'If you wish to change the value of initial executors, please provide the exact value of '
573- f'spark.dynamicAllocation.initialExecutors in your spark args\n ' ,
574- )
575-
576- spark_opts ['spark.executor.instances' ] = spark_opts ['spark.dynamicAllocation.minExecutors' ]
522+ # We use spark.executor.instances to define maxExecutors, however
523+ # in Spark: if 'spark.executor.instances' > initialExecutors, it'll be used as the initialExecutors
524+ # Set 'spark.executor.instances' to initialExecutors to avoid this
525+ spark_opts ['spark.executor.instances' ] = str (initial_executors )
577526 return spark_opts
578527
579528 def _cap_executor_resources (
0 commit comments