diff --git a/audbackend/core/backend/artifactory.py b/audbackend/core/backend/artifactory.py index d112a835..206aefc8 100644 --- a/audbackend/core/backend/artifactory.py +++ b/audbackend/core/backend/artifactory.py @@ -209,6 +209,7 @@ def _copy_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Copy file on backend.""" @@ -263,6 +264,7 @@ def _get_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Get file from backend.""" @@ -287,6 +289,7 @@ def _move_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Move file on backend.""" diff --git a/audbackend/core/backend/base.py b/audbackend/core/backend/base.py index 45fb3b7d..38cd13c5 100644 --- a/audbackend/core/backend/base.py +++ b/audbackend/core/backend/base.py @@ -176,6 +176,7 @@ def _copy_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Copy file on backend. @@ -189,7 +190,12 @@ def _copy_file( """ with tempfile.TemporaryDirectory() as tmp: tmp_path = audeer.path(tmp, "~") - tmp_path = self.get_file(src_path, tmp_path, verbose=verbose) + tmp_path = self.get_file( + src_path, + tmp_path, + num_workers=num_workers, + verbose=verbose, + ) self.put_file(tmp_path, dst_path, verbose=verbose) def copy_file( @@ -197,6 +203,7 @@ def copy_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -219,6 +226,7 @@ def copy_file( Args: src_path: source path to file on backend dst_path: destination path to file on backend + num_workers: number of parallel jobs validate: verify file was successfully copied verbose: show debug messages @@ -246,6 +254,7 @@ def copy_file( self._copy_file, src_path, dst_path, + num_workers, verbose, ) @@ -435,6 +444,7 @@ def get_archive( dst_root: str, *, tmp_root: str = None, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ) -> list[str]: @@ -459,6 +469,7 @@ def get_archive( dst_root: local destination directory tmp_root: directory under which archive is temporarily extracted. Defaults to temporary directory of system + num_workers: number of parallel jobs validate: verify archive was successfully retrieved from the backend verbose: show debug messages @@ -496,6 +507,7 @@ def get_archive( self.get_file( src_path, local_archive, + num_workers=num_workers, validate=validate, verbose=verbose, ) @@ -510,6 +522,7 @@ def _get_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): # pragma: no cover r"""Get file from backend.""" @@ -520,6 +533,7 @@ def get_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ) -> str: @@ -546,6 +560,7 @@ def get_file( Args: src_path: path to file on backend dst_path: destination path to local file + num_workers: number of parallel jobs validate: verify file was successfully retrieved from the backend verbose: show debug messages @@ -594,6 +609,7 @@ def get_file( self._get_file, src_path, tmp_path, + num_workers, verbose, ) audeer.move_file(tmp_path, dst_path) @@ -738,6 +754,7 @@ def _move_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Move file on backend. @@ -749,7 +766,7 @@ def _move_file( if backend supports a native way to move files. """ - self.copy_file(src_path, dst_path, verbose=verbose) + self.copy_file(src_path, dst_path, num_workers=num_workers, verbose=verbose) self.remove_file(src_path) def move_file( @@ -757,6 +774,7 @@ def move_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -783,6 +801,7 @@ def move_file( Args: src_path: source path to file on backend dst_path: destination path to file on backend + num_workers: number of parallel jobs validate: verify file was successfully moved verbose: show debug messages @@ -812,6 +831,7 @@ def move_file( self.copy_file( src_path, dst_path, + num_workers=num_workers, validate=True, verbose=verbose, ) @@ -821,6 +841,7 @@ def move_file( self._move_file, src_path, dst_path, + num_workers, verbose, ) else: diff --git a/audbackend/core/backend/filesystem.py b/audbackend/core/backend/filesystem.py index 32f07258..47207ba0 100644 --- a/audbackend/core/backend/filesystem.py +++ b/audbackend/core/backend/filesystem.py @@ -53,6 +53,7 @@ def _copy_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Copy file on backend.""" @@ -114,6 +115,7 @@ def _get_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Get file from backend.""" @@ -142,6 +144,7 @@ def _move_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Move file on backend.""" diff --git a/audbackend/core/backend/minio.py b/audbackend/core/backend/minio.py index ed180960..0b93fa44 100644 --- a/audbackend/core/backend/minio.py +++ b/audbackend/core/backend/minio.py @@ -2,7 +2,9 @@ import getpass import mimetypes import os +import signal import tempfile +import threading import minio @@ -198,6 +200,7 @@ def _copy_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Copy file on backend.""" @@ -209,7 +212,7 @@ def _copy_file( if self._size(src_path) / 1024 / 1024 / 1024 >= 4.9: with tempfile.TemporaryDirectory() as tmp_dir: tmp_path = audeer.path(tmp_dir, os.path.basename(src_path)) - self._get_file(src_path, tmp_path, verbose) + self._get_file(src_path, tmp_path, num_workers, verbose) self._put_file(tmp_path, dst_path, checksum, verbose) else: self._client.copy_object( @@ -264,35 +267,96 @@ def _get_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Get file from backend.""" src_path = self.path(src_path) src_size = self._client.stat_object(self.repository, src_path).size - chunk = 4 * 1024 - with audeer.progress_bar(total=src_size, disable=not verbose) as pbar: - desc = audeer.format_display_message( - f"Download {os.path.basename(str(src_path))}", pbar=True - ) - pbar.set_description_str(desc) - pbar.refresh() - dst_size = 0 - try: - response = self._client.get_object(self.repository, src_path) - with open(dst_path, "wb") as dst_fp: - while src_size > dst_size: - data = response.read(chunk) - n_data = len(data) - if n_data > 0: - dst_fp.write(data) - dst_size += n_data - pbar.update(n_data) - except Exception as e: # pragma: no cover - raise RuntimeError(f"Error downloading file: {e}") - finally: - response.close() - response.release_conn() + # Create cancellation event for handling interrupts + cancel_event = threading.Event() + + # Install signal handler to set cancel_event on Ctrl+C + def signal_handler(signum, frame): + cancel_event.set() # pragma: no cover + + original_handler = signal.signal(signal.SIGINT, signal_handler) + + # Setup progress bar + desc = audeer.format_display_message( + f"Download {os.path.basename(str(src_path))}", + pbar=verbose, + ) + pbar = audeer.progress_bar(total=src_size, desc=desc, disable=not verbose) + + try: + if num_workers == 1: + # Simple single-threaded download + with pbar: + self._download_file(src_path, dst_path, pbar, cancel_event) + else: + # Multi-threaded download with pre-allocated file + with open(dst_path, "wb") as f: + f.truncate(src_size) + + # Create and run download tasks + tasks = [] + # Ensure num_workers does not exceed src_size + num_workers = min(num_workers, src_size) if src_size > 0 else 1 + chunk_size = src_size // num_workers + for i in range(num_workers): + offset = i * chunk_size + length = chunk_size if i < num_workers - 1 else src_size - offset + tasks.append( + ([src_path, dst_path, pbar, cancel_event, offset, length], {}) + ) + + with pbar: + audeer.run_tasks( + self._download_file, tasks, num_workers=num_workers + ) + except KeyboardInterrupt: + # Clean up partial file + if os.path.exists(dst_path): + os.remove(dst_path) + raise + finally: + # Restore original signal handler + signal.signal(signal.SIGINT, original_handler) + + def _download_file( + self, + src_path: str, + dst_path: str, + pbar, + cancel_event: threading.Event = None, + offset: int = 0, + length: int | None = None, + ): + """Download file or part of file.""" + chunk_size = 4 * 1024 # 4 KB + + # Get the data stream + kwargs = {"offset": offset} + if length is not None: + kwargs["length"] = length + response = self._client.get_object(self.repository, src_path, **kwargs) + + try: + with open(dst_path, "r+b" if offset else "wb") as f: + if offset: + f.seek(offset) + + while data := response.read(chunk_size): + # Check if cancellation was requested + if cancel_event and cancel_event.is_set(): + raise KeyboardInterrupt("Download cancelled by user") + f.write(data) + pbar.update(len(data)) + finally: + response.close() + response.release_conn() def _ls( self, @@ -311,10 +375,11 @@ def _move_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): r"""Move file on backend.""" - self._copy_file(src_path, dst_path, verbose) + self._copy_file(src_path, dst_path, num_workers, verbose) self._remove_file(src_path) def _open( diff --git a/audbackend/core/interface/unversioned.py b/audbackend/core/interface/unversioned.py index 86c6581b..dbe938a5 100644 --- a/audbackend/core/interface/unversioned.py +++ b/audbackend/core/interface/unversioned.py @@ -73,6 +73,7 @@ def copy_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -95,6 +96,7 @@ def copy_file( Args: src_path: source path to file on backend dst_path: destination path to file on backend + num_workers: number of parallel jobs validate: verify file was successfully copied verbose: show debug messages @@ -123,6 +125,7 @@ def copy_file( self.backend.copy_file( src_path, dst_path, + num_workers=num_workers, validate=validate, verbose=verbose, ) @@ -284,6 +287,7 @@ def get_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ) -> str: @@ -310,6 +314,7 @@ def get_file( Args: src_path: path to file on backend dst_path: destination path to local file + num_workers: number of parallel jobs validate: verify file was successfully retrieved from the backend verbose: show debug messages @@ -342,6 +347,7 @@ def get_file( return self.backend.get_file( src_path, dst_path, + num_workers=num_workers, validate=validate, verbose=verbose, ) @@ -422,6 +428,7 @@ def move_file( src_path: str, dst_path: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -448,6 +455,7 @@ def move_file( Args: src_path: source path to file on backend dst_path: destination path to file on backend + num_workers: number of parallel jobs validate: verify file was successfully moved verbose: show debug messages @@ -478,6 +486,7 @@ def move_file( self.backend.move_file( src_path, dst_path, + num_workers=num_workers, validate=validate, verbose=verbose, ) diff --git a/audbackend/core/interface/versioned.py b/audbackend/core/interface/versioned.py index 2cee33bb..e9acdc75 100644 --- a/audbackend/core/interface/versioned.py +++ b/audbackend/core/interface/versioned.py @@ -92,6 +92,7 @@ def copy_file( dst_path: str, *, version: str = None, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -118,8 +119,9 @@ def copy_file( Args: src_path: source path to file on backend dst_path: destination path to file on backend - validate: verify file was successfully copied version: version string + num_workers: number of parallel jobs + validate: verify file was successfully copied verbose: show debug messages Raises: @@ -157,6 +159,7 @@ def copy_file( self.backend.copy_file( src_path_with_version, dst_path_with_version, + num_workers=num_workers, validate=validate, verbose=verbose, ) @@ -332,6 +335,7 @@ def get_file( dst_path: str, version: str, *, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ) -> str: @@ -359,6 +363,7 @@ def get_file( src_path: path to file on backend dst_path: destination path to local file version: version string + num_workers: number of parallel jobs validate: verify file was successfully retrieved from the backend verbose: show debug messages @@ -394,6 +399,7 @@ def get_file( return self.backend.get_file( src_path_with_version, dst_path, + num_workers=num_workers, validate=validate, verbose=verbose, ) @@ -576,6 +582,7 @@ def move_file( dst_path: str, *, version: str = None, + num_workers: int = 1, validate: bool = False, verbose: bool = False, ): @@ -607,6 +614,7 @@ def move_file( src_path: source path to file on backend dst_path: destination path to file on backend version: version string + num_workers: number of parallel jobs validate: verify file was successfully moved verbose: show debug messages @@ -647,6 +655,7 @@ def move_file( self.backend.move_file( src_path_with_version, dst_path_with_version, + num_workers=num_workers, validate=validate, verbose=verbose, ) diff --git a/benchmarks/README.rst b/benchmarks/README.rst new file mode 100644 index 00000000..08baef29 --- /dev/null +++ b/benchmarks/README.rst @@ -0,0 +1,30 @@ +audbackend Benchmarks +===================== + +Collection of benchmark scripts to evaluate functionality. + + +Parallel file loading +--------------------- + +The ``Minio`` backend supports parallel loading of files. +It can be benchmarked with: + +.. code-block:: bash + + $ uv run --python 3.12 minio-parallel.py + +Run on a server with 10 +Intel(R) Xeon(R) Platinum 8275CL CPUs @ 3.00GHz +it resulted in + +=========== ======== ============== ============== +num_workers num_iter elapsed (avg) elapsed (std) +=========== ======== ============== ============== +1 10 0:01:05.592122 0:00:04.613981 +2 10 0:00:23.792445 0:00:03.151314 +3 10 0:00:15.051508 0:00:00.020850 +4 10 0:00:12.270467 0:00:00.744683 +5 10 0:00:13.566350 0:00:00.284529 +10 10 0:00:13.096010 0:00:00.575895 +=========== ======== ============== ============== diff --git a/benchmarks/minio-parallel.py b/benchmarks/minio-parallel.py new file mode 100644 index 00000000..4aa7d7fc --- /dev/null +++ b/benchmarks/minio-parallel.py @@ -0,0 +1,71 @@ +# /// script +# dependencies = [ +# "audbackend[all]", +# "numpy", +# "pandas", +# "tabulate", +# ] +# [tool.uv.sources] +# audbackend = { path = ".", editable = true } +# /// +import datetime +import os +import time + +import numpy as np +import pandas as pd + +import audeer + +import audbackend + + +def main(): + host = "s3.dualstack.eu-north-1.amazonaws.com" + repository = "audmodel-internal" + src_path = "/alm/audeering-omni/stage1_2/torch/7289b57d.zip" + version = "1.0.0" + dst_path = "./tmp.zip" + num_iter = 10 + + ds = [] + + for num_workers in audeer.progress_bar([1, 2, 3, 4, 5, 10]): + backend = audbackend.backend.Minio(host, repository) + backend.open() + + elapsed = [] + + for _ in range(num_iter): + if os.path.exists(dst_path): + os.remove(dst_path) + + interface = audbackend.interface.Maven(backend) + + t = time.time() + interface.get_file( + src_path=src_path, + dst_path=dst_path, + version=version, + num_workers=num_workers, + ) + elapsed.append(time.time() - t) + + ds.append( + { + "num_workers": num_workers, + "num_iter": num_iter, + "elapsed(avg)": str(datetime.timedelta(seconds=np.mean(elapsed))), + "elapsed(std)": str(datetime.timedelta(seconds=np.std(elapsed))), + } + ) + + backend.close() + + df = pd.DataFrame(ds) + df.to_csv("results.csv", index=False) + df.to_markdown("results.md", index=False) + + +if __name__ == "__main__": + main() diff --git a/docs/developer-guide.rst b/docs/developer-guide.rst index 8c8a9c08..4226c389 100644 --- a/docs/developer-guide.rst +++ b/docs/developer-guide.rst @@ -443,6 +443,7 @@ we provide a more efficient implementation. self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): with self._db as db: @@ -474,6 +475,7 @@ but it is more efficient if we provide one. self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): with self._db as db: @@ -499,6 +501,7 @@ from the backend. self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): with self._db as db: diff --git a/pyproject.toml b/pyproject.toml index d3f0d987..6c92b7cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ classifiers = [ ] requires-python = '>=3.10' dependencies = [ - 'audeer >=1.20.0', + 'audeer >=2.3.1', 'pywin32; sys_platform == "win32"', ] # Get version dynamically from git diff --git a/tests/bad_file_system.py b/tests/bad_file_system.py index 7e6a3e5a..4078f192 100644 --- a/tests/bad_file_system.py +++ b/tests/bad_file_system.py @@ -30,8 +30,10 @@ def _get_file( src_path: str, dst_path: str, verbose: bool, + num_workers: int, + chunk_size: int, ): - super()._get_file(src_path, dst_path, verbose) + super()._get_file(src_path, dst_path, verbose, num_workers, chunk_size) # raise error after file was retrieved raise InterruptedError() diff --git a/tests/singlefolder.py b/tests/singlefolder.py index 7b15f029..b2b261cf 100644 --- a/tests/singlefolder.py +++ b/tests/singlefolder.py @@ -115,6 +115,7 @@ def _get_file( self, src_path: str, dst_path: str, + num_workers: int, verbose: bool, ): with self.Map(self._path, self._lock) as m: diff --git a/tests/test_backend_minio.py b/tests/test_backend_minio.py index 1778161c..705d303c 100644 --- a/tests/test_backend_minio.py +++ b/tests/test_backend_minio.py @@ -1,4 +1,7 @@ +import filecmp import os +import signal +import threading import pytest @@ -7,6 +10,18 @@ import audbackend +def create_file_exact_size(filename, size_mb): + """Create binary file of given size.""" + size_bytes = size_mb * 1024 * 1024 + with open(filename, "wb") as f: + remaining = size_bytes + chunk_size = 8192 # Write in 8KB chunks + while remaining > 0: + to_write = min(chunk_size, remaining) + f.write(b"A" * to_write) # Can use any byte pattern + remaining -= to_write + + @pytest.fixture(scope="function", autouse=False) def hide_credentials(): defaults = { @@ -357,3 +372,152 @@ def test_open_close(host, repository): audbackend.backend.Minio.create(host, repository) backend.open() backend.close() + + +@pytest.mark.parametrize( + "interface", + [(audbackend.backend.Minio, audbackend.interface.Unversioned)], + indirect=True, +) +def test_get_file(tmpdir, interface): + r"""Test getting file. + + Args: + tmpdir: tmpdir fixture + interface: interface fixture + + """ + tmp_path = audeer.path(tmpdir, "file.bin") + create_file_exact_size(tmp_path, 2) + backend_path = "/file.bin" + interface.put_file(tmp_path, backend_path) + + dst_path1 = audeer.path(tmpdir, "dst1.bin") + dst_path2 = audeer.path(tmpdir, "dst2.bin") + interface.get_file(backend_path, dst_path1, num_workers=1) + interface.get_file(backend_path, dst_path2, num_workers=2) + + # Check both downloaded files are the same + assert os.path.getsize(dst_path1) == os.path.getsize(dst_path2) + assert audeer.md5(dst_path1) == audeer.md5(dst_path2) + assert filecmp.cmp(dst_path1, dst_path2, shallow=False) + + +@pytest.mark.parametrize( + "interface", + [(audbackend.backend.Minio, audbackend.interface.Unversioned)], + indirect=True, +) +def test_interrupt_signal_handler(tmpdir, interface): + r"""Test that signal handler sets cancel_event correctly. + + This tests the signal handler setup + that sets the cancel_event when SIGINT is received. + + Args: + tmpdir: tmpdir fixture + interface: interface fixture + + """ + # Create a cancel_event like _get_file does + cancel_event = threading.Event() + + # Create the signal handler as defined in _get_file + def signal_handler(signum, frame): + cancel_event.set() + + # Install the handler + original_handler = signal.signal(signal.SIGINT, signal_handler) + + try: + # Verify event is not set initially + assert not cancel_event.is_set() + + # Call the signal handler directly (simulating Ctrl+C) + signal_handler(signal.SIGINT, None) + + # Verify event was set by the handler (minio.py:289) + assert cancel_event.is_set() + finally: + # Restore original handler + signal.signal(signal.SIGINT, original_handler) + + +@pytest.mark.parametrize( + "interface", + [(audbackend.backend.Minio, audbackend.interface.Unversioned)], + indirect=True, +) +def test_interrupt_via_cancel_event(tmpdir, interface): + r"""Test that cancel_event check during download raises KeyboardInterrupt. + + This tests the interrupt handling mechanism + where the cancel_event is checked during file download. + Directly calls _download_file with a cancel_event that gets set. + + Args: + tmpdir: tmpdir fixture + interface: interface fixture + + """ + # Create and upload a test file + tmp_path = audeer.path(tmpdir, "file.bin") + create_file_exact_size(tmp_path, 2) # 2 MB file + backend_path = "/file.bin" + interface.put_file(tmp_path, backend_path) + + # Create a cancel_event and set it immediately + # This will cause the download loop to raise KeyboardInterrupt + cancel_event = threading.Event() + cancel_event.set() + + # Prepare download + src_path = interface._backend.path(backend_path) + dst_path = audeer.path(tmpdir, "download.bin") + pbar = audeer.progress_bar(total=100, disable=True) + + # Attempt download with cancel_event already set - should raise KeyboardInterrupt + with pytest.raises(KeyboardInterrupt, match="Download cancelled by user"): + interface._backend._download_file(src_path, dst_path, pbar, cancel_event) + + +@pytest.mark.parametrize( + "interface", + [(audbackend.backend.Minio, audbackend.interface.Unversioned)], + indirect=True, +) +def test_interrupt_cleanup(tmpdir, interface, monkeypatch): + r"""Test that KeyboardInterrupt cleans up partial file. + + This tests the exception handler + that removes partial files when download is interrupted. + + Args: + tmpdir: tmpdir fixture + interface: interface fixture + monkeypatch: monkeypatch fixture + + """ + # Create and upload test file + tmp_path = audeer.path(tmpdir, "file.bin") + create_file_exact_size(tmp_path, 2) + backend_path = "/file.bin" + interface.put_file(tmp_path, backend_path) + + # Mock _download_file to raise KeyboardInterrupt + def mock_download(*args, **kwargs): + # Create partial file before interrupting + dst_path = args[1] + with open(dst_path, "wb") as f: + f.write(b"partial data") + raise KeyboardInterrupt("Simulated interrupt") + + monkeypatch.setattr(interface._backend, "_download_file", mock_download) + + # Attempt download + dst_path = audeer.path(tmpdir, "download.bin") + with pytest.raises(KeyboardInterrupt): + interface.get_file(backend_path, dst_path, num_workers=1) + + # Verify cleanup happened + assert not os.path.exists(dst_path)