From f5b3811053e37c9c86bd6892c0a813b581a5fe35 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Mon, 9 Mar 2026 18:42:38 +0200 Subject: [PATCH 1/9] perf: optimize the json newline scanning This is an alternative approach to https://github.com/apache/datafusion/pull/19687 Instead of reading the entire range in the json FileOpener, implement an AlignedBoundaryStream which scans the range for newlines as the FileStream requests data from the stream, by wrapping the original stream returned by the ObjectStore. This eliminates the overhead of the extra two get_opts requests needed by calculate_range and, more importantly, allows for efficient read-ahead implementations by the underlying ObjectStore. Previously this was inefficient because the streams opened by calculate_range included a stream from (start - 1) to file_size and another one from (end - 1) to end_of_file, just to find the two relevant newlines. --- Cargo.lock | 1 + Cargo.toml | 2 +- datafusion/datasource-json/Cargo.toml | 1 + .../datasource-json/src/boundary_stream.rs | 867 ++++++++++++++++++ datafusion/datasource-json/src/mod.rs | 34 + datafusion/datasource-json/src/source.rs | 299 +++++- 6 files changed, 1176 insertions(+), 28 deletions(-) create mode 100644 datafusion/datasource-json/src/boundary_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 8101e2fe22d92..852411bf23e41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2065,6 +2065,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 08d585d3ef906..38135b0e684e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,7 +185,7 @@ strum = "0.28.0" strum_macros = "0.28.0" tempfile = "3" testcontainers-modules = { version = "0.15" } -tokio = { version = "1.48", features = ["macros", "rt", "sync"] } +tokio = { version = "1.48", features = ["macros", "rt", "sync", "fs"] } tokio-stream = "0.1" tokio-util = "0.7" url = "2.5.7" diff --git a/datafusion/datasource-json/Cargo.toml 
b/datafusion/datasource-json/Cargo.toml index b5947ea5c4c67..95f1d218948ad 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -46,6 +46,7 @@ futures = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } +tokio-util = { workspace = true, features = ["io"] } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs new file mode 100644 index 0000000000000..ceeb17203ffa2 --- /dev/null +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -0,0 +1,867 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads. +//! +//! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to +//! record (newline) boundaries, avoiding the need for separate `get_opts` +//! calls to locate boundary positions. 
+ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::stream::{BoxStream, Stream}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; + +/// How far past `raw_end` the initial bounded fetch covers. If the terminating +/// newline is not found within this window, `ScanningLastTerminator` issues +/// successive same-sized GETs until the newline is located or EOF is reached. +pub const END_SCAN_LOOKAHEAD: u64 = 16 * 1024; // 16 KiB + +/// Phase of the boundary alignment state machine. +#[derive(Debug)] +enum Phase { + /// Scanning for the first newline to align the start boundary. + ScanningFirstTerminator, + /// Passing through aligned data, tracking byte position. + FetchingChunks, + /// Past the end boundary, scanning for terminating newline. + ScanningLastTerminator, + /// Stream is exhausted. + Done, +} + +/// A stream wrapper that lazily aligns byte boundaries to newline characters. +/// +/// Given a raw byte stream starting from `fetch_start` (which is `start - 1` +/// for non-zero starts, or `0`), this stream: +/// +/// 1. Skips bytes until the first newline is found (start alignment) +/// 2. Passes through data until the `end` boundary is reached +/// 3. Continues past `end` to find the terminating newline (end alignment) +/// +/// When the initial byte stream is exhausted during step 3 and the file has +/// not been fully read, `ScanningLastTerminator` issues additional bounded +/// `get_opts` calls (`END_SCAN_LOOKAHEAD` bytes each) until the newline is +/// found or EOF is reached. +pub struct AlignedBoundaryStream { + inner: BoxStream<'static, object_store::Result>, + terminator: u8, + /// Effective end boundary. Set to `u64::MAX` when `end >= file_size` + /// (last partition), so `FetchingChunks` never transitions to + /// `ScanningLastTerminator` and simply streams to EOF. 
+ end: u64, + /// Cumulative bytes consumed from `inner` (relative to `fetch_start`). + bytes_consumed: u64, + /// The offset where the initial `inner` stream begins. + fetch_start: u64, + phase: Phase, + /// Remainder bytes from `ScanningFirstTerminator` that still need + /// end-boundary processing. Consumed by `FetchingChunks` before polling + /// `inner`. + pending: Option, + store: Arc, + location: object_store::path::Path, + /// Total file size; overflow stops when `abs_pos() >= file_size`. + file_size: u64, +} + +/// Fetch a bounded byte range from `store` and return it as a stream, +/// handling both `File` and `Stream` payloads. For `File` payloads the +/// underlying file is seeked to `range.start` before streaming. +async fn get_stream( + store: Arc, + location: object_store::path::Path, + range: std::ops::Range, +) -> object_store::Result>> { + let opts = GetOptions { + range: Some(GetRange::Bounded(range.clone())), + ..Default::default() + }; + let result = store.get_opts(&location, opts).await?; + let stream = match result.payload { + #[cfg(not(target_arch = "wasm32"))] + GetResultPayload::File(file, _) => { + use std::io::SeekFrom; + use tokio::io::AsyncSeekExt; + let mut tokio_file = tokio::fs::File::from_std(file); + tokio_file + .seek(SeekFrom::Start(range.start)) + .await + .map_err(|e| object_store::Error::Generic { + store: "local", + source: Box::new(e), + })?; + tokio_util::io::ReaderStream::new(tokio::io::BufReader::new(tokio_file)) + .map_err(|e| object_store::Error::Generic { + store: "local", + source: Box::new(e), + }) + .boxed() + } + GetResultPayload::Stream(s) => s.boxed(), + }; + Ok(stream) +} + +impl AlignedBoundaryStream { + /// Open a ranged byte stream from `store` and return a ready-to-poll + /// `AlignedBoundaryStream`. + /// + /// Issues a single bounded `get_opts` call covering + /// `[fetch_start, raw_end + END_SCAN_LOOKAHEAD)`. 
If the terminating + /// newline is not found within that window, `ScanningLastTerminator` + /// automatically issues additional `END_SCAN_LOOKAHEAD`-sized GETs + /// via `store` until the newline is found or EOF is reached. + pub async fn new( + store: Arc, + location: object_store::path::Path, + raw_start: u64, + raw_end: u64, + file_size: u64, + terminator: u8, + ) -> object_store::Result { + if raw_start >= raw_end { + return Ok(Self { + inner: futures::stream::empty().boxed(), + terminator, + end: 0, + bytes_consumed: 0, + fetch_start: 0, + phase: Phase::Done, + pending: None, + store, + location, + file_size, + }); + } + + let fetch_start = if raw_start > 0 { raw_start - 1 } else { 0 }; + let initial_fetch_end = raw_end.saturating_add(END_SCAN_LOOKAHEAD).min(file_size); + + let inner = get_stream( + Arc::clone(&store), + location.clone(), + fetch_start..initial_fetch_end, + ) + .await?; + + let (fetch_start, phase) = if raw_start == 0 { + (0, Phase::FetchingChunks) + } else { + (raw_start - 1, Phase::ScanningFirstTerminator) + }; + + // Last partition reads to EOF — no end-boundary scanning needed. + let end = if raw_end >= file_size { + u64::MAX + } else { + raw_end + }; + + Ok(Self { + inner, + terminator, + end, + bytes_consumed: 0, + fetch_start, + phase, + pending: None, + store, + location, + file_size, + }) + } + + /// Current absolute position in the file. + fn abs_pos(&self) -> u64 { + self.fetch_start + self.bytes_consumed + } +} + +impl Stream for AlignedBoundaryStream { + type Item = object_store::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.phase { + Phase::Done => return Poll::Ready(None), + + Phase::ScanningFirstTerminator => { + // Find the first terminator and skip everything up to + // and including it. Store any remainder in `pending` + // so `FetchingChunks` can apply end-boundary logic to it. 
+ match this.inner.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + this.phase = Phase::Done; + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => { + this.phase = Phase::Done; + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Some(Ok(chunk))) => { + this.bytes_consumed += chunk.len() as u64; + match chunk.iter().position(|&b| b == this.terminator) { + Some(pos) => { + let remainder = chunk.slice((pos + 1)..); + // The aligned start position is where + // data begins after the newline. + let aligned_start = + this.abs_pos() - remainder.len() as u64; + if aligned_start >= this.end { + // Start alignment landed at or past + // the end boundary — no complete + // lines in this partition's range. + this.phase = Phase::Done; + return Poll::Ready(None); + } + if !remainder.is_empty() { + this.pending = Some(remainder); + } + this.phase = Phase::FetchingChunks; + continue; + } + None => continue, + } + } + } + } + + Phase::FetchingChunks => { + // Get the next chunk: pending remainder or inner stream. + let chunk = if let Some(pending) = this.pending.take() { + pending + } else { + match this.inner.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + this.phase = Phase::Done; + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => { + this.phase = Phase::Done; + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Some(Ok(chunk))) => { + this.bytes_consumed += chunk.len() as u64; + chunk + } + } + }; + + let pos_after = this.abs_pos(); + + // When end == u64::MAX (last partition), this is always + // true and we stream straight through to EOF. + if pos_after < this.end { + return Poll::Ready(Some(Ok(chunk))); + } + + if pos_after == this.end { + // Chunk ends exactly at the boundary. + if chunk.last() == Some(&this.terminator) { + this.phase = Phase::Done; + } else { + // No terminator at boundary; any following data + // is past end, so switch to end-scanning. 
+ this.phase = Phase::ScanningLastTerminator; + } + return Poll::Ready(Some(Ok(chunk))); + } + + // Chunk crosses the end boundary (`pos_after > this.end`). + // Find the first terminator at or after file position + // `this.end - 1` and yield everything up to and + // including it. + // + // `pos_before` is the absolute file position of chunk[0]. + // `chunk_in_range_len` is how many bytes of this chunk + // fall within [pos_before, this.end), so chunk[0.. + // chunk_in_range_len] is the in-range portion. + // `search_from` is the chunk index of the last in-range + // byte (file position this.end - 1). + // + // Example A: "line1\nline2\nline3\n" (18 bytes), end=8, + // one large chunk arriving with pos_after=18: + // pos_before = 18 - 18 = 0 + // chunk_in_range_len = 8 - 0 = 8 + // search_from = 7 (chunk[7] is file pos 7) + // chunk[7]='i', chunk[11]='\n' → rel=4 + // yield chunk[..7+4+1] = chunk[..12] = "line1\nline2\n" + // + // Example B: same data, 3-byte chunks, end=8. + // "lin"(pos 0-2) and "e1\n"(pos 3-5) yielded already. + // Now chunk="lin" arrives with pos_after=9: + // pos_before = 9 - 3 = 6 + // chunk_in_range_len = 8 - 6 = 2 + // search_from = 1 (chunk[1] is file pos 7) + // chunk[1]='i', no '\n' in chunk[1..] → ScanningLastTerminator + let pos_before = pos_after - chunk.len() as u64; + let chunk_in_range_len = (this.end - pos_before) as usize; + let search_from = chunk_in_range_len - 1; + if let Some(rel) = chunk[search_from..] + .iter() + .position(|&b| b == this.terminator) + { + this.phase = Phase::Done; + return Poll::Ready(Some(Ok( + chunk.slice(..search_from + rel + 1) + ))); + } + + // No terminator found; continue scanning in ScanningLastTerminator. + this.phase = Phase::ScanningLastTerminator; + return Poll::Ready(Some(Ok(chunk))); + } + + Phase::ScanningLastTerminator => { + match this.inner.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + // Inner exhausted. 
Issue the next overflow GET if + // the file has not been fully read yet. + let pos = this.abs_pos(); + if pos < this.file_size { + let fetch_end = pos + .saturating_add(END_SCAN_LOOKAHEAD) + .min(this.file_size); + let store = Arc::clone(&this.store); + let location = this.location.clone(); + this.inner = get_stream(store, location, pos..fetch_end) + .try_flatten_stream() + .boxed(); + continue; + } + this.phase = Phase::Done; + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => { + this.phase = Phase::Done; + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Some(Ok(chunk))) => { + this.bytes_consumed += chunk.len() as u64; + if let Some(pos) = + chunk.iter().position(|&b| b == this.terminator) + { + this.phase = Phase::Done; + return Poll::Ready(Some(Ok(chunk.slice(..pos + 1)))); + } + // No terminator yet; yield and keep scanning. + return Poll::Ready(Some(Ok(chunk))); + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{CHUNK_SIZES, make_chunked_store}; + + async fn collect_stream(stream: AlignedBoundaryStream) -> Vec { + let chunks: Vec = stream.try_collect().await.unwrap(); + chunks.into_iter().flat_map(|b| b.to_vec()).collect() + } + + #[tokio::test] + async fn test_start_at_zero_no_end_scan() { + // start=0, end >= file_size → pass through everything + static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 0, 100, 18, b'\n') + .await + .unwrap(); + assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_start_aligned_on_newline() { + // Data: "line1\nline2\nline3\n" + // 0 5 6 11 12 17 + // start=6 → fetch_start=5. Byte at offset 5 is '\n'. + // Should skip the leading '\n' and yield "line2\nline3\n". 
+ static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 6, 100, 18, b'\n') + .await + .unwrap(); + assert_eq!( + collect_stream(s).await, + b"line2\nline3\n", + "chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_start_mid_line() { + // start=3, fetch_start=2. Bytes from offset 2: "ne1\nline2\nline3\n". + // Should skip "ne1\n" and yield "line2\nline3\n". + static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 3, 100, 18, b'\n') + .await + .unwrap(); + assert_eq!( + collect_stream(s).await, + b"line2\nline3\n", + "chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_end_boundary_mid_line() { + // Data: "line1\nline2\nline3\n" + // 0 5 6 11 12 17 + // start=0, end=8. End is mid "line2". + // Should yield "line1\nline2\n" (continue past end to find newline). + static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 0, 8, 18, b'\n') + .await + .unwrap(); + assert_eq!( + collect_stream(s).await, + b"line1\nline2\n", + "chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_end_at_eof() { + // end >= file_size → no end scanning, pass through everything. + static DATA: &[u8] = b"line1\nline2\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 0, 12, 12, b'\n') + .await + .unwrap(); + assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_no_newline_in_range() { + // start=2, fetch_start=1. Bytes from offset 1: "bcdef" — no newline. + // No complete line → empty output. 
+ static DATA: &[u8] = b"abcdef"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 2, 6, 6, b'\n') + .await + .unwrap(); + assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_start_and_end_alignment() { + // Data: "line1\nline2\nline3\nline4\n" + // 0 5 6 11 12 17 18 23 + // start=3, end=14, file_size=24 + // fetch_start=2, bytes from offset 2: "ne1\nline2\nline3\nline4\n" + // Start aligns past "ne1\n"; end=14 is mid "line3", scan to '\n'. + // Expected: "line2\nline3\n" + static DATA: &[u8] = b"line1\nline2\nline3\nline4\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 3, 14, 24, b'\n') + .await + .unwrap(); + assert_eq!( + collect_stream(s).await, + b"line2\nline3\n", + "chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_end_scan_across_chunks() { + // end boundary falls before a newline; the terminating newline must be + // found by scanning past the end in subsequent chunks. + // Data: "line1\nline2\nline3\n" (18 bytes) + // start=0, end=7 (mid "line2"), file_size=18 → "line1\nline2\n" + static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 0, 7, 18, b'\n') + .await + .unwrap(); + assert_eq!( + collect_stream(s).await, + b"line1\nline2\n", + "chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_empty_range() { + // start >= end — no complete line can exist, regardless of data. 
+ static DATA: &[u8] = b"line1\nline2\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + + // start > end (non-zero start) + let s = AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 10, + 5, + 20, + b'\n', + ) + .await + .unwrap(); + assert!( + collect_stream(s).await.is_empty(), + "start>end chunk_size={cs}" + ); + + // start == end == 0 (zero start, previously unguarded) + let s = AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 0, + 0, + 12, + b'\n', + ) + .await + .unwrap(); + assert!( + collect_stream(s).await.is_empty(), + "start==end==0 chunk_size={cs}" + ); + + // start == end (non-zero) + let s = AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 6, + 6, + 12, + b'\n', + ) + .await + .unwrap(); + assert!( + collect_stream(s).await.is_empty(), + "start==end==6 chunk_size={cs}" + ); + } + } + + #[tokio::test] + async fn test_start_align_across_chunks() { + // The newline needed for start alignment may arrive in any chunk. + // fetch_start=0 (start=1). Data: "abcdef\nline2\n" (13 bytes) + // Start aligns past "abcdef\n", yielding "line2\n". + static DATA: &[u8] = b"abcdef\nline2\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 1, 100, 13, b'\n') + .await + .unwrap(); + assert_eq!(collect_stream(s).await, b"line2\n", "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_end_aligned_on_newline() { + // end falls right on a newline — line is complete, no end-scan needed. + // Data: "line1\nline2\nline3\n" + // 0 5 6 11 12 17 + // start=0, end=6 → byte 5 is '\n' → yield only "line1\n". 
+ static DATA: &[u8] = b"line1\nline2\nline3\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 0, 6, 18, b'\n') + .await + .unwrap(); + assert_eq!(collect_stream(s).await, b"line1\n", "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_adjacent_partitions_no_overlap() { + // Three adjacent partitions over "line1\nline2\nline3\n". + // Partition 1: [0, 6), fetch_start=0 → stream full file + // Partition 2: [6, 12), fetch_start=5 → stream from offset 5 + // Partition 3: [12, 18), fetch_start=11 → stream from offset 11 + static DATA: &[u8] = b"line1\nline2\nline3\n"; // 18 bytes + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let r1 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 0, + 6, + 18, + b'\n', + ) + .await + .unwrap(), + ) + .await; + let r2 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 6, + 12, + 18, + b'\n', + ) + .await + .unwrap(), + ) + .await; + let r3 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 12, + 18, + 18, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + assert_eq!(r1, b"line1\n", "p1 chunk_size={cs}"); + assert_eq!(r2, b"line2\n", "p2 chunk_size={cs}"); + assert_eq!(r3, b"line3\n", "p3 chunk_size={cs}"); + + let mut combined = r1; + combined.extend(r2); + combined.extend(r3); + assert_eq!(combined, DATA, "combined chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_start_align_past_end_returns_empty() { + // The first aligned start lands at or past the end boundary. + // Data: "abcdefghij\nkl\n" (14 bytes) + // 0 10 11 13 + // Partition [3, 6): start=3, end=6, fetch_start=2 + // Bytes from offset 2: "cdefghij\nkl\n". First '\n' at offset 10; + // aligned start = 11, which is >= end = 6 → empty. 
+ static DATA: &[u8] = b"abcdefghij\nkl\n"; + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + let s = AlignedBoundaryStream::new(store, path, 3, 6, 14, b'\n') + .await + .unwrap(); + assert!(collect_stream(s).await.is_empty(), "chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_unaligned_partitions_no_overlap() { + // Partitions that don't fall on line boundaries. + // Data: "aaa\nbbb\nccc\n" (12 bytes) + // 0 3 4 7 8 11 + // Partitions: [0, 5), [5, 10), [10, 12) + static DATA: &[u8] = b"aaa\nbbb\nccc\n"; // 12 bytes + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + + // [0, 5): no start alignment; end=5 mid "bbb", scans to '\n' at 7. + let r1 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 0, + 5, + 12, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + // [5, 10): fetch_start=4, bytes from offset 4: "bbb\nccc\n". + // '\n' at pos 3 → aligned start=8 ("ccc\n"). End=10 mid "ccc", + // scans to '\n' at 11 → yields "ccc\n". + let r2 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 5, + 10, + 12, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + // [10, 12): fetch_start=9, bytes from offset 9: "cc\n". + // '\n' at pos 2 → aligned start=12. end=12==file_size → end=MAX. + // Remainder after '\n' is empty; FetchingChunks polls inner → Done. 
+ let r3 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 10, + 12, + 12, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + assert_eq!(r1, b"aaa\nbbb\n", "p1 chunk_size={cs}"); + assert_eq!(r2, b"ccc\n", "p2 chunk_size={cs}"); + assert!(r3.is_empty(), "p3 chunk_size={cs}"); + + let mut combined = r1; + combined.extend(r2); + combined.extend(r3); + assert_eq!(combined, DATA, "combined chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_no_trailing_newline() { + // Last partition of a file that does not end with a newline. + // end >= file_size → this.end = u64::MAX, so FetchingChunks streams + // straight to EOF and yields the final incomplete line as-is. + static DATA: &[u8] = b"line1\nline2"; // 11 bytes, no trailing '\n' + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(DATA, cs).await; + + // Single partition covering the whole file. + let s = AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 0, + 11, + 11, + b'\n', + ) + .await + .unwrap(); + assert_eq!(collect_stream(s).await, DATA, "chunk_size={cs}"); + + // Last partition starting mid-file (start=6, fetch_start=5). + // Bytes from offset 5: "\nline2". + // ScanningFirstTerminator consumes '\n', remainder "line2" is yielded as-is. + let s = AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 6, + 11, + 11, + b'\n', + ) + .await + .unwrap(); + assert_eq!(collect_stream(s).await, b"line2", "tail chunk_size={cs}"); + } + } + + #[tokio::test] + async fn test_overflow_fetch() { + // First line is longer than 2 * END_SCAN_LOOKAHEAD so the initial + // bounded fetch [fetch_start, raw_end + END_SCAN_LOOKAHEAD) does not + // reach its newline. ScanningLastTerminator must issue overflow GETs + // to find it. + // + // Partition [0, 1): raw_end=1, initial_fetch_end=1+16384=16385. + // The newline is at byte 32768 > 16385 → one overflow GET required. 
+ // Partition [1, file_size): start=1 lands mid line-1; ScanningFirstTerminator + // skips to byte 32769, then yields "line2\nline3\n". + let long_line: Vec = + std::iter::repeat_n(b'A', 2 * END_SCAN_LOOKAHEAD as usize) + .chain(std::iter::once(b'\n')) + .collect(); + let rest = b"line2\nline3\n"; + let mut data = long_line.clone(); + data.extend_from_slice(rest); + let file_size = data.len() as u64; + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(&data, cs).await; + + let r1 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 0, + 1, + file_size, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + let r2 = collect_stream( + AlignedBoundaryStream::new( + Arc::clone(&store), + path.clone(), + 1, + file_size, + file_size, + b'\n', + ) + .await + .unwrap(), + ) + .await; + + assert_eq!(r1, long_line, "p1 chunk_size={cs}"); + assert_eq!(r2, rest.as_slice(), "p2 chunk_size={cs}"); + + let mut combined = r1; + combined.extend(r2); + assert_eq!(combined, data, "combined chunk_size={cs}"); + } + } +} diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs index 7dc0a0c7ba0f9..f7932c8a21d95 100644 --- a/datafusion/datasource-json/src/mod.rs +++ b/datafusion/datasource-json/src/mod.rs @@ -20,8 +20,42 @@ // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] +pub mod boundary_stream; pub mod file_format; pub mod source; pub mod utils; pub use file_format::*; + +#[cfg(test)] +pub(crate) mod test_utils { + use std::sync::Arc; + + use bytes::Bytes; + use object_store::chunked::ChunkedStore; + use object_store::memory::InMemory; + use object_store::path::Path; + use object_store::{ObjectStore, ObjectStoreExt, PutPayload}; + + /// Chunk sizes exercised by every parameterised test. + /// + /// `usize::MAX` is intentionally included: `ChunkedStore` treats it as + /// "one chunk containing everything", giving the single-chunk fast path. 
+ pub const CHUNK_SIZES: &[usize] = &[1, 2, 3, 4, 5, 7, 8, 11, 13, 16, usize::MAX]; + + /// Seed a fresh `InMemory` store with `data` and wrap it in a + /// [`ChunkedStore`] that splits every GET response into `chunk_size`-byte + /// pieces. + pub async fn make_chunked_store( + data: &[u8], + chunk_size: usize, + ) -> (Arc, Path) { + let inner = Arc::new(InMemory::new()); + let path = Path::from("test"); + inner + .put(&path, PutPayload::from(Bytes::copy_from_slice(data))) + .await + .unwrap(); + (Arc::new(ChunkedStore::new(inner, chunk_size)), path) + } +} diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 2f1d5abbee599..0d4fd6e76c4c0 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -18,7 +18,7 @@ //! Execution plan for reading JSON files (line-delimited and array formats) use std::any::Any; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::BufReader; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -26,6 +26,8 @@ use std::task::{Context, Poll}; use crate::file_format::JsonDecoder; use crate::utils::{ChannelReader, JsonArrayToNdjsonReader}; +use crate::boundary_stream::AlignedBoundaryStream; + use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -33,9 +35,7 @@ use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; -use datafusion_datasource::{ - ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range, -}; +use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source}; use datafusion_physical_plan::projection::ProjectionExprs; use 
datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -282,39 +282,51 @@ impl FileOpener for JsonOpener { } Ok(Box::pin(async move { - let calculated_range = - calculate_range(&partitioned_file, &store, None).await?; + let file_size = partitioned_file.object_meta.size; + let location = &partitioned_file.object_meta.location; + + if let Some(file_range) = partitioned_file.range.as_ref() { + let raw_start = file_range.start as u64; + let raw_end = file_range.end as u64; - let range = match calculated_range { - RangeCalculation::Range(None) => None, - RangeCalculation::Range(Some(range)) => Some(range.into()), - RangeCalculation::TerminateEarly => { + if raw_start >= raw_end || raw_start >= file_size { return Ok( futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed() ); } - }; - let options = GetOptions { - range, - ..Default::default() - }; + let aligned_stream = AlignedBoundaryStream::new( + Arc::clone(&store), + location.clone(), + raw_start, + raw_end, + file_size, + b'\n', + ) + .await? + .map_err(DataFusionError::from); + + let decoder = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build_decoder()?; + let input = file_compression_type + .convert_stream(aligned_stream.boxed())? 
+ .fuse(); + let stream = deserialize_stream( + input, + DecoderDeserializer::new(JsonDecoder::new(decoder)), + ); + return Ok(stream.map_err(Into::into).boxed()); + } - let result = store - .get_opts(&partitioned_file.object_meta.location, options) - .await?; + // No range specified — read the entire file + let options = GetOptions::default(); + let result = store.get_opts(location, options).await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(mut file, _) => { - let bytes = match partitioned_file.range { - None => file_compression_type.convert_read(file)?, - Some(_) => { - file.seek(SeekFrom::Start(result.range.start as _))?; - let limit = result.range.end - result.range.start; - file_compression_type.convert_read(file.take(limit))? - } - }; + GetResultPayload::File(file, _) => { + let bytes = file_compression_type.convert_read(file)?; if newline_delimited { // NDJSON: use BufReader directly @@ -520,7 +532,11 @@ pub async fn plan_to_json( #[cfg(test)] mod tests { use super::*; + use crate::test_utils::{CHUNK_SIZES, make_chunked_store}; + use arrow::array::{Int64Array, StringArray}; + use arrow::compute; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; use bytes::Bytes; use datafusion_datasource::FileRange; use futures::TryStreamExt; @@ -819,4 +835,233 @@ mod tests { // If we reach here without hanging, cancellation worked Ok(()) } + + fn get_partition_splits() -> Vec { + vec![1usize, 2, 3, 5, 7, 10] + } + + /// Opens each byte-range partition of `path` in `store` and collects all + /// record batches produced across every partition. 
+ async fn collect_partitioned_batches( + store: Arc, + path: &Path, + file_size: u64, + num_partitions: usize, + ) -> Result> { + let mut all_batches = Vec::new(); + for p in 0..num_partitions { + let start = (p as u64 * file_size) / num_partitions as u64; + let end = ((p as u64 + 1) * file_size) / num_partitions as u64; + + let meta = store.head(path).await?; + let mut file = PartitionedFile::new(path.to_string(), meta.size); + file.range = Some(FileRange { + start: start as i64, + end: end as i64, + }); + + let opener = JsonOpener::new( + 1024, + test_schema(), + FileCompressionType::UNCOMPRESSED, + Arc::clone(&store), + true, + ); + + let stream = opener.open(file)?.await?; + let batches: Vec<_> = stream.try_collect().await?; + all_batches.extend(batches); + } + Ok(all_batches) + } + + /// Concatenates `batches` and returns a single batch sorted ascending by + /// the first (id) column. + fn concat_and_sort_by_id(batches: &[RecordBatch]) -> Result { + let schema = test_schema(); + let combined = compute::concat_batches(&schema, batches)?; + let indices = compute::sort_to_indices(combined.column(0), None, None)?; + let sorted_cols: Vec<_> = combined + .columns() + .iter() + .map(|col| compute::take(col.as_ref(), &indices, None)) + .collect::>()?; + Ok(RecordBatch::try_new(schema, sorted_cols)?) + } + + #[tokio::test] + async fn test_ndjson_partitioned() -> Result<()> { + // Build an NDJSON file with a known number of rows. 
+ let num_rows: usize = 20; + let mut ndjson = String::new(); + for i in 0..num_rows { + ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"user{i}\"}}\n")); + } + let ndjson_bytes = Bytes::from(ndjson); + let file_size = ndjson_bytes.len() as u64; + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(&ndjson_bytes, cs).await; + + for num_partitions in get_partition_splits() { + let batches = collect_partitioned_batches( + Arc::clone(&store), + &path, + file_size, + num_partitions, + ) + .await?; + + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total, num_rows, + "Expected {num_rows} rows with {num_partitions} partitions" + ); + + let result = concat_and_sort_by_id(&batches)?; + let ids = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..num_rows { + assert_eq!( + ids.value(i), + i as i64, + "id mismatch at row {i} with {num_partitions} partitions" + ); + assert_eq!( + names.value(i), + format!("user{i}"), + "name mismatch at row {i} with {num_partitions} partitions" + ); + } + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_partitioned_uneven_lines() -> Result<()> { + // Lines of deliberately varying lengths so byte-range boundaries are + // more likely to land in the middle of a line. 
+ let rows: &[(&str, &str)] = &[ + ("1", "alice"), + ("2", "bob-with-a-longer-name"), + ("3", "charlie"), + ("4", "x"), + ("5", "diana-has-an-even-longer-name-here"), + ("6", "ed"), + ("7", "francesca"), + ("8", "g"), + ("9", "hector-the-magnificent"), + ("10", "isabella"), + ]; + let num_rows = rows.len(); + + let mut ndjson = String::new(); + for (id, name) in rows { + ndjson.push_str(&format!("{{\"id\": {id}, \"name\": \"{name}\"}}\n")); + } + let ndjson_bytes = Bytes::from(ndjson); + let file_size = ndjson_bytes.len() as u64; + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(&ndjson_bytes, cs).await; + + for num_partitions in get_partition_splits() { + let batches = collect_partitioned_batches( + Arc::clone(&store), + &path, + file_size, + num_partitions, + ) + .await?; + + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total, num_rows, + "Expected {num_rows} rows with {num_partitions} partitions" + ); + + let result = concat_and_sort_by_id(&batches)?; + let ids = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for (i, (expected_id, expected_name)) in rows.iter().enumerate() { + assert_eq!( + ids.value(i), + expected_id.parse::().unwrap(), + "id mismatch at row {i} with {num_partitions} partitions" + ); + assert_eq!( + names.value(i), + *expected_name, + "name mismatch at row {i} with {num_partitions} partitions" + ); + } + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_partitioned_single_entry() -> Result<()> { + // A single JSON object with no trailing newline. No matter how many + // byte-range partitions the file is split into, exactly one row must + // be produced in total. 
+ let ndjson = r#"{"id": 1, "name": "alice"}"#; + let ndjson_bytes = Bytes::from(ndjson); + let file_size = ndjson_bytes.len() as u64; + + for &cs in CHUNK_SIZES { + let (store, path) = make_chunked_store(&ndjson_bytes, cs).await; + + for num_partitions in get_partition_splits() { + let batches = collect_partitioned_batches( + Arc::clone(&store), + &path, + file_size, + num_partitions, + ) + .await?; + + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total, 1, + "Expected exactly 1 row with {num_partitions} partitions" + ); + + let result = concat_and_sort_by_id(&batches)?; + let ids = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), 1); + assert_eq!(names.value(0), "alice"); + } + } + + Ok(()) + } } From b58ffa2444e6bcb476d96258f07ed0cd94fce72e Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Thu, 19 Mar 2026 12:54:07 +0200 Subject: [PATCH 2/9] fix: remove custom logic for file vs stream handling --- Cargo.toml | 2 +- .../datasource-json/src/boundary_stream.rs | 33 +++---------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 38135b0e684e3..08d585d3ef906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,7 +185,7 @@ strum = "0.28.0" strum_macros = "0.28.0" tempfile = "3" testcontainers-modules = { version = "0.15" } -tokio = { version = "1.48", features = ["macros", "rt", "sync", "fs"] } +tokio = { version = "1.48", features = ["macros", "rt", "sync"] } tokio-stream = "0.1" tokio-util = "0.7" url = "2.5.7" diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index ceeb17203ffa2..7adfe34e39f68 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -27,8 +27,8 @@ use std::task::{Context, Poll}; use bytes::Bytes; use 
futures::stream::{BoxStream, Stream}; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; -use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; +use futures::{StreamExt, TryFutureExt}; +use object_store::{GetOptions, GetRange, ObjectStore}; /// How far past `raw_end` the initial bounded fetch covers. If the terminating /// newline is not found within this window, `ScanningLastTerminator` issues @@ -83,9 +83,7 @@ pub struct AlignedBoundaryStream { file_size: u64, } -/// Fetch a bounded byte range from `store` and return it as a stream, -/// handling both `File` and `Stream` payloads. For `File` payloads the -/// underlying file is seeked to `range.start` before streaming. +/// Fetch a bounded byte range from `store` and return it as a stream async fn get_stream( store: Arc, location: object_store::path::Path, @@ -96,29 +94,7 @@ async fn get_stream( ..Default::default() }; let result = store.get_opts(&location, opts).await?; - let stream = match result.payload { - #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(file, _) => { - use std::io::SeekFrom; - use tokio::io::AsyncSeekExt; - let mut tokio_file = tokio::fs::File::from_std(file); - tokio_file - .seek(SeekFrom::Start(range.start)) - .await - .map_err(|e| object_store::Error::Generic { - store: "local", - source: Box::new(e), - })?; - tokio_util::io::ReaderStream::new(tokio::io::BufReader::new(tokio_file)) - .map_err(|e| object_store::Error::Generic { - store: "local", - source: Box::new(e), - }) - .boxed() - } - GetResultPayload::Stream(s) => s.boxed(), - }; - Ok(stream) + Ok(result.into_stream()) } impl AlignedBoundaryStream { @@ -382,6 +358,7 @@ impl Stream for AlignedBoundaryStream { mod tests { use super::*; use crate::test_utils::{CHUNK_SIZES, make_chunked_store}; + use futures::TryStreamExt; async fn collect_stream(stream: AlignedBoundaryStream) -> Vec { let chunks: Vec = stream.try_collect().await.unwrap(); From 2f92f12f0c80b3137faa49731e77f879d8a3f4cb Mon Sep 17 
00:00:00 2001 From: Ariel Miculas Date: Thu, 19 Mar 2026 13:25:25 +0200 Subject: [PATCH 3/9] fix: remove unused dependency --- Cargo.lock | 1 - datafusion/datasource-json/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 852411bf23e41..8101e2fe22d92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2065,7 +2065,6 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tokio-util", ] [[package]] diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 95f1d218948ad..b5947ea5c4c67 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -46,7 +46,6 @@ futures = { workspace = true } object_store = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } -tokio-util = { workspace = true, features = ["io"] } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet From abb6609c8bbf00265213650ade1141c85db8874d Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Thu, 19 Mar 2026 16:42:43 +0200 Subject: [PATCH 4/9] refactor: remove some redundant code --- .../datasource-json/src/boundary_stream.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index 7adfe34e39f68..436bd112543f1 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -129,7 +129,12 @@ impl AlignedBoundaryStream { }); } - let fetch_start = if raw_start > 0 { raw_start - 1 } else { 0 }; + let (fetch_start, phase) = if raw_start == 0 { + (0, Phase::FetchingChunks) + } else { + (raw_start - 1, Phase::ScanningFirstTerminator) + }; + let initial_fetch_end = raw_end.saturating_add(END_SCAN_LOOKAHEAD).min(file_size); let inner = get_stream( @@ -139,12 +144,6 @@ impl 
AlignedBoundaryStream { ) .await?; - let (fetch_start, phase) = if raw_start == 0 { - (0, Phase::FetchingChunks) - } else { - (raw_start - 1, Phase::ScanningFirstTerminator) - }; - // Last partition reads to EOF — no end-boundary scanning needed. let end = if raw_end >= file_size { u64::MAX @@ -361,8 +360,7 @@ mod tests { use futures::TryStreamExt; async fn collect_stream(stream: AlignedBoundaryStream) -> Vec { - let chunks: Vec = stream.try_collect().await.unwrap(); - chunks.into_iter().flat_map(|b| b.to_vec()).collect() + stream.try_collect::>().await.unwrap().concat() } #[tokio::test] From ddf86199c7b6767367e4550b84b384c49f1292bb Mon Sep 17 00:00:00 2001 From: Ariel Miculas-Trif Date: Fri, 20 Mar 2026 11:29:23 +0200 Subject: [PATCH 5/9] fix: remove unneeded clone Co-authored-by: Martin Grigorov --- datafusion/datasource-json/src/boundary_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index 436bd112543f1..417c9704053c1 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -90,7 +90,7 @@ async fn get_stream( range: std::ops::Range, ) -> object_store::Result>> { let opts = GetOptions { - range: Some(GetRange::Bounded(range.clone())), + range: Some(GetRange::Bounded(range)), ..Default::default() }; let result = store.get_opts(&location, opts).await?; From 422988fc3cc2f7c2e114c39b207d0d8ea241de54 Mon Sep 17 00:00:00 2001 From: Ariel Miculas-Trif Date: Fri, 20 Mar 2026 11:30:02 +0200 Subject: [PATCH 6/9] fix: range checks for raw_start and raw_end Co-authored-by: Martin Grigorov --- datafusion/datasource-json/src/source.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 0d4fd6e76c4c0..d05c6cf97f54a 100644 --- a/datafusion/datasource-json/src/source.rs +++ 
b/datafusion/datasource-json/src/source.rs @@ -286,8 +286,12 @@ impl FileOpener for JsonOpener { let location = &partitioned_file.object_meta.location; if let Some(file_range) = partitioned_file.range.as_ref() { - let raw_start = file_range.start as u64; - let raw_end = file_range.end as u64; + let raw_start: u64 = file_range.start.try_into().map_err(|_| { + exec_datafusion_err!("Expected start range to fit in u64, got {}", file_range.start) + })?; + let raw_end: u64 = file_range.end.try_into().map_err(|_| { + exec_datafusion_err!("Expected end range to fit in u64, got {}", file_range.end) + })?; if raw_start >= raw_end || raw_start >= file_size { return Ok( From 88fa48ad323d1393d409cb4e0f83e8758f7fc952 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Fri, 20 Mar 2026 11:35:21 +0200 Subject: [PATCH 7/9] fix: formatting and missing import --- datafusion/datasource-json/src/source.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index d05c6cf97f54a..240697b5f8ac2 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -29,6 +29,7 @@ use crate::utils::{ChannelReader, JsonArrayToNdjsonReader}; use crate::boundary_stream::AlignedBoundaryStream; use datafusion_common::error::{DataFusionError, Result}; +use datafusion_common::exec_datafusion_err; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; @@ -287,11 +288,17 @@ impl FileOpener for JsonOpener { if let Some(file_range) = partitioned_file.range.as_ref() { let raw_start: u64 = file_range.start.try_into().map_err(|_| { - exec_datafusion_err!("Expected start range to fit in u64, got {}", file_range.start) - })?; + exec_datafusion_err!( + "Expected start range to fit in u64, got {}", + file_range.start + ) + })?; let 
raw_end: u64 = file_range.end.try_into().map_err(|_| { - exec_datafusion_err!("Expected end range to fit in u64, got {}", file_range.end) - })?; + exec_datafusion_err!( + "Expected end range to fit in u64, got {}", + file_range.end + ) + })?; if raw_start >= raw_end || raw_start >= file_size { return Ok( From abc003d0395a259d261a64dd9dbfab4fb1d481b0 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Fri, 20 Mar 2026 11:38:57 +0200 Subject: [PATCH 8/9] refactor: consolidate boundary checks inside AlignedBoundaryStream --- datafusion/datasource-json/src/boundary_stream.rs | 2 +- datafusion/datasource-json/src/source.rs | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index 417c9704053c1..6fc14adde30e6 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -114,7 +114,7 @@ impl AlignedBoundaryStream { file_size: u64, terminator: u8, ) -> object_store::Result { - if raw_start >= raw_end { + if raw_start >= raw_end || raw_start >= file_size { return Ok(Self { inner: futures::stream::empty().boxed(), terminator, diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 240697b5f8ac2..5724028769160 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -300,12 +300,6 @@ impl FileOpener for JsonOpener { ) })?; - if raw_start >= raw_end || raw_start >= file_size { - return Ok( - futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed() - ); - } - let aligned_stream = AlignedBoundaryStream::new( Arc::clone(&store), location.clone(), From 7871e5437a8e455ea9120dc393be35f6b0d0c285 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Fri, 20 Mar 2026 16:22:48 +0200 Subject: [PATCH 9/9] fix: minor grammar fixes in comments --- datafusion/datasource-json/src/boundary_stream.rs | 10 +++++----- 1 file 
changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index 6fc14adde30e6..fc40feda6b80f 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -66,7 +66,7 @@ pub struct AlignedBoundaryStream { terminator: u8, /// Effective end boundary. Set to `u64::MAX` when `end >= file_size` /// (last partition), so `FetchingChunks` never transitions to - /// `ScanningLastTerminator` and simply streams to EOF. + /// `ScanningLastTerminator` and simply streams until EOF is reached. end: u64, /// Cumulative bytes consumed from `inner` (relative to `fetch_start`). bytes_consumed: u64, @@ -144,7 +144,7 @@ impl AlignedBoundaryStream { ) .await?; - // Last partition reads to EOF — no end-boundary scanning needed. + // Last partition reads until EOF is reached — no end-boundary scanning needed. let end = if raw_end >= file_size { u64::MAX } else { @@ -248,7 +248,7 @@ impl Stream for AlignedBoundaryStream { let pos_after = this.abs_pos(); // When end == u64::MAX (last partition), this is always - // true and we stream straight through to EOF. + // true and we stream straight through until EOF is reached. if pos_after < this.end { return Poll::Ready(Some(Ok(chunk))); } @@ -744,8 +744,8 @@ mod tests { #[tokio::test] async fn test_no_trailing_newline() { // Last partition of a file that does not end with a newline. - // end >= file_size → this.end = u64::MAX, so Passthrough streams - // straight to EOF and yields the final incomplete line as-is. + // end >= file_size → this.end = u64::MAX, so Passthrough streams straight + // until EOF is reached and yields the final incomplete line as-is. static DATA: &[u8] = b"line1\nline2"; // 11 bytes, no trailing '\n' for &cs in CHUNK_SIZES { let (store, path) = make_chunked_store(DATA, cs).await;