Skip to content

Commit 07354cd

Browse files
committed
optimize task creation
stack-info: PR: #4219, branch: GarrettBeatty/gcbeatty/taskoptimization/2 add cancellation stack-info: PR: #4221, branch: GarrettBeatty/gcbeatty/taskoptimization/4
1 parent f2e9a53 commit 07354cd

File tree

2 files changed

+319
-15
lines changed

2 files changed

+319
-15
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -379,19 +379,38 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
379379
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
380380

381381
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
382-
await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false);
382+
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
383383

384384
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
385385

386-
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
387-
downloadTasks.Add(task);
386+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
387+
partNum, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
388+
389+
// Acquire HTTP slot in the loop before creating task
390+
// Loop will block here if all slots are in use
391+
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
392+
393+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
394+
395+
try
396+
{
397+
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
398+
downloadTasks.Add(task);
399+
}
400+
catch (Exception ex)
401+
{
402+
// If task creation fails, release the HTTP slot we just acquired
403+
_httpConcurrencySlots.Release();
404+
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
405+
throw;
406+
}
388407
}
389408

390409
var expectedTaskCount = downloadTasks.Count;
391410
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);
392411

393412
// Wait for all downloads to complete (fails fast on first exception)
394-
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, cancellationToken).ConfigureAwait(false);
413+
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
395414

396415
_logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully");
397416

@@ -418,7 +437,27 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
418437
catch (Exception ex)
419438
{
420439
_downloadException = ex;
421-
_logger.Error(ex, "MultipartDownloadManager: Background download task failed");
440+
441+
442+
443+
// Cancel all remaining downloads immediately to prevent cascading timeout errors
444+
// This ensures that when one part fails, other tasks stop gracefully instead of
445+
// continuing until they hit their own timeout/cancellation errors
446+
// Check if cancellation was already requested to avoid ObjectDisposedException
447+
if (!internalCts.IsCancellationRequested)
448+
{
449+
try
450+
{
451+
internalCts.Cancel();
452+
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
453+
}
454+
catch (ObjectDisposedException)
455+
{
456+
// CancellationTokenSource was already disposed, ignore
457+
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
458+
}
459+
}
460+
422461
_dataHandler.OnDownloadComplete(ex);
423462
throw;
424463
}
@@ -440,6 +479,22 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
440479
_downloadException = ex;
441480
_logger.Error(ex, "MultipartDownloadManager: Download failed");
442481

482+
// Cancel all remaining downloads immediately to prevent cascading timeout errors
483+
// Check if cancellation was already requested to avoid ObjectDisposedException
484+
if (!internalCts.IsCancellationRequested)
485+
{
486+
try
487+
{
488+
internalCts.Cancel();
489+
_logger.DebugFormat("MultipartDownloadManager: Cancelled all in-flight downloads due to error");
490+
}
491+
catch (ObjectDisposedException)
492+
{
493+
// CancellationTokenSource was already disposed, ignore
494+
_logger.DebugFormat("MultipartDownloadManager: CancellationTokenSource already disposed during cancellation");
495+
}
496+
}
497+
443498
_dataHandler.OnDownloadComplete(ex);
444499

445500
// Dispose the CancellationTokenSource if background task was never started
@@ -459,15 +514,8 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
459514

460515
try
461516
{
462-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
463-
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
464-
465-
// Limit HTTP concurrency for both network download AND disk write
466-
// The semaphore is held until AFTER ProcessPartAsync completes to ensure
467-
// ConcurrentServiceRequests controls the entire I/O operation
468-
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);
469-
470-
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNumber);
517+
// HTTP slot was already acquired in the for loop before this task was created
518+
// We just need to use it and release it when done
471519

472520
try
473521
{
@@ -544,7 +592,7 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
544592
finally
545593
{
546594
// Release semaphore after BOTH network download AND disk write complete
547-
// This ensures ConcurrentServiceRequests limits the entire I/O operation
595+
// Slot was acquired in the for loop before this task was created
548596
_httpConcurrencySlots.Release();
549597
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released (Available: {1}/{2})",
550598
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);

0 commit comments

Comments
 (0)