Skip to content

Commit 6824578

Browse files
committed
make copy_input_file async
1 parent 93a3004 commit 6824578

File tree

2 files changed

+11
-11
lines changed

2 files changed

+11
-11
lines changed

jupyter_scheduler/scheduler.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
from jupyter_scheduler.orm import Job, JobDefinition, create_session
4242
from jupyter_scheduler.utils import (
4343
copy_directory,
44+
copy_file,
4445
create_output_directory,
4546
create_output_filename,
47+
spawn_process,
4648
)
4749

4850

@@ -422,11 +424,9 @@ def db_session(self):
422424
return self._db_session
423425

424426
def copy_input_file(self, input_uri: str, copy_to_path: str):
425-
"""Copies the input file to the staging directory"""
427+
"""Copies the input file to the staging directory in a new process."""
426428
input_filepath = os.path.join(self.root_dir, input_uri)
427-
with fsspec.open(input_filepath) as input_file:
428-
with fsspec.open(copy_to_path, "wb") as output_file:
429-
output_file.write(input_file.read())
429+
spawn_process(target=copy_file, input_filepath=input_filepath, copy_to_path=copy_to_path)
430430

431431
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
432432
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
@@ -478,13 +478,6 @@ def create_job(self, model: CreateJob) -> str:
478478
else:
479479
self.copy_input_file(model.input_uri, staging_paths["input"])
480480

481-
# The MP context forces new processes to not be forked on Linux.
482-
# This is necessary because `asyncio.get_event_loop()` is bugged in
483-
# forked processes in Python versions below 3.12. This method is
484-
# called by `jupyter_core` by `nbconvert` in the default executor.
485-
#
486-
# See: https://github.com/python/cpython/issues/66285
487-
# See also: https://github.com/jupyter/jupyter_core/pull/362
488481
mp_ctx = mp.get_context("spawn")
489482
p = mp_ctx.Process(
490483
target=self.execution_manager_class(

jupyter_scheduler/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ def copy_directory(
116116
return copied_files
117117

118118

119+
def copy_file(input_filepath: str, copy_to_path: str):
120+
"""Copies the file at input_filepath to copy_to_path"""
121+
with fsspec.open(input_filepath) as input_file:
122+
with fsspec.open(copy_to_path, "wb") as output_file:
123+
output_file.write(input_file.read())
124+
125+
119126
def spawn_process(target, *args, **kwargs) -> SpawnProcess:
120127
"""
121128
Spawns a new process using the 'spawn' context with the given target and

0 commit comments

Comments
 (0)