From cfc56c6e6b3378eb412d037c0463410a76c71690 Mon Sep 17 00:00:00 2001 From: Marcus Ewert Date: Thu, 15 Jan 2026 16:01:14 -0800 Subject: [PATCH 1/3] Do not retry if the full file has already been returned - Add defensive checks + warning logs ensuring that partial range requests receive partial range responses (206 status + the expected content_range) --- src/client/get.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/client/get.rs b/src/client/get.rs index cefb07b1..61baf2df 100644 --- a/src/client/get.rs +++ b/src/client/get.rs @@ -36,7 +36,7 @@ use http_body_util::BodyExt; use reqwest::header::ToStrError; use std::ops::Range; use std::sync::Arc; -use tracing::info; +use tracing::{info, warn}; /// A client that can perform a get request #[async_trait] @@ -215,6 +215,11 @@ impl GetContext { }, // Retry all response body errors (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => { + // Full range already read; can't form valid 0-byte Range, return EOF + if range.start >= range.end { + return Ok(None); + } + let sleep = ctx.retry_ctx.backoff(); info!( "Encountered error while reading response body: {}. 
Retrying in {}s", @@ -237,6 +242,17 @@ impl GetContext { .map_err(Self::err)?; let (parts, retry_body) = request.into_parts(); + + // Verify we got a 206 Partial Content response + if parts.status != StatusCode::PARTIAL_CONTENT { + warn!( + "Retry request for range {:?} returned {} instead of 206 Partial Content", + range, parts.status + ); + // Return the original error + return Err(Self::err(e)); + } + let retry_etag = get_etag(&parts.headers).map_err(Self::err)?; if etag != &retry_etag { @@ -244,6 +260,25 @@ impl GetContext { return Err(Self::err(e)); } + // Verify Content-Range matches our request + if let Ok(content_range) = parse_range(&parts.headers) { + if content_range.range != range { + warn!( + "Retry request for range {:?} returned Content-Range {:?}", + range, content_range.range + ); + // Return the original error + return Err(Self::err(e)); + } + } else { + warn!( + "Retry request for range {:?} missing or invalid Content-Range header", + range + ); + // Return the original error + return Err(Self::err(e)); + } + body = retry_body; } (Err(e), _) => return Err(Self::err(e)), From 6e17d91d806fcb81b895fbd9aee7671b8014462f Mon Sep 17 00:00:00 2001 From: Marcus Ewert Date: Sun, 18 Jan 2026 14:07:08 -0800 Subject: [PATCH 2/3] Add test for defensive check --- src/client/get.rs | 102 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/src/client/get.rs b/src/client/get.rs index 61baf2df..ae3dfa6e 100644 --- a/src/client/get.rs +++ b/src/client/get.rs @@ -730,4 +730,106 @@ mod http_tests { "Generic HTTP error: HTTP error: request or response body error" ); } + + // Should detect 200 instead of 206 on range retry + #[tokio::test] + async fn test_retry_validates_partial_content_status() { + let mock = MockServer::new().await; + let retry = RetryConfig { + backoff: Default::default(), + max_retries: 3, + retry_timeout: Duration::from_secs(1000), + }; + + let options = ClientOptions::new().with_allow_http(true); + let 
store = HttpBuilder::new() + .with_client_options(options) + .with_retry(retry) + .with_url(mock.url()) + .build() + .unwrap(); + + let path = Path::from("test"); + + // Deliver partial data then error + mock.push( + Response::builder() + .header(CONTENT_LENGTH, 10) + .header(ETAG, "test-etag") + .body(Chunked::new(vec![ + Ok(Bytes::from_static(b"hello")), + Err(()), + ])) + .unwrap(), + ); + + // Server ignores Range and returns 200 OK instead of 206 + mock.push_fn(|req| { + assert_eq!( + req.headers().get(RANGE).unwrap().to_str().unwrap(), + "bytes=5-9" + ); + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_LENGTH, 10) + .header(ETAG, "test-etag") + .body("helloworld".to_string()) + .unwrap() + }); + + store.get(&path).await.unwrap().bytes().await.unwrap_err(); + } + + // Should detect mismatched Content-Range header + #[tokio::test] + async fn test_retry_validates_content_range_header() { + let mock = MockServer::new().await; + let retry = RetryConfig { + backoff: Default::default(), + max_retries: 3, + retry_timeout: Duration::from_secs(1000), + }; + + let options = ClientOptions::new().with_allow_http(true); + let store = HttpBuilder::new() + .with_client_options(options) + .with_retry(retry) + .with_url(mock.url()) + .build() + .unwrap(); + + let path = Path::from("test"); + + // Deliver partial data then error + mock.push( + Response::builder() + .header(CONTENT_LENGTH, 10) + .header(ETAG, "test-etag") + .body(Chunked::new(vec![ + Ok(Bytes::from_static(b"hello")), + Err(()), + ])) + .unwrap(), + ); + + // Server returns 206 but with wrong Content-Range + mock.push_fn(|req| { + assert_eq!( + req.headers().get(RANGE).unwrap().to_str().unwrap(), + "bytes=5-9" + ); + + Response::builder() + .status(StatusCode::PARTIAL_CONTENT) + .header(CONTENT_LENGTH, 5) + .header(ETAG, "test-etag") + .header(CONTENT_RANGE, "bytes 0-4/10") + .body("hello".to_string()) + .unwrap() + }); + + // Returns original error since Content-Range doesn't match requested 
range + store.get(&path).await.unwrap().bytes().await.unwrap_err(); + } } From 3581c5fff3f1ad9a20520e44f938790c428f118f Mon Sep 17 00:00:00 2001 From: Marcus Ewert Date: Mon, 19 Jan 2026 15:22:32 -0800 Subject: [PATCH 3/3] more elaborate tests --- src/client/get.rs | 179 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 167 insertions(+), 12 deletions(-) diff --git a/src/client/get.rs b/src/client/get.rs index ae3dfa6e..0e97380b 100644 --- a/src/client/get.rs +++ b/src/client/get.rs @@ -518,17 +518,22 @@ mod tests { } #[cfg(all(test, feature = "http", not(target_arch = "wasm32")))] mod http_tests { + use super::{GetClient, GetClientExt}; + use crate::client::header::HeaderConfig; use crate::client::mock_server::MockServer; - use crate::client::{HttpError, HttpErrorKind, HttpResponseBody}; + use crate::client::retry::{RetryContext, RetryExt}; + use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpErrorKind, HttpResponseBody}; use crate::http::HttpBuilder; use crate::path::Path; - use crate::{ClientOptions, ObjectStoreExt, RetryConfig}; + use crate::{ClientOptions, GetOptions, ObjectStoreExt, Result, RetryConfig}; + use async_trait::async_trait; use bytes::Bytes; use futures::FutureExt; use http::header::{CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE}; - use http::{Response, StatusCode}; + use http::{Method, Response, StatusCode}; use hyper::body::Frame; use std::pin::Pin; + use std::sync::Arc; use std::task::{Context, Poll, ready}; use std::time::Duration; @@ -583,6 +588,118 @@ mod http_tests { } } + /// A minimal GetClient for testing, similar in structure to the cloud store clients. + /// Can optionally inject an error after N bytes have been read. 
+ struct TestGetClient { + client: HttpClient, + base_url: String, + retry_config: RetryConfig, + /// If set, inject an error after this many bytes + error_after_bytes: Option<usize>, + } + + impl TestGetClient { + async fn new(base_url: String, retry_config: RetryConfig) -> Self { + let client = HttpClient::new(reqwest::Client::new()); + Self { + client, + base_url, + retry_config, + error_after_bytes: None, + } + } + + fn with_error_after_bytes(mut self, n: usize) -> Self { + self.error_after_bytes = Some(n); + self + } + } + + /// Wraps a body and injects an error after N bytes. + /// Simulates client-side errors (like reqwest timeouts) that can occur + /// even after full Content-Length has been received. + struct ErrorAfterBody { + inner: HttpResponseBody, + remaining: usize, + } + + impl hyper::body::Body for ErrorAfterBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + if self.remaining == 0 { + return Poll::Ready(Some(Err(HttpError::new( + HttpErrorKind::Unknown, + ChunkedErr {}, + )))); + } + + let inner = Pin::new(&mut self.inner); + match ready!(inner.poll_frame(cx)) { + None => Poll::Ready(None), + Some(Err(e)) => Poll::Ready(Some(Err(e))), + Some(Ok(frame)) => { + if let Some(data) = frame.data_ref() { + self.remaining = self.remaining.saturating_sub(data.len()); + } + Poll::Ready(Some(Ok(frame))) + } + } + } + } + + #[async_trait] + impl GetClient for TestGetClient { + const STORE: &'static str = "TEST"; + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: false, + last_modified_required: false, + version_header: None, + user_defined_metadata_prefix: None, + }; + + fn retry_config(&self) -> &RetryConfig { + &self.retry_config + } + + async fn get_request( + &self, + ctx: &mut RetryContext, + path: &Path, + options: GetOptions, + ) -> Result { + let url = format!("{}/{}", self.base_url, path); + let method = match options.head { + true => 
Method::HEAD, + false => Method::GET, + }; + + let response = self + .client + .request(method, url) + .with_get_options(options) + .retryable_request() + .send(ctx) + .await + .map_err(|e| e.error("TEST", path.to_string()))?; + + if let Some(n) = self.error_after_bytes { + let (parts, body) = response.into_parts(); + let wrapped = ErrorAfterBody { + inner: body, + remaining: n, + }; + Ok(http::Response::from_parts(parts, HttpResponseBody::new(wrapped))) + } else { + Ok(response) + } + } + } + #[tokio::test] async fn test_stream_retry() { let mock = MockServer::new().await; @@ -741,14 +858,7 @@ mod http_tests { retry_timeout: Duration::from_secs(1000), }; - let options = ClientOptions::new().with_allow_http(true); - let store = HttpBuilder::new() - .with_client_options(options) - .with_retry(retry) - .with_url(mock.url()) - .build() - .unwrap(); - + let client = Arc::new(TestGetClient::new(mock.url().to_string(), retry).await); let path = Path::from("test"); // Deliver partial data then error @@ -778,7 +888,52 @@ mod http_tests { .unwrap() }); - store.get(&path).await.unwrap().bytes().await.unwrap_err(); + // Should fail - returns original error since retry got 200 instead of 206 + client + .get_opts(&path, GetOptions::default()) + .await + .unwrap() + .bytes() + .await + .unwrap_err(); + } + + // Should not retry when all bytes delivered before error + #[tokio::test] + async fn test_no_retry_when_complete() { + let mock = MockServer::new().await; + let retry = RetryConfig { + backoff: Default::default(), + max_retries: 3, + retry_timeout: Duration::from_secs(1000), + }; + + // Inject error after 5 bytes (the full content) + let client = Arc::new( + TestGetClient::new(mock.url().to_string(), retry) + .await + .with_error_after_bytes(5), + ); + let path = Path::from("test"); + + // Server returns 5 bytes + mock.push( + Response::builder() + .header(CONTENT_LENGTH, 5) + .header(ETAG, "test-etag") + .body("hello".to_string()) + .unwrap(), + ); + + // Error after 
Content-Length bytes should not trigger retry + let result = client + .get_opts(&path, GetOptions::default()) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(result.as_ref(), b"hello"); } // Should detect mismatched Content-Range header