Skip to content

Commit 5427d4d

Browse files
committed
move job input file copying logic to executors
1 parent 7998afc commit 5427d4d

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

jupyter_scheduler/executors.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import tarfile
55
import traceback
66
from abc import ABC, abstractmethod
7-
from typing import Dict
7+
from typing import Dict, Optional
88

99
import fsspec
1010
import nbconvert
@@ -14,7 +14,7 @@
1414
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1515
from jupyter_scheduler.orm import Job, create_session
1616
from jupyter_scheduler.parameterize import add_parameters
17-
from jupyter_scheduler.utils import get_utc_timestamp
17+
from jupyter_scheduler.utils import copy_file, get_utc_timestamp
1818

1919

2020
class ExecutionManager(ABC):
@@ -29,11 +29,21 @@ class ExecutionManager(ABC):
2929
_model = None
3030
_db_session = None
3131

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
32+
def __init__(
33+
self,
34+
job_id: str,
35+
root_dir: str,
36+
db_url: str,
37+
staging_paths: Dict[str, str],
38+
input_uri: Optional[str],
39+
package_input_folder: Optional[bool],
40+
):
3341
self.job_id = job_id
3442
self.staging_paths = staging_paths
3543
self.root_dir = root_dir
3644
self.db_url = db_url
45+
self.input_uri = input_uri
46+
self.package_input_folder = package_input_folder
3747

3848
@property
3949
def model(self):
@@ -98,6 +108,17 @@ def before_start(self):
98108
)
99109
session.commit()
100110

111+
def copy_input(self, input_uri: str, copy_to_path: str):
112+
if self.package_input_folder:
113+
self.copy_input_folder(input_uri, copy_to_path)
114+
else:
115+
self.copy_input_file(input_uri, copy_to_path)
116+
117+
def copy_input_file(self, input_uri: str, copy_to_path: str):
118+
"""Copies the input file to the staging directory in a new process."""
119+
input_filepath = os.path.join(self.root_dir, input_uri)
120+
copy_file(input_filepath=input_filepath, copy_to_path=copy_to_path)
121+
101122
def on_failure(self, e: Exception):
102123
"""Called after failure of execute"""
103124
job = self.model

jupyter_scheduler/scheduler.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -423,11 +423,6 @@ def db_session(self):
423423

424424
return self._db_session
425425

426-
def copy_input_file(self, input_uri: str, copy_to_path: str):
427-
"""Copies the input file to the staging directory in a new process."""
428-
input_filepath = os.path.join(self.root_dir, input_uri)
429-
spawn_process(target=copy_file, input_filepath=input_filepath, copy_to_path=copy_to_path)
430-
431426
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
432427
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
433428
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
@@ -468,22 +463,15 @@ def create_job(self, model: CreateJob) -> str:
468463
session.commit()
469464

470465
staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
471-
if model.package_input_folder:
472-
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
473-
input_notebook_filename = os.path.basename(model.input_uri)
474-
job.packaged_files = [
475-
file for file in copied_files if file != input_notebook_filename
476-
]
477-
session.commit()
478-
else:
479-
self.copy_input_file(model.input_uri, staging_paths["input"])
480466

481467
p = spawn_process(
482468
target=self.execution_manager_class(
483469
job_id=job.job_id,
484470
staging_paths=staging_paths,
485471
root_dir=self.root_dir,
486472
db_url=self.db_url,
473+
input_uri=model.input_uri,
474+
package_input_folder=model.package_input_folder,
487475
).process
488476
)
489477

0 commit comments

Comments
 (0)