|
4 | 4 | import tarfile |
5 | 5 | import traceback |
6 | 6 | from abc import ABC, abstractmethod |
7 | | -from typing import Dict, Optional |
| 7 | +from typing import Dict, List, Optional |
8 | 8 |
|
9 | 9 | import fsspec |
10 | 10 | import nbconvert |
|
14 | 14 | from jupyter_scheduler.models import DescribeJob, JobFeature, Status |
15 | 15 | from jupyter_scheduler.orm import Job, create_session |
16 | 16 | from jupyter_scheduler.parameterize import add_parameters |
17 | | -from jupyter_scheduler.utils import copy_file, get_utc_timestamp |
| 17 | +from jupyter_scheduler.utils import copy_directory, copy_file, get_utc_timestamp |
18 | 18 |
|
19 | 19 |
|
20 | 20 | class ExecutionManager(ABC): |
@@ -110,11 +110,28 @@ def before_start(self): |
110 | 110 | self.copy_input(self.input_uri, self.staging_paths["input"]) |
111 | 111 |
|
112 | 112 | def copy_input(self, input_uri: str, copy_to_path: str): |
| 113 | + """Copies the input file or input file and input folder to the staging directory""" |
113 | 114 | if self.package_input_folder: |
114 | | - self.copy_input_folder(input_uri, copy_to_path) |
| 115 | + copied_files = self.copy_input_folder(input_uri, copy_to_path) |
| 116 | + input_notebook_filename = os.path.basename(input_uri) |
| 117 | + with self.db_session() as session: |
| 118 | + job = session.query(Job).filter(Job.job_id == self.model.job_id).one() |
| 119 | + job.packaged_files = [ |
| 120 | + file for file in copied_files if file != input_notebook_filename |
| 121 | + ] |
| 122 | + session.commit() |
115 | 123 | else: |
116 | 124 | self.copy_input_file(input_uri, copy_to_path) |
117 | 125 |
|
| 126 | + def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]: |
| 127 | + """Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory""" |
| 128 | + input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri)) |
| 129 | + staging_dir = os.path.dirname(nb_copy_to_path) |
| 130 | + return copy_directory( |
| 131 | + source_dir=input_dir_path, |
| 132 | + destination_dir=staging_dir, |
| 133 | + ) |
| 134 | + |
118 | 135 | def copy_input_file(self, input_uri: str, copy_to_path: str): |
119 | 136 | """Copies the input file to the staging directory in a new process.""" |
120 | 137 | input_filepath = os.path.join(self.root_dir, input_uri) |
|
0 commit comments