Skip to content
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6757d72
Add num_workers and chunk_size for downloads
hagenw Oct 15, 2025
8e3eb10
Continue implementation
hagenw Oct 16, 2025
972a423
Fix tests
hagenw Oct 22, 2025
d7aea51
Move arguments to classes
hagenw Oct 22, 2025
4eaa0a3
Add benchmark script
hagenw Oct 22, 2025
88dd39d
Revert "Move arguments to classes"
hagenw Oct 23, 2025
cae11bd
Add num_workers argument to interfaces
hagenw Oct 23, 2025
efd0a23
Fix test
hagenw Oct 23, 2025
bb972f5
Remove unused code
hagenw Oct 23, 2025
4d0abdf
Try to fix pbar
hagenw Oct 23, 2025
4a796f9
Update benchmark script
hagenw Oct 23, 2025
886205d
Debug pbar
hagenw Oct 23, 2025
fd36881
Debug pbar
hagenw Oct 23, 2025
a3caa95
Improve progress bars
hagenw Oct 23, 2025
24876ca
Add test for using num_workers
hagenw Oct 23, 2025
d8310f2
Fix linter
hagenw Oct 23, 2025
515e9e0
Implement other solution for single worker
hagenw Oct 27, 2025
d36aea8
Try larger chunks
hagenw Oct 27, 2025
7635588
Test very small chunk_size
hagenw Oct 27, 2025
003a89c
DEBUG faster implementation
hagenw Oct 27, 2025
02d2693
Fix writing in the same file
hagenw Oct 27, 2025
f18af4c
Clean up code
hagenw Oct 27, 2025
bb3968b
Depend on audeer>=2.3.1
hagenw Oct 27, 2025
acf4db5
Update benchmark script
hagenw Nov 10, 2025
f1f9319
Ensure interruption works for multi-threading
hagenw Nov 10, 2025
6df134e
Ensure same error message for num_workers=1
hagenw Nov 10, 2025
5f23b04
Fix linter
hagenw Nov 10, 2025
7bc5c0c
Add tests for Crtl+C
hagenw Nov 10, 2025
b32d4b2
Fix coverage
hagenw Nov 10, 2025
347eeb7
Move benchmark script
hagenw Nov 11, 2025
8bf0e76
Add benchmark results
hagenw Nov 11, 2025
caa26b3
Ensure we do not have 0 chunk sizes
hagenw Nov 11, 2025
3e7a09b
Ensure offset is always passed on
hagenw Nov 11, 2025
0f21cec
Fix typo in README
hagenw Nov 11, 2025
688f1fd
Setup pbar only once
hagenw Nov 11, 2025
db99d24
Revert "Setup pbar only once"
hagenw Nov 11, 2025
b4cbc11
Move to finally
hagenw Nov 11, 2025
9d11bab
Try to fix
hagenw Nov 11, 2025
2d7d692
Revert "Try to fix"
hagenw Nov 11, 2025
ca813d3
Revert "Move to finally"
hagenw Nov 11, 2025
c3c3088
Use single pbar
hagenw Nov 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions audbackend/core/backend/artifactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def _copy_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Copy file on backend."""
Expand Down Expand Up @@ -263,6 +264,7 @@ def _get_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Get file from backend."""
Expand All @@ -287,6 +289,7 @@ def _move_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Move file on backend."""
Expand Down
25 changes: 23 additions & 2 deletions audbackend/core/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _copy_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Copy file on backend.
Expand All @@ -189,14 +190,20 @@ def _copy_file(
"""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = audeer.path(tmp, "~")
tmp_path = self.get_file(src_path, tmp_path, verbose=verbose)
tmp_path = self.get_file(
src_path,
tmp_path,
num_workers=num_workers,
verbose=verbose,
)
self.put_file(tmp_path, dst_path, verbose=verbose)

def copy_file(
self,
src_path: str,
dst_path: str,
*,
num_workers: int = 1,
validate: bool = False,
verbose: bool = False,
):
Expand All @@ -219,6 +226,7 @@ def copy_file(
Args:
src_path: source path to file on backend
dst_path: destination path to file on backend
num_workers: number of parallel jobs
validate: verify file was successfully copied
verbose: show debug messages

Expand Down Expand Up @@ -246,6 +254,7 @@ def copy_file(
self._copy_file,
src_path,
dst_path,
num_workers,
verbose,
)

Expand Down Expand Up @@ -435,6 +444,7 @@ def get_archive(
dst_root: str,
*,
tmp_root: str = None,
num_workers: int = 1,
validate: bool = False,
verbose: bool = False,
) -> list[str]:
Expand All @@ -459,6 +469,7 @@ def get_archive(
dst_root: local destination directory
tmp_root: directory under which archive is temporarily extracted.
Defaults to temporary directory of system
num_workers: number of parallel jobs
validate: verify archive was successfully
retrieved from the backend
verbose: show debug messages
Expand Down Expand Up @@ -496,6 +507,7 @@ def get_archive(
self.get_file(
src_path,
local_archive,
num_workers=num_workers,
validate=validate,
verbose=verbose,
)
Expand All @@ -510,6 +522,7 @@ def _get_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
): # pragma: no cover
r"""Get file from backend."""
Expand All @@ -520,6 +533,7 @@ def get_file(
src_path: str,
dst_path: str,
*,
num_workers: int = 1,
validate: bool = False,
verbose: bool = False,
) -> str:
Expand All @@ -546,6 +560,7 @@ def get_file(
Args:
src_path: path to file on backend
dst_path: destination path to local file
num_workers: number of parallel jobs
validate: verify file was successfully
retrieved from the backend
verbose: show debug messages
Expand Down Expand Up @@ -594,6 +609,7 @@ def get_file(
self._get_file,
src_path,
tmp_path,
num_workers,
verbose,
)
audeer.move_file(tmp_path, dst_path)
Expand Down Expand Up @@ -738,6 +754,7 @@ def _move_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Move file on backend.
Expand All @@ -749,14 +766,15 @@ def _move_file(
if backend supports a native way to move files.

"""
self.copy_file(src_path, dst_path, verbose=verbose)
self.copy_file(src_path, dst_path, num_workers=num_workers, verbose=verbose)
self.remove_file(src_path)

def move_file(
self,
src_path: str,
dst_path: str,
*,
num_workers: int = 1,
validate: bool = False,
verbose: bool = False,
):
Expand All @@ -783,6 +801,7 @@ def move_file(
Args:
src_path: source path to file on backend
dst_path: destination path to file on backend
num_workers: number of parallel jobs
validate: verify file was successfully moved
verbose: show debug messages

Expand Down Expand Up @@ -812,6 +831,7 @@ def move_file(
self.copy_file(
src_path,
dst_path,
num_workers=num_workers,
validate=True,
verbose=verbose,
)
Expand All @@ -821,6 +841,7 @@ def move_file(
self._move_file,
src_path,
dst_path,
num_workers,
verbose,
)
else:
Expand Down
3 changes: 3 additions & 0 deletions audbackend/core/backend/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def _copy_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Copy file on backend."""
Expand Down Expand Up @@ -114,6 +115,7 @@ def _get_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Get file from backend."""
Expand Down Expand Up @@ -142,6 +144,7 @@ def _move_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Move file on backend."""
Expand Down
108 changes: 83 additions & 25 deletions audbackend/core/backend/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import getpass
import mimetypes
import os
import signal
import tempfile
import threading

import minio

Expand Down Expand Up @@ -198,6 +200,7 @@ def _copy_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Copy file on backend."""
Expand All @@ -209,7 +212,7 @@ def _copy_file(
if self._size(src_path) / 1024 / 1024 / 1024 >= 4.9:
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = audeer.path(tmp_dir, os.path.basename(src_path))
self._get_file(src_path, tmp_path, verbose)
self._get_file(src_path, tmp_path, num_workers, verbose)
self._put_file(tmp_path, dst_path, checksum, verbose)
else:
self._client.copy_object(
Expand Down Expand Up @@ -264,35 +267,89 @@ def _get_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Get file from backend."""
src_path = self.path(src_path)
src_size = self._client.stat_object(self.repository, src_path).size
chunk = 4 * 1024
with audeer.progress_bar(total=src_size, disable=not verbose) as pbar:
desc = audeer.format_display_message(
f"Download {os.path.basename(str(src_path))}", pbar=True
)
pbar.set_description_str(desc)
pbar.refresh()

dst_size = 0
try:
response = self._client.get_object(self.repository, src_path)
with open(dst_path, "wb") as dst_fp:
while src_size > dst_size:
data = response.read(chunk)
n_data = len(data)
if n_data > 0:
dst_fp.write(data)
dst_size += n_data
pbar.update(n_data)
except Exception as e: # pragma: no cover
raise RuntimeError(f"Error downloading file: {e}")
finally:
response.close()
response.release_conn()
# Setup progress bar
desc = audeer.format_display_message(
f"Download {os.path.basename(str(src_path))}",
pbar=verbose,
)
pbar = audeer.progress_bar(total=src_size, desc=desc, disable=not verbose)

# Create cancellation event for handling interrupts
cancel_event = threading.Event()

# Install signal handler to set cancel_event on Ctrl+C
def signal_handler(signum, frame):
cancel_event.set() # pragma: no cover

original_handler = signal.signal(signal.SIGINT, signal_handler)

try:
if num_workers == 1:
# Simple single-threaded download
self._download_file(src_path, dst_path, pbar, cancel_event)
else:
# Multi-threaded download with pre-allocated file
with open(dst_path, "wb") as f:
f.truncate(src_size)

# Create and run download tasks
tasks = []
chunk_size = src_size // num_workers
for i in range(num_workers):
offset = i * chunk_size
length = chunk_size if i < num_workers - 1 else src_size - offset
tasks.append(
([src_path, dst_path, pbar, cancel_event, offset, length], {})
)

audeer.run_tasks(self._download_file, tasks, num_workers=num_workers)
except KeyboardInterrupt:
# Clean up partial file
if os.path.exists(dst_path):
os.remove(dst_path)
raise
finally:
# Restore original signal handler
signal.signal(signal.SIGINT, original_handler)

def _download_file(
self,
src_path: str,
dst_path: str,
pbar,
cancel_event: threading.Event = None,
offset: int = 0,
length: int | None = None,
):
"""Download file or part of file."""
chunk_size = 4 * 1024 # 4 KB

# Get the data stream
kwargs = {"offset": offset, "length": length} if length else {}
response = self._client.get_object(self.repository, src_path, **kwargs)

try:
with open(dst_path, "r+b" if offset else "wb") as f:
if offset:
f.seek(offset)

with pbar:
while data := response.read(chunk_size):
# Check if cancellation was requested
if cancel_event and cancel_event.is_set():
raise KeyboardInterrupt("Download cancelled by user")
f.write(data)
pbar.update(len(data))
finally:
response.close()
response.release_conn()

def _ls(
self,
Expand All @@ -311,10 +368,11 @@ def _move_file(
self,
src_path: str,
dst_path: str,
num_workers: int,
verbose: bool,
):
r"""Move file on backend."""
self._copy_file(src_path, dst_path, verbose)
self._copy_file(src_path, dst_path, num_workers, verbose)
self._remove_file(src_path)

def _open(
Expand Down
Loading