Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 296 additions & 4 deletions src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use http_body_util::BodyExt;
use reqwest::header::ToStrError;
use std::ops::Range;
use std::sync::Arc;
use tracing::info;
use tracing::{info, warn};

/// A client that can perform a get request
#[async_trait]
Expand Down Expand Up @@ -215,6 +215,11 @@ impl<T: GetClient> GetContext<T> {
},
// Retry all response body errors
(Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
// Full range already read; can't form valid 0-byte Range, return EOF
if range.start >= range.end {
return Ok(None);
}

let sleep = ctx.retry_ctx.backoff();
info!(
"Encountered error while reading response body: {}. Retrying in {}s",
Expand All @@ -237,13 +242,43 @@ impl<T: GetClient> GetContext<T> {
.map_err(Self::err)?;

let (parts, retry_body) = request.into_parts();

// Verify we got a 206 Partial Content response
if parts.status != StatusCode::PARTIAL_CONTENT {
warn!(
"Retry request for range {:?} returned {} instead of 206 Partial Content",
range, parts.status
);
// Return the original error
return Err(Self::err(e));
}

let retry_etag = get_etag(&parts.headers).map_err(Self::err)?;

if etag != &retry_etag {
// Return the original error
return Err(Self::err(e));
}

// Verify Content-Range matches our request
if let Ok(content_range) = parse_range(&parts.headers) {
if content_range.range != range {
warn!(
"Retry request for range {:?} returned Content-Range {:?}",
range, content_range.range
);
// Return the original error
return Err(Self::err(e));
}
} else {
warn!(
"Retry request for range {:?} missing or invalid Content-Range header",
range
);
// Return the original error
return Err(Self::err(e));
}

body = retry_body;
}
(Err(e), _) => return Err(Self::err(e)),
Expand Down Expand Up @@ -483,17 +518,22 @@ mod tests {
}
#[cfg(all(test, feature = "http", not(target_arch = "wasm32")))]
mod http_tests {
use super::{GetClient, GetClientExt};
use crate::client::header::HeaderConfig;
use crate::client::mock_server::MockServer;
use crate::client::{HttpError, HttpErrorKind, HttpResponseBody};
use crate::client::retry::{RetryContext, RetryExt};
use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpErrorKind, HttpResponseBody};
use crate::http::HttpBuilder;
use crate::path::Path;
use crate::{ClientOptions, ObjectStoreExt, RetryConfig};
use crate::{ClientOptions, GetOptions, ObjectStoreExt, Result, RetryConfig};
use async_trait::async_trait;
use bytes::Bytes;
use futures::FutureExt;
use http::header::{CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE};
use http::{Response, StatusCode};
use http::{Method, Response, StatusCode};
use hyper::body::Frame;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::time::Duration;

Expand Down Expand Up @@ -548,6 +588,118 @@ mod http_tests {
}
}

/// A minimal GetClient for testing, similar in structure to the cloud store clients.
/// Can optionally inject an error after N bytes have been read.
struct TestGetClient {
client: HttpClient,
base_url: String,
retry_config: RetryConfig,
/// If set, inject an error after this many bytes
error_after_bytes: Option<usize>,
}

impl TestGetClient {
async fn new(base_url: String, retry_config: RetryConfig) -> Self {
let client = HttpClient::new(reqwest::Client::new());
Self {
client,
base_url,
retry_config,
error_after_bytes: None,
}
}

fn with_error_after_bytes(mut self, n: usize) -> Self {
self.error_after_bytes = Some(n);
self
}
}

/// Wraps a body and injects an error after N bytes.
/// Simulates client-side errors (like reqwest timeouts) that can occur
/// even after full Content-Length has been received.
struct ErrorAfterBody {
inner: HttpResponseBody,
remaining: usize,
}

impl hyper::body::Body for ErrorAfterBody {
type Data = Bytes;
type Error = HttpError;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.remaining == 0 {
return Poll::Ready(Some(Err(HttpError::new(
HttpErrorKind::Unknown,
ChunkedErr {},
))));
}

let inner = Pin::new(&mut self.inner);
match ready!(inner.poll_frame(cx)) {
None => Poll::Ready(None),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
Some(Ok(frame)) => {
if let Some(data) = frame.data_ref() {
self.remaining = self.remaining.saturating_sub(data.len());
}
Poll::Ready(Some(Ok(frame)))
}
}
}
}

#[async_trait]
impl GetClient for TestGetClient {
const STORE: &'static str = "TEST";
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
user_defined_metadata_prefix: None,
};

fn retry_config(&self) -> &RetryConfig {
&self.retry_config
}

async fn get_request(
&self,
ctx: &mut RetryContext,
path: &Path,
options: GetOptions,
) -> Result<crate::client::HttpResponse> {
let url = format!("{}/{}", self.base_url, path);
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};

let response = self
.client
.request(method, url)
.with_get_options(options)
.retryable_request()
.send(ctx)
.await
.map_err(|e| e.error("TEST", path.to_string()))?;

if let Some(n) = self.error_after_bytes {
let (parts, body) = response.into_parts();
let wrapped = ErrorAfterBody {
inner: body,
remaining: n,
};
Ok(http::Response::from_parts(parts, HttpResponseBody::new(wrapped)))
} else {
Ok(response)
}
}
}

#[tokio::test]
async fn test_stream_retry() {
let mock = MockServer::new().await;
Expand Down Expand Up @@ -695,4 +847,144 @@ mod http_tests {
"Generic HTTP error: HTTP error: request or response body error"
);
}

// Should detect 200 instead of 206 on range retry
#[tokio::test]
async fn test_retry_validates_partial_content_status() {
let mock = MockServer::new().await;
let retry = RetryConfig {
backoff: Default::default(),
max_retries: 3,
retry_timeout: Duration::from_secs(1000),
};

let client = Arc::new(TestGetClient::new(mock.url().to_string(), retry).await);
let path = Path::from("test");

// Deliver partial data then error
mock.push(
Response::builder()
.header(CONTENT_LENGTH, 10)
.header(ETAG, "test-etag")
.body(Chunked::new(vec![
Ok(Bytes::from_static(b"hello")),
Err(()),
]))
.unwrap(),
);

// Server ignores Range and returns 200 OK instead of 206
mock.push_fn(|req| {
assert_eq!(
req.headers().get(RANGE).unwrap().to_str().unwrap(),
"bytes=5-9"
);

Response::builder()
.status(StatusCode::OK)
.header(CONTENT_LENGTH, 10)
.header(ETAG, "test-etag")
.body("helloworld".to_string())
.unwrap()
});

// Should fail - returns original error since retry got 200 instead of 206
client
.get_opts(&path, GetOptions::default())
.await
.unwrap()
.bytes()
.await
.unwrap_err();
}

// Should not retry when all bytes delivered before error
#[tokio::test]
async fn test_no_retry_when_complete() {
let mock = MockServer::new().await;
let retry = RetryConfig {
backoff: Default::default(),
max_retries: 3,
retry_timeout: Duration::from_secs(1000),
};

// Inject error after 5 bytes (the full content)
let client = Arc::new(
TestGetClient::new(mock.url().to_string(), retry)
.await
.with_error_after_bytes(5),
);
let path = Path::from("test");

// Server returns 5 bytes
mock.push(
Response::builder()
.header(CONTENT_LENGTH, 5)
.header(ETAG, "test-etag")
.body("hello".to_string())
.unwrap(),
);

// Error after Content-Length bytes should not trigger retry
let result = client
.get_opts(&path, GetOptions::default())
.await
.unwrap()
.bytes()
.await
.unwrap();
assert_eq!(result.as_ref(), b"hello");
}

// Should detect mismatched Content-Range header
#[tokio::test]
async fn test_retry_validates_content_range_header() {
let mock = MockServer::new().await;
let retry = RetryConfig {
backoff: Default::default(),
max_retries: 3,
retry_timeout: Duration::from_secs(1000),
};

let options = ClientOptions::new().with_allow_http(true);
let store = HttpBuilder::new()
.with_client_options(options)
.with_retry(retry)
.with_url(mock.url())
.build()
.unwrap();

let path = Path::from("test");

// Deliver partial data then error
mock.push(
Response::builder()
.header(CONTENT_LENGTH, 10)
.header(ETAG, "test-etag")
.body(Chunked::new(vec![
Ok(Bytes::from_static(b"hello")),
Err(()),
]))
.unwrap(),
);

// Server returns 206 but with wrong Content-Range
mock.push_fn(|req| {
assert_eq!(
req.headers().get(RANGE).unwrap().to_str().unwrap(),
"bytes=5-9"
);

Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header(CONTENT_LENGTH, 5)
.header(ETAG, "test-etag")
.header(CONTENT_RANGE, "bytes 0-4/10")
.body("hello".to_string())
.unwrap()
});

// Returns original error since Content-Range doesn't match requested range
store.get(&path).await.unwrap().bytes().await.unwrap_err();
}
}
Loading