Thanks for this PR @ariel-miculas. Do you have any benchmark results for this change? Even some example queries. @Weijun-H do you know of any benchmarks to run?
No, I'm having trouble coming up with a realistic benchmark. The previous benchmark https://github.com/apache/datafusion/pull/19687/changes#diff-5358b38b6265d769b66b614f7ba88ed9320f7a9fce5197330b7c01c2a8a3ed3b incorrectly assumes that all the requested bytes (via get_opts) will be read, while you can actually request a 10GiB stream of bytes and read only 16KiB from it. As a result, the benchmark of the previous PR for reducing the read amplification shows impressive improvements, but it hides the fact that it breaks the parallelization between data fetching and json decoding (by doing all the data fetching in the JsonOpener instead of allowing FileStream to do its magic). So I'm not sure how to write a benchmark that can prove at the same time that:
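The distinction above (requested range vs. bytes actually read) can be sketched with a toy lazy source. This is purely illustrative and not object_store's API; LazySource and its methods are hypothetical names:

```rust
// Toy model (hypothetical, not object_store's API): opening a ranged
// "stream" costs nothing up front; only the bytes the consumer actually
// polls are transferred from the source.
struct LazySource {
    range_len: usize,
    fetched: usize,
}

impl LazySource {
    fn request(range_len: usize) -> Self {
        // No bytes move yet, no matter how large the range is.
        LazySource { range_len, fetched: 0 }
    }

    // Pull up to `n` bytes; returns how many were actually transferred.
    fn read(&mut self, n: usize) -> usize {
        let n = n.min(self.range_len - self.fetched);
        self.fetched += n;
        n
    }
}
```

A benchmark that charges the full requested range as "bytes read" would report 10GiB here, even though only 16KiB ever moves.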
terminator: u8,
/// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
/// (last partition), so `FetchingChunks` never transitions to
/// `ScanningLastTerminator` and simply streams to EOF.
"... streams to EOF" is not clear to me. What do you mean?
It means we pass through all the chunks to the json decoder (the caller which polls AlignedBoundaryStream), staying in the FetchingChunks phase until we consume the entire inner stream. This only happens when raw_end >= file_size, i.e. for the last range in a file, in which case there's nothing else to scan past raw_end for a terminator (nor is there any need to do so). So we consume only the initial stream, but since that one includes the end of the file, we pass through all the remaining chunks until end of file (EOF) is reached.
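The rule being described can be condensed into a small sketch. The function and field names here are illustrative, not the PR's actual code: for the last range of a file there is no terminator to find past the end, so the effective end is pushed to u64::MAX and every chunk passes through until the inner stream is exhausted.

```rust
// Hypothetical sketch of the boundary rule described above
// (illustrative names, not the PR's actual fields).
fn effective_end(raw_end: u64, file_size: u64) -> u64 {
    // Last partition: nothing to scan for past the end of the file,
    // so disable the end-boundary transition entirely.
    if raw_end >= file_size { u64::MAX } else { raw_end }
}

// A chunk keeps passing through while its absolute position stays
// below the effective end; with u64::MAX this is always true, so the
// stream is simply drained to EOF.
fn passes_through(pos_after: u64, end: u64) -> bool {
    pos_after < end
}
```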
Thanks!
How about changing it to this?
... and simply streams until EOF is reached
I am not a native English speaker and "verb to EOF" does not sound correct to me.
You're right, "until" is the right word here.
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
)
.await?;

// Last partition reads to EOF — no end-boundary scanning needed.
// Last partition reads until EOF is reached — no end-boundary scanning needed.
let pos_after = this.abs_pos();

// When end == u64::MAX (last partition), this is always
// true and we stream straight through to EOF.
// true and we stream straight through until EOF is reached.
async fn test_no_trailing_newline() {
// Last partition of a file that does not end with a newline.
// end >= file_size → this.end = u64::MAX, so Passthrough streams
// straight to EOF and yields the final incomplete line as-is.
// straight until EOF is reached and yields the final incomplete line as-is.
|
Thanks for reviewing, @martin-g!
Which issue does this PR close?
Rationale for this change
This is an alternative approach to apache#19687.
Instead of reading the entire range in the json FileOpener, implement an
AlignedBoundaryStream which scans the range for newlines as the FileStream
requests data from the stream, by wrapping the original stream returned by the
ObjectStore.
This eliminates the overhead of the extra two get_opts requests needed by
calculate_range and, more importantly, it allows for efficient read-ahead
implementations by the underlying ObjectStore. Previously this was inefficient
because the streams opened by calculate_range included a stream from
(start - 1) to file_size and another one from (end - 1) to end_of_file, just to
find the two relevant newlines.
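The start-boundary part of the scan can be illustrated with a small sketch. This is a toy, not the PR's AlignedBoundaryStream implementation, and align_start is a hypothetical name: when a range starts mid-record, bytes up to and including the first terminator are dropped so decoding begins on a record boundary, using only bytes the stream already delivered.

```rust
// Illustrative sketch only (not the PR's code): trim a streamed chunk to
// the first record boundary, with no extra get_opts probe needed.
fn align_start(buf: &[u8], terminator: u8) -> &[u8] {
    match buf.iter().position(|&b| b == terminator) {
        // Skip past the terminator itself; decoding starts on the next record.
        Some(i) => &buf[i + 1..],
        // No terminator in this chunk yet; the caller keeps scanning the next one.
        None => &[],
    }
}
```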
What changes are included in this PR?
Added the AlignedBoundaryStream which wraps a stream returned by the object
store and finds the delimiting newlines for a particular file range. Notably it doesn't
do any standalone reads (unlike the calculate_range function), eliminating two calls
to get_opts.
Are these changes tested?
Yes, added unit tests.
Are there any user-facing changes?
No