diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs
index da9210465fde..4d7415a4a8f5 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs
@@ -90,7 +90,7 @@ public async Task ProcessPartAsync(
partNumber, offset);
// Write part data to file at the calculated offset
- await WritePartToFileAsync(offset, response, cancellationToken)
+ await WritePartToFileAsync(partNumber, offset, response, cancellationToken)
.ConfigureAwait(false);
_logger.DebugFormat("FilePartDataHandler: [Part {0}] File write completed successfully",
@@ -192,6 +192,7 @@ private long GetPartOffset(GetObjectResponse response, int partNumber)
/// Writes part data from GetObjectResponse ResponseStream to the file at the specified offset.
///
private async Task WritePartToFileAsync(
+ int partNumber,
long offset,
GetObjectResponse response,
CancellationToken cancellationToken)
@@ -213,7 +214,7 @@ private async Task WritePartToFileAsync(
// Seek to the correct offset for this part
fileStream.Seek(offset, SeekOrigin.Begin);
- _logger.DebugFormat("FilePartDataHandler: Writing {0} bytes to file at offset {1}",
+ _logger.DebugFormat("FilePartDataHandler: [Part {0}] Writing {1} bytes to file at offset {2}", partNumber,
response.ContentLength, offset);
// Use GetObjectResponse's stream copy logic which includes:
@@ -232,7 +233,7 @@ await response.WriteResponseStreamAsync(
await fileStream.FlushAsync(cancellationToken)
.ConfigureAwait(false);
- _logger.DebugFormat("FilePartDataHandler: Successfully wrote {0} bytes at offset {1}",
+ _logger.DebugFormat("FilePartDataHandler: [Part {0}] Successfully wrote {1} bytes at offset {2}", partNumber,
response.ContentLength, offset);
}
}
diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs
index 59d14e889b28..010243c8c7bd 100644
--- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs
+++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs
@@ -173,6 +173,41 @@ public Exception DownloadException
}
}
+ ///
+ /// Discovers the download strategy (single-part vs multipart) by making an initial GetObject request.
+ ///
+ /// Cancellation token to cancel the discovery operation.
+ ///
+ /// A containing information about the object size, part count,
+ /// and the initial GetObject response.
+ ///
+ ///
+ /// IMPORTANT - HTTP Semaphore Lifecycle:
+ ///
+ /// This method acquires an HTTP concurrency slot from the configured semaphore and downloads Part 1.
+ /// The semaphore slot is HELD until completes processing Part 1.
+ /// Callers MUST call after this method to release the semaphore.
+ /// Failure to call will cause the semaphore slot to remain held indefinitely,
+ /// potentially blocking other downloads and causing deadlocks.
+ ///
+ /// Concurrency Implications:
+ ///
+ /// With limited HTTP concurrency (e.g., ConcurrentServiceRequests=1 for shared throttlers in directory downloads),
+ /// concurrent calls to this method will block until previous downloads complete their full lifecycle
+ /// (discover → start). This is by design to ensure the entire I/O operation (network + disk) is
+ /// within the concurrency limit. For single-slot throttlers, downloads must be processed sequentially:
+ /// complete one download's full lifecycle before starting the next.
+ ///
+ /// Typical Usage Pattern:
+ ///
+ /// var discovery = await manager.DiscoverDownloadStrategyAsync(cancellationToken);
+ /// await manager.StartDownloadsAsync(discovery, progressCallback, cancellationToken);
+ /// await manager.DownloadCompletionTask; // Wait for multipart downloads to finish
+ ///
+ ///
+ /// Thrown if the manager has been disposed.
+ /// Thrown if discovery has already been performed.
+ /// Thrown if the operation is cancelled.
///
public async Task DiscoverDownloadStrategyAsync(CancellationToken cancellationToken)
{
@@ -209,6 +244,50 @@ public async Task DiscoverDownloadStrategyAsync(Cancell
}
}
+ ///
+ /// Processes Part 1 and starts downloading remaining parts for multipart downloads.
+ /// Returns immediately after processing Part 1 to allow the consumer to begin reading.
+ ///
+ ///
+ /// The discovery result from containing object metadata
+ /// and the initial GetObject response.
+ ///
+ ///
+ /// Optional progress callback that will be invoked as parts are downloaded. For multipart downloads,
+ /// progress is aggregated across all concurrent part downloads.
+ ///
+ /// Cancellation token to cancel the download operation.
+ ///
+ /// A task that completes after Part 1 is processed. For multipart downloads, remaining parts
+ /// continue downloading in the background (monitor via ).
+ ///
+ ///
+ /// HTTP Semaphore Release:
+ ///
+ /// This method processes Part 1 (downloaded during )
+ /// and releases the HTTP semaphore slot that was acquired during discovery.
+ /// The semaphore is released after both the network download and disk write
+ /// operations complete for Part 1. This ensures the ConcurrentServiceRequests limit
+ /// controls the entire I/O operation (network + disk), not just the network download.
+ ///
+ /// Background Processing (Multipart Only):
+ ///
+ /// For multipart downloads (when TotalParts > 1), this method starts a background task
+ /// to download and process remaining parts (Part 2+) and returns immediately. This allows the
+ /// consumer to start reading from the buffer without waiting for all downloads to complete,
+ /// which prevents deadlocks when the buffer fills up before the consumer begins reading.
+ /// Monitor to detect when all background downloads have finished.
+ ///
+ /// Single-Part Downloads:
+ ///
+ /// For single-part downloads (when TotalParts = 1), this method processes Part 1 synchronously
+ /// and returns immediately. No background task is created, and
+ /// will already be completed when this method returns.
+ ///
+ ///
+ /// Thrown if the manager has been disposed.
+ /// Thrown if is null.
+ /// Thrown if the operation is cancelled.
///
public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler progressCallback, CancellationToken cancellationToken)
{
@@ -229,9 +308,6 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
try
{
- // Prepare the data handler (e.g., create temp files for file-based downloads)
- await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false);
-
// Create delegate once and reuse for all parts
var wrappedCallback = progressCallback != null
? new EventHandler(DownloadPartProgressEventCallback)
@@ -239,6 +315,9 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
try
{
+ // Prepare the data handler (e.g., create temp files for file-based downloads)
+ await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false);
+
// Attach progress callback to Part 1's response if provided
if (wrappedCallback != null)
{
@@ -246,17 +325,26 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
}
// Process Part 1 from InitialResponse (applies to both single-part and multipart)
- _logger.DebugFormat("MultipartDownloadManager: Buffering Part 1 from discovery response");
+ // NOTE: Semaphore is still held from discovery phase and will be released in finally block
+ _logger.DebugFormat("MultipartDownloadManager: Processing Part 1 from discovery response");
await _dataHandler.ProcessPartAsync(1, discoveryResult.InitialResponse, cancellationToken).ConfigureAwait(false);
+
+ _logger.DebugFormat("MultipartDownloadManager: Part 1 processing completed");
}
finally
{
// Always detach the event handler to prevent memory leak
- // This runs whether ProcessPartAsync succeeds or throws
if (wrappedCallback != null)
{
discoveryResult.InitialResponse.WriteObjectProgressEvent -= wrappedCallback;
}
+
+ // Release semaphore after BOTH network download AND disk write complete for Part 1
+ // This ensures ConcurrentServiceRequests controls the entire I/O operation,
+ // consistent with Parts 2+ (see CreateDownloadTaskAsync)
+ _httpConcurrencySlots.Release();
+ _logger.DebugFormat("MultipartDownloadManager: [Part 1] HTTP concurrency slot released (Available: {0}/{1})",
+ _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
}
if (discoveryResult.IsSinglePart)
@@ -374,7 +462,9 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
- // Limit HTTP concurrency
+ // Limit HTTP concurrency for both network download AND disk write
+ // The semaphore is held until AFTER ProcessPartAsync completes to ensure
+ // ConcurrentServiceRequests controls the entire I/O operation
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNumber);
@@ -438,25 +528,27 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
}
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] ETag validation passed", partNumber);
+
+ _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing part (handler will decide: stream or buffer)", partNumber);
+
+ // Delegate data handling to the handler
+ // IMPORTANT: Handler takes ownership of response and is responsible for disposing it in ALL cases:
+ // - If streaming: StreamingDataSource takes ownership and disposes when consumer finishes reading
+ // - If buffering: Handler disposes immediately after copying data to buffer
+ // - On error: Handler disposes in its catch block before rethrowing
+ await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
+ ownsResponse = false; // Ownership transferred to handler
+
+ _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing completed successfully", partNumber);
}
finally
{
+ // Release semaphore after BOTH network download AND disk write complete
+ // This ensures ConcurrentServiceRequests limits the entire I/O operation
_httpConcurrencySlots.Release();
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released (Available: {1}/{2})",
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
}
-
- _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing part (handler will decide: stream or buffer)", partNumber);
-
- // Delegate data handling to the handler
- // IMPORTANT: Handler takes ownership of response and is responsible for disposing it in ALL cases:
- // - If streaming: StreamingDataSource takes ownership and disposes when consumer finishes reading
- // - If buffering: Handler disposes immediately after copying data to buffer
- // - On error: Handler disposes in its catch block before rethrowing
- await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
- ownsResponse = false; // Ownership transferred to handler
-
- _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing completed successfully", partNumber);
}
catch (Exception ex)
{
@@ -491,59 +583,66 @@ private async Task DiscoverUsingPartStrategyAsync(Cance
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);
GetObjectResponse firstPartResponse = null;
+
+ // NOTE: Semaphore is NOT released here - it will be released in StartDownloadsAsync
+ // after Part 1 is processed. This ensures the semaphore controls both network download
+ // AND disk write for Part 1, consistent with Parts 2+ (see CreateDownloadTaskAsync)
+
try
{
// SEP Part GET Step 2: "send the request and wait for the response in a non-blocking fashion"
firstPartResponse = await _s3Client.GetObjectAsync(firstPartRequest, cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _httpConcurrencySlots.Release();
- _logger.DebugFormat("MultipartDownloadManager: [Part 1 Discovery] HTTP concurrency slot released");
- }
-
- if (firstPartResponse == null)
- throw new InvalidOperationException("Failed to retrieve object from S3");
-
- // SEP Part GET Step 3: Save ETag for later IfMatch validation in subsequent requests
- _savedETag = firstPartResponse.ETag;
-
- // SEP Part GET Step 3: "check the response. First parse total content length from ContentRange
- // of the GetObject response and save the value in a variable. The length is the numeric value
- // after / delimiter. For example, given ContentRange=bytes 0-1/5, 5 is the total content length.
- // Then check PartsCount."
- if (firstPartResponse.PartsCount.HasValue && firstPartResponse.PartsCount.Value > 1)
- {
- // SEP Part GET Step 3: "If PartsCount in the response is larger than 1, it indicates there
- // are more parts available to download. The S3 Transfer Manager MUST save etag from the
- // response to a variable."
- _discoveredPartCount = firstPartResponse.PartsCount.Value;
- // Parse total content length from ContentRange header
- // For example, "bytes 0-5242879/52428800" -> extract 52428800
- var totalObjectSize = ExtractTotalSizeFromContentRange(firstPartResponse.ContentRange);
+ if (firstPartResponse == null)
+ throw new InvalidOperationException("Failed to retrieve object from S3");
- // SEP Part GET Step 7 will use this response for creating DownloadResponse
- // Keep the response with its stream (will be buffered in StartDownloadsAsync)
- return new DownloadDiscoveryResult
+ // SEP Part GET Step 3: Save ETag for later IfMatch validation in subsequent requests
+ _savedETag = firstPartResponse.ETag;
+
+ // SEP Part GET Step 3: "check the response. First parse total content length from ContentRange
+ // of the GetObject response and save the value in a variable. The length is the numeric value
+ // after / delimiter. For example, given ContentRange=bytes 0-1/5, 5 is the total content length.
+ // Then check PartsCount."
+ if (firstPartResponse.PartsCount.HasValue && firstPartResponse.PartsCount.Value > 1)
{
- TotalParts = firstPartResponse.PartsCount.Value,
- ObjectSize = totalObjectSize,
- InitialResponse = firstPartResponse // Keep response with stream
- };
+ // SEP Part GET Step 3: "If PartsCount in the response is larger than 1, it indicates there
+ // are more parts available to download. The S3 Transfer Manager MUST save etag from the
+ // response to a variable."
+ _discoveredPartCount = firstPartResponse.PartsCount.Value;
+
+ // Parse total content length from ContentRange header
+ // For example, "bytes 0-5242879/52428800" -> extract 52428800
+ var totalObjectSize = ExtractTotalSizeFromContentRange(firstPartResponse.ContentRange);
+
+ // SEP Part GET Step 7 will use this response for creating DownloadResponse
+ // Keep the response with its stream (will be buffered in StartDownloadsAsync)
+ return new DownloadDiscoveryResult
+ {
+ TotalParts = firstPartResponse.PartsCount.Value,
+ ObjectSize = totalObjectSize,
+ InitialResponse = firstPartResponse // Keep response with stream
+ };
+ }
+ else
+ {
+ // SEP Part GET Step 3: "If PartsCount is 1, go to Step 7."
+ _discoveredPartCount = 1;
+
+ // Single part upload - return the response for immediate use (SEP Step 7)
+ return new DownloadDiscoveryResult
+ {
+ TotalParts = 1,
+ ObjectSize = firstPartResponse.ContentLength,
+ InitialResponse = firstPartResponse // Keep response with stream
+ };
+ }
}
- else
+ catch
{
- // SEP Part GET Step 3: "If PartsCount is 1, go to Step 7."
- _discoveredPartCount = 1;
-
- // Single part upload - return the response for immediate use (SEP Step 7)
- return new DownloadDiscoveryResult
- {
- TotalParts = 1,
- ObjectSize = firstPartResponse.ContentLength,
- InitialResponse = firstPartResponse // Keep response with stream
- };
+ // On error, release semaphore and dispose response before rethrowing
+ _httpConcurrencySlots.Release();
+ firstPartResponse?.Dispose();
+ throw;
}
}
@@ -568,84 +667,91 @@ private async Task DiscoverUsingRangeStrategyAsync(Canc
await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false);
GetObjectResponse firstRangeResponse = null;
+
+ // NOTE: Semaphore is NOT released here - it will be released in StartDownloadsAsync
+ // after Part 1 is processed. This ensures the semaphore controls both network download
+ // AND disk write for Part 1, consistent with Parts 2+ (see CreateDownloadTaskAsync)
+
try
{
// SEP Ranged GET Step 2: "send the request and wait for the response in a non-blocking fashion"
firstRangeResponse = await _s3Client.GetObjectAsync(firstRangeRequest, cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _httpConcurrencySlots.Release();
- _logger.DebugFormat("MultipartDownloadManager: [Part 1 Discovery] HTTP concurrency slot released");
- }
-
- // Defensive null check
- if (firstRangeResponse == null)
- throw new InvalidOperationException("Failed to retrieve object from S3");
-
- // SEP Ranged GET Step 5: "save Etag from the response to a variable"
- // (for IfMatch validation in subsequent requests)
- _savedETag = firstRangeResponse.ETag;
-
- // SEP Ranged GET Step 3: "parse total content length from ContentRange of the GetObject response
- // and save the value in a variable. The length is the numeric value after / delimiter.
- // For example, given ContentRange=bytes0-1/5, 5 is the total content length."
- // Check if ContentRange is null (object smaller than requested range)
- if (firstRangeResponse.ContentRange == null)
- {
- // No ContentRange means we got the entire small object
- _discoveredPartCount = 1;
- return new DownloadDiscoveryResult
+ // Defensive null check
+ if (firstRangeResponse == null)
+ throw new InvalidOperationException("Failed to retrieve object from S3");
+
+ // SEP Ranged GET Step 5: "save Etag from the response to a variable"
+ // (for IfMatch validation in subsequent requests)
+ _savedETag = firstRangeResponse.ETag;
+
+ // SEP Ranged GET Step 3: "parse total content length from ContentRange of the GetObject response
+ // and save the value in a variable. The length is the numeric value after / delimiter.
+ // For example, given ContentRange=bytes0-1/5, 5 is the total content length."
+ // Check if ContentRange is null (object smaller than requested range)
+ if (firstRangeResponse.ContentRange == null)
{
- TotalParts = 1,
- ObjectSize = firstRangeResponse.ContentLength,
- InitialResponse = firstRangeResponse // Keep response with stream
- };
- }
-
-
- // Parse total object size from ContentRange (e.g., "bytes 0-5242879/52428800" -> 52428800)
- var totalContentLength = ExtractTotalSizeFromContentRange(firstRangeResponse.ContentRange);
-
- // SEP Ranged GET Step 4: "compare the parsed total content length from Step 3 with ContentLength
- // of the response. If the parsed total content length equals to the value from ContentLength,
- // it indicates this request contains all of the data. The request is finished, return the response."
- if (totalContentLength == firstRangeResponse.ContentLength)
- {
- // Single part: total size equals returned ContentLength
- // This request contains all of the data
- _discoveredPartCount = 1;
+ // No ContentRange means we got the entire small object
+ _discoveredPartCount = 1;
+
+ return new DownloadDiscoveryResult
+ {
+ TotalParts = 1,
+ ObjectSize = firstRangeResponse.ContentLength,
+ InitialResponse = firstRangeResponse // Keep response with stream
+ };
+ }
+
+
+ // Parse total object size from ContentRange (e.g., "bytes 0-5242879/52428800" -> 52428800)
+ var totalContentLength = ExtractTotalSizeFromContentRange(firstRangeResponse.ContentRange);
+
+ // SEP Ranged GET Step 4: "compare the parsed total content length from Step 3 with ContentLength
+ // of the response. If the parsed total content length equals to the value from ContentLength,
+ // it indicates this request contains all of the data. The request is finished, return the response."
+ if (totalContentLength == firstRangeResponse.ContentLength)
+ {
+ // Single part: total size equals returned ContentLength
+ // This request contains all of the data
+ _discoveredPartCount = 1;
+
+ return new DownloadDiscoveryResult
+ {
+ TotalParts = 1,
+ ObjectSize = totalContentLength,
+ InitialResponse = firstRangeResponse // Keep response with stream
+ };
+ }
+ // SEP Ranged GET Step 4: "If they do not match, it indicates there are more parts available
+ // to download. Add a validation to verify that ContentLength equals to the targetPartSizeBytes."
+ if (firstRangeResponse.ContentLength != targetPartSize)
+ {
+ throw new InvalidOperationException(
+ $"Expected first part size {targetPartSize} bytes, but received {firstRangeResponse.ContentLength} bytes. " +
+ $"Total object size is {totalContentLength} bytes.");
+ }
+
+ // SEP Ranged GET Step 5: "calculate number of requests required by performing integer division
+ // of total contentLength/targetPartSizeBytes. Save the number of ranged GET requests in a variable."
+ _discoveredPartCount = (int)Math.Ceiling((double)totalContentLength / targetPartSize);
+
+ // SEP Ranged GET Step 9 will use this response for creating DownloadResponse
+ // Keep the response with its stream (will be buffered in StartDownloadsAsync)
return new DownloadDiscoveryResult
{
- TotalParts = 1,
+ TotalParts = _discoveredPartCount,
ObjectSize = totalContentLength,
InitialResponse = firstRangeResponse // Keep response with stream
};
}
-
- // SEP Ranged GET Step 4: "If they do not match, it indicates there are more parts available
- // to download. Add a validation to verify that ContentLength equals to the targetPartSizeBytes."
- if (firstRangeResponse.ContentLength != targetPartSize)
+ catch
{
- throw new InvalidOperationException(
- $"Expected first part size {targetPartSize} bytes, but received {firstRangeResponse.ContentLength} bytes. " +
- $"Total object size is {totalContentLength} bytes.");
+ // On error, release semaphore and dispose response before rethrowing
+ _httpConcurrencySlots.Release();
+ firstRangeResponse?.Dispose();
+ throw;
}
-
- // SEP Ranged GET Step 5: "calculate number of requests required by performing integer division
- // of total contentLength/targetPartSizeBytes. Save the number of ranged GET requests in a variable."
- _discoveredPartCount = (int)Math.Ceiling((double)totalContentLength / targetPartSize);
-
- // SEP Ranged GET Step 9 will use this response for creating DownloadResponse
- // Keep the response with its stream (will be buffered in StartDownloadsAsync)
- return new DownloadDiscoveryResult
- {
- TotalParts = _discoveredPartCount,
- ObjectSize = totalContentLength,
- InitialResponse = firstRangeResponse // Keep response with stream
- };
}
private GetObjectRequest CreateGetObjectRequest()
diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs
index 7ea1c89af832..fbbd1a410975 100644
--- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs
+++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs
@@ -1951,74 +1951,6 @@ public async Task DiscoverUsingRangeStrategy_CallsWaitForCapacityAsync()
mockDataHandler.Verify(x => x.WaitForCapacityAsync(It.IsAny()), Times.Once);
}
- [TestMethod]
- public async Task DiscoverUsingPartStrategy_AcquiresAndReleasesHttpSlot()
- {
- // Arrange - Use real SemaphoreSlim to track HTTP concurrency usage
- var httpThrottler = new SemaphoreSlim(2, 2); // 2 concurrent requests max
- var initialCount = httpThrottler.CurrentCount;
-
- var mockDataHandler = CreateMockDataHandler();
- var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse(
- 8 * 1024 * 1024, 3, 24 * 1024 * 1024, "test-etag");
-
- var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
- (req, ct) => Task.FromResult(mockResponse));
-
- var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
- downloadType: MultipartDownloadType.PART);
- var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
-
- // Use shared HTTP throttler to track usage
- var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
-
- // Act
- var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
-
- // Assert
- Assert.IsNotNull(result);
- Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
- "HTTP concurrency slot should be released after discovery completes");
-
- // Cleanup
- httpThrottler.Dispose();
- }
-
- [TestMethod]
- public async Task DiscoverUsingRangeStrategy_AcquiresAndReleasesHttpSlot()
- {
- // Arrange - Use real SemaphoreSlim to track HTTP concurrency usage
- var httpThrottler = new SemaphoreSlim(2, 2); // 2 concurrent requests max
- var initialCount = httpThrottler.CurrentCount;
-
- var mockDataHandler = CreateMockDataHandler();
- var totalObjectSize = 52428800; // 50MB
- var partSize = 8388608; // 8MB
- var mockResponse = MultipartDownloadTestHelpers.CreateRangeResponse(
- 0, partSize - 1, totalObjectSize, "test-etag");
-
- var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
- (req, ct) => Task.FromResult(mockResponse));
-
- var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
- partSize: partSize,
- downloadType: MultipartDownloadType.RANGE);
- var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
-
- // Use shared HTTP throttler to track usage
- var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
-
- // Act
- var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
-
- // Assert
- Assert.IsNotNull(result);
- Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
- "HTTP concurrency slot should be released after discovery completes");
-
- // Cleanup
- httpThrottler.Dispose();
- }
[TestMethod]
public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits()
@@ -2044,16 +1976,22 @@ public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits()
var coordinator1 = new MultipartDownloadManager(mockClient1.Object, request1, config, mockDataHandler1.Object, null, sharedThrottler);
var coordinator2 = new MultipartDownloadManager(mockClient2.Object, request2, config, mockDataHandler2.Object, null, sharedThrottler);
- // Act - Start both discoveries concurrently
- var task1 = coordinator1.DiscoverDownloadStrategyAsync(CancellationToken.None);
- var task2 = coordinator2.DiscoverDownloadStrategyAsync(CancellationToken.None);
+ var discovery1 = await coordinator1.DiscoverDownloadStrategyAsync(CancellationToken.None);
+ await coordinator1.StartDownloadsAsync(discovery1, null, CancellationToken.None);
+
+ var discovery2 = await coordinator2.DiscoverDownloadStrategyAsync(CancellationToken.None);
+ await coordinator2.StartDownloadsAsync(discovery2, null, CancellationToken.None);
- await Task.WhenAll(task1, task2);
+ // Wait for all background work to complete
+ await Task.WhenAll(
+ coordinator1.DownloadCompletionTask,
+ coordinator2.DownloadCompletionTask
+ );
- // Assert - Both should complete successfully despite shared throttler limits
- Assert.IsNotNull(task1.Result);
- Assert.IsNotNull(task2.Result);
- Assert.AreEqual(1, sharedThrottler.CurrentCount, "HTTP throttler should be fully released");
+ // Assert - Both should complete successfully and semaphore should be fully released
+ Assert.IsNotNull(discovery1);
+ Assert.IsNotNull(discovery2);
+ Assert.AreEqual(1, sharedThrottler.CurrentCount, "HTTP throttler should be fully released after complete download lifecycle");
// Cleanup
coordinator1.Dispose();
@@ -2230,6 +2168,544 @@ public async Task Discovery_SinglePart_StillCallsCapacityCheck()
#endregion
+ #region Concurrency Control Tests
+
+ [TestMethod]
+ public async Task HttpSemaphore_HeldThroughProcessPartAsync()
+ {
+ // Arrange - Test that HTTP semaphore is NOT released until ProcessPartAsync completes
+ var totalParts = 2;
+ var partSize = 8 * 1024 * 1024;
+ var totalObjectSize = totalParts * partSize;
+
+ // Use our own semaphore to monitor its state
+ var concurrentRequests = 1;
+ var httpSemaphore = new SemaphoreSlim(concurrentRequests, concurrentRequests);
+
+ var part1EnteredProcessPart = new TaskCompletionSource();
+ var part1CanExitProcessPart = new TaskCompletionSource();
+ var semaphoreWasReleasedDuringPart1 = false;
+
+ var mockDataHandler = new Mock();
+
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ mockDataHandler
+ .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .Returns(async (partNum, response, ct) =>
+ {
+ if (partNum == 1)
+ {
+ // Part 1 enters ProcessPartAsync
+ part1EnteredProcessPart.SetResult(true);
+
+ // Check if semaphore has been released (it shouldn't be with the fix!)
+ if (httpSemaphore.CurrentCount > 0)
+ {
+ semaphoreWasReleasedDuringPart1 = true;
+ }
+
+ // Block Part 1 here so we can observe semaphore state
+ await part1CanExitProcessPart.Task;
+ }
+ });
+
+ mockDataHandler
+ .Setup(x => x.OnDownloadComplete(It.IsAny()));
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
+ totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ downloadType: MultipartDownloadType.PART);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(
+ concurrentRequests: concurrentRequests);
+
+ // Pass in our instrumented semaphore
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // Act
+ var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+
+ // Wait for Part 1 to enter ProcessPartAsync
+ await part1EnteredProcessPart.Task;
+
+ // Check semaphore state while Part 1 is in ProcessPartAsync
+ var semaphoreAvailableDuringProcessing = httpSemaphore.CurrentCount > 0;
+
+ // Release Part 1 to continue
+ part1CanExitProcessPart.SetResult(true);
+
+ await startTask;
+ await coordinator.DownloadCompletionTask;
+
+ // Assert - This is the deterministic test of the fix
+ Assert.IsFalse(semaphoreAvailableDuringProcessing,
+ "HTTP semaphore should NOT be released while ProcessPartAsync is executing. " +
+ "Before fix semaphore.CurrentCount would be > 0 (released early). " +
+ "After fix: semaphore.CurrentCount should be 0 (held through ProcessPartAsync).");
+
+ Assert.IsFalse(semaphoreWasReleasedDuringPart1,
+ "Semaphore should not have been released at any point during Part 1 ProcessPartAsync execution");
+
+ // Cleanup
+ httpSemaphore.Dispose();
+ }
+
+ [TestMethod]
+ public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync()
+ {
+ // Arrange - Test that RANGE strategy also holds semaphore through ProcessPartAsync
+ var totalObjectSize = 17 * 1024 * 1024; // 17MB -> 3 parts @ 8MB
+ var partSize = 8 * 1024 * 1024;
+
+ var concurrentRequests = 1;
+ var httpSemaphore = new SemaphoreSlim(concurrentRequests, concurrentRequests);
+
+ var part1EnteredProcessPart = new TaskCompletionSource();
+ var part1CanExitProcessPart = new TaskCompletionSource();
+
+ var mockDataHandler = new Mock();
+
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ mockDataHandler
+ .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .Returns(async (partNum, response, ct) =>
+ {
+ if (partNum == 1)
+ {
+ part1EnteredProcessPart.SetResult(true);
+ await part1CanExitProcessPart.Task;
+ }
+ });
+
+ mockDataHandler
+ .Setup(x => x.OnDownloadComplete(It.IsAny()));
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
+ 3, partSize, totalObjectSize, "test-etag", usePartStrategy: false); // RANGE strategy
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ partSize: partSize,
+ downloadType: MultipartDownloadType.RANGE);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(
+ concurrentRequests: concurrentRequests);
+
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // Act
+ var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+ await part1EnteredProcessPart.Task;
+
+ // Check semaphore state while Part 1 is in ProcessPartAsync
+ var semaphoreAvailableDuringProcessing = httpSemaphore.CurrentCount > 0;
+
+ part1CanExitProcessPart.SetResult(true);
+ await startTask;
+ await coordinator.DownloadCompletionTask;
+
+ // Assert
+ Assert.IsFalse(semaphoreAvailableDuringProcessing,
+ "RANGE strategy should also hold HTTP semaphore through ProcessPartAsync");
+
+ // Cleanup
+ httpSemaphore.Dispose();
+ }
+
+ #endregion
+
+ #region Semaphore Release Error Path Tests
+
+ [TestMethod]
+ public async Task StartDownloadsAsync_PrepareAsyncFails_ReleasesHttpSemaphore()
+ {
+ // Arrange - PrepareAsync fails but semaphore was acquired during discovery
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync succeeds (buffer space available)
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // PrepareAsync fails BEFORE Part 1 processing
+ mockDataHandler
+ .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException("Simulated prepare failure"));
+
+ var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse(
+ 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag");
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
+ (req, ct) => Task.FromResult(mockResponse));
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ downloadType: MultipartDownloadType.PART);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // After discovery, semaphore should have 1 slot held (2 total - 1 used = 1 available)
+ Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount,
+ "After discovery, semaphore should have 1 slot held");
+
+ // Act & Assert
+ try
+ {
+ await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+ Assert.Fail("Expected InvalidOperationException to be thrown");
+ }
+ catch (InvalidOperationException ex)
+ {
+ Assert.AreEqual("Simulated prepare failure", ex.Message);
+ }
+
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should be released when PrepareAsync fails");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ [TestMethod]
+ public async Task StartDownloadsAsync_Part1ProcessingFails_ReleasesHttpSemaphore()
+ {
+ // Arrange - Test that finally block correctly releases semaphore when Part 1 processing fails
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync succeeds
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // PrepareAsync succeeds
+ mockDataHandler
+ .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // ProcessPartAsync fails for Part 1
+ mockDataHandler
+ .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException("Simulated Part 1 processing failure"));
+
+ var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse(
+ 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag");
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
+ (req, ct) => Task.FromResult(mockResponse));
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ downloadType: MultipartDownloadType.PART);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // After discovery, semaphore should have 1 slot held
+ Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount,
+ "After discovery, semaphore should have 1 slot held");
+
+ // Act & Assert
+ try
+ {
+ await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+ Assert.Fail("Expected InvalidOperationException to be thrown");
+ }
+ catch (InvalidOperationException ex)
+ {
+ Assert.AreEqual("Simulated Part 1 processing failure", ex.Message);
+ }
+
+ // Assert - Finally block should release semaphore
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should be released by finally block when Part 1 processing fails");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ [TestMethod]
+ public async Task Discovery_WaitForCapacityFails_DoesNotReleaseHttpSemaphore()
+ {
+ // Arrange - Test that semaphore is NOT released when it was never acquired
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync fails BEFORE HTTP semaphore is acquired
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException("Simulated capacity wait failure"));
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ // Act & Assert
+ try
+ {
+ await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+ Assert.Fail("Expected InvalidOperationException to be thrown");
+ }
+ catch (InvalidOperationException ex)
+ {
+ Assert.AreEqual("Simulated capacity wait failure", ex.Message);
+ }
+
+ // Assert - Semaphore should NOT be released (it was never acquired)
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should NOT be released when it was never acquired (failed before WaitAsync)");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ [TestMethod]
+ public async Task StartDownloadsAsync_BackgroundPartHttpFails_ReleasesHttpSemaphore()
+ {
+ // Arrange - Test that background part download failures properly release semaphore
+ var totalParts = 3;
+ var partSize = 8 * 1024 * 1024;
+ var totalObjectSize = totalParts * partSize;
+
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync succeeds for all parts
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // PrepareAsync succeeds
+ mockDataHandler
+ .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // ProcessPartAsync succeeds for Part 1, but not called for Part 2 (HTTP fails first)
+ mockDataHandler
+ .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // ReleaseCapacity is called on failure
+ mockDataHandler
+ .Setup(x => x.ReleaseCapacity());
+
+ mockDataHandler
+ .Setup(x => x.OnDownloadComplete(It.IsAny()));
+
+ var callCount = 0;
+ var mockClient = new Mock();
+ mockClient.Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny()))
+ .Returns(() =>
+ {
+ callCount++;
+ if (callCount == 1)
+ {
+ // Discovery call succeeds
+ return Task.FromResult(MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse(
+ partSize, totalParts, totalObjectSize, "test-etag"));
+ }
+ else
+ {
+ // Background part HTTP request fails
+ throw new InvalidOperationException("Simulated HTTP failure for background part");
+ }
+ });
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ downloadType: MultipartDownloadType.PART);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1);
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // After discovery, semaphore should have 1 slot held (for Part 1)
+ Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount,
+ "After discovery, semaphore should have 1 slot held");
+
+ // Act
+ await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+
+ // Wait for background task to fail
+ try
+ {
+ await coordinator.DownloadCompletionTask;
+ }
+ catch (InvalidOperationException)
+ {
+ // Expected failure from background task
+ }
+
+ // Assert - Semaphore should be fully released (Part 1 released in StartDownloadsAsync,
+ // Parts 2 and 3 released in CreateDownloadTaskAsync catch blocks)
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should be fully released after background part HTTP failure");
+
+ // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that got cancelled)
+ // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's HTTP call failed
+ mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2),
+ "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled after acquiring capacity)");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ [TestMethod]
+ public async Task StartDownloadsAsync_BackgroundPartProcessingFails_ReleasesHttpSemaphore()
+ {
+ // Arrange - Test that background part ProcessPartAsync failures properly release semaphore
+ var totalParts = 3;
+ var partSize = 8 * 1024 * 1024;
+ var totalObjectSize = totalParts * partSize;
+
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync succeeds for all parts
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // PrepareAsync succeeds
+ mockDataHandler
+ .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // ProcessPartAsync succeeds for Part 1, fails for Part 2
+ var processCallCount = 0;
+ mockDataHandler
+ .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .Returns((partNum, response, ct) =>
+ {
+ processCallCount++;
+ if (partNum == 1)
+ {
+ return Task.CompletedTask; // Part 1 succeeds
+ }
+ throw new InvalidOperationException($"Simulated processing failure for Part {partNum}");
+ });
+
+ // ReleaseCapacity is called on failure
+ mockDataHandler
+ .Setup(x => x.ReleaseCapacity());
+
+ mockDataHandler
+ .Setup(x => x.OnDownloadComplete(It.IsAny()));
+
+ var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
+ totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
+ downloadType: MultipartDownloadType.PART);
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1);
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+
+ // After discovery, semaphore should have 1 slot held
+ Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount,
+ "After discovery, semaphore should have 1 slot held");
+
+ // Act
+ await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
+
+ // Wait for background task to fail
+ try
+ {
+ await coordinator.DownloadCompletionTask;
+ }
+ catch (InvalidOperationException)
+ {
+ // Expected failure from background task
+ }
+
+ // Assert - Semaphore should be fully released
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should be fully released after background part processing failure");
+
+ // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that may have continued)
+ // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's processing failed
+ mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2),
+ "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled/failed after acquiring capacity)");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ [TestMethod]
+ public async Task Discovery_HttpRequestAfterCapacityFails_ReleasesHttpSemaphore()
+ {
+ // Arrange - Test semaphore release when HTTP request fails after capacity is acquired
+ var httpThrottler = new SemaphoreSlim(2, 2);
+ var initialCount = httpThrottler.CurrentCount;
+
+ var mockDataHandler = new Mock();
+
+ // WaitForCapacityAsync succeeds (capacity acquired)
+ mockDataHandler
+ .Setup(x => x.WaitForCapacityAsync(It.IsAny()))
+ .Returns(Task.CompletedTask);
+
+ // HTTP request fails AFTER both capacity types are acquired
+ var mockClient = new Mock();
+ mockClient
+ .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException("Simulated S3 failure after capacity acquired"));
+
+ var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
+ var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
+ var coordinator = new MultipartDownloadManager(
+ mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler);
+
+ // Act & Assert
+ try
+ {
+ await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
+ Assert.Fail("Expected InvalidOperationException to be thrown");
+ }
+ catch (InvalidOperationException ex)
+ {
+ Assert.AreEqual("Simulated S3 failure after capacity acquired", ex.Message);
+ }
+
+ // Assert - HTTP semaphore should be released by catch block in discovery
+ Assert.AreEqual(initialCount, httpThrottler.CurrentCount,
+ "HTTP semaphore should be released when HTTP request fails in discovery");
+
+ // Cleanup
+ httpThrottler.Dispose();
+ }
+
+ #endregion
+
#region ContentRange and Part Range Calculation Tests
[TestMethod]