Skip to content

Commit f43d576

Browse files
committed
move job input file copying logic to executors
1 parent 7998afc commit f43d576

File tree

2 files changed

+40
-16
lines changed

2 files changed

+40
-16
lines changed

jupyter_scheduler/executors.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import tarfile
55
import traceback
66
from abc import ABC, abstractmethod
7-
from typing import Dict
7+
from typing import Dict, Optional
88

99
import fsspec
1010
import nbconvert
@@ -14,7 +14,7 @@
1414
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1515
from jupyter_scheduler.orm import Job, create_session
1616
from jupyter_scheduler.parameterize import add_parameters
17-
from jupyter_scheduler.utils import get_utc_timestamp
17+
from jupyter_scheduler.utils import copy_file, get_utc_timestamp
1818

1919

2020
class ExecutionManager(ABC):
@@ -29,11 +29,21 @@ class ExecutionManager(ABC):
2929
_model = None
3030
_db_session = None
3131

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
32+
def __init__(
33+
self,
34+
job_id: str,
35+
root_dir: str,
36+
db_url: str,
37+
staging_paths: Dict[str, str],
38+
input_uri: Optional[str],
39+
package_input_folder: Optional[bool],
40+
):
3341
self.job_id = job_id
3442
self.staging_paths = staging_paths
3543
self.root_dir = root_dir
3644
self.db_url = db_url
45+
self.input_uri = input_uri
46+
self.package_input_folder = package_input_folder
3747

3848
@property
3949
def model(self):
@@ -97,6 +107,18 @@ def before_start(self):
97107
{"start_time": get_utc_timestamp(), "status": Status.IN_PROGRESS}
98108
)
99109
session.commit()
110+
self.copy_input(self.input_uri, self.staging_paths["input"])
111+
112+
def copy_input(self, input_uri: str, copy_to_path: str):
113+
if self.package_input_folder:
114+
self.copy_input_folder(input_uri, copy_to_path)
115+
else:
116+
self.copy_input_file(input_uri, copy_to_path)
117+
118+
def copy_input_file(self, input_uri: str, copy_to_path: str):
119+
"""Copies the input file, resolved relative to root_dir, to the staging directory."""
120+
input_filepath = os.path.join(self.root_dir, input_uri)
121+
copy_file(input_filepath=input_filepath, copy_to_path=copy_to_path)
100122

101123
def on_failure(self, e: Exception):
102124
"""Called after failure of execute"""

jupyter_scheduler/scheduler.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,10 @@ def db_session(self):
423423

424424
return self._db_session
425425

426-
def copy_input_file(self, input_uri: str, copy_to_path: str):
427-
"""Copies the input file to the staging directory in a new process."""
428-
input_filepath = os.path.join(self.root_dir, input_uri)
429-
spawn_process(target=copy_file, input_filepath=input_filepath, copy_to_path=copy_to_path)
426+
# def copy_input_file(self, input_uri: str, copy_to_path: str):
427+
# """Copies the input file to the staging directory in a new process."""
428+
# input_filepath = os.path.join(self.root_dir, input_uri)
429+
# spawn_process(target=copy_file, input_filepath=input_filepath, copy_to_path=copy_to_path)
430430

431431
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
432432
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
@@ -468,22 +468,24 @@ def create_job(self, model: CreateJob) -> str:
468468
session.commit()
469469

470470
staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
471-
if model.package_input_folder:
472-
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
473-
input_notebook_filename = os.path.basename(model.input_uri)
474-
job.packaged_files = [
475-
file for file in copied_files if file != input_notebook_filename
476-
]
477-
session.commit()
478-
else:
479-
self.copy_input_file(model.input_uri, staging_paths["input"])
471+
# if model.package_input_folder:
472+
# copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
473+
# input_notebook_filename = os.path.basename(model.input_uri)
474+
# job.packaged_files = [
475+
# file for file in copied_files if file != input_notebook_filename
476+
# ]
477+
# session.commit()
478+
# else:
479+
# self.copy_input_file(model.input_uri, staging_paths["input"])
480480

481481
p = spawn_process(
482482
target=self.execution_manager_class(
483483
job_id=job.job_id,
484484
staging_paths=staging_paths,
485485
root_dir=self.root_dir,
486486
db_url=self.db_url,
487+
input_uri=model.input_uri,
488+
package_input_folder=model.package_input_folder,
487489
).process
488490
)
489491

0 commit comments

Comments
 (0)