Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,38 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);

// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false);
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);

var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
downloadTasks.Add(task);
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
partNum, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);

// Acquire HTTP slot in the loop before creating task
// Loop will block here if all slots are in use
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);

try
{
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
downloadTasks.Add(task);
}
catch (Exception ex)
{
// If task creation fails, release the HTTP slot we just acquired
_httpConcurrencySlots.Release();
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
throw;
}
}

var expectedTaskCount = downloadTasks.Count;
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);

// Wait for all downloads to complete (fails fast on first exception)
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, cancellationToken).ConfigureAwait(false);
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);

_logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully");

Expand All @@ -418,7 +437,27 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
catch (Exception ex)
{
_downloadException = ex;
_logger.Error(ex, "MultipartDownloadManager: Background download task failed");



// Cancel all remaining downloads immediately to prevent cascading timeout errors
// This ensures that when one part fails, other tasks stop gracefully instead of
// continuing until they hit their own timeout/cancellation errors
// Check if cancellation was already requested to avoid ObjectDisposedException
if (!internalCts.IsCancellationRequested)
{
try
{
internalCts.Cancel();
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
}
catch (ObjectDisposedException)
{
// CancellationTokenSource was already disposed, ignore
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
}
}

_dataHandler.OnDownloadComplete(ex);
throw;
}
Expand All @@ -440,6 +479,22 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
_downloadException = ex;
_logger.Error(ex, "MultipartDownloadManager: Download failed");

// Cancel all remaining downloads immediately to prevent cascading timeout errors
// Check if cancellation was already requested to avoid ObjectDisposedException
if (!internalCts.IsCancellationRequested)
{
try
{
internalCts.Cancel();
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
}
catch (ObjectDisposedException)
{
// CancellationTokenSource was already disposed, ignore
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
}
}

_dataHandler.OnDownloadComplete(ex);

// Dispose the CancellationTokenSource if background task was never started
Expand All @@ -459,15 +514,8 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even

try
{
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);

// Limit HTTP concurrency for both network download AND disk write
// The semaphore is held until AFTER ProcessPartAsync completes to ensure
// ConcurrentServiceRequests controls the entire I/O operation
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNumber);
// HTTP slot was already acquired in the for loop before this task was created
// We just need to use it and release it when done

try
{
Expand Down Expand Up @@ -544,7 +592,7 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
finally
{
// Release semaphore after BOTH network download AND disk write complete
// This ensures ConcurrentServiceRequests limits the entire I/O operation
// Slot was acquired in the for loop before this task was created
_httpConcurrencySlots.Release();
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released (Available: {1}/{2})",
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
Expand Down
Loading