fix: dynamically adjust NLJ output batch size and optimize NLJ with single build vector#402
Open
zhangxffff wants to merge 5 commits into bytedance:main
What problem does this PR solve?
This issue occurs in a Spark instance when the NLJ (Nested Loop Join) build side contains only one row with an array<string> column, the probe side also contains an array<string> column, both columns have large data sizes, and a join filter is present.
In the latest NLJ code, the probe dictionary vector is allocated with a fixed size (32768 in Spark on Bolt) and filled with zeros. When the average row size on the probe side is large, the flat size is computed based on the probe dictionary's size, resulting in an excessively large flat size. Furthermore, during exportToArrow, the inner probe vector is converted to an Arrow array with its full length, which causes OOM.
Additionally, when a filter is present, the join probe selects one row at a time to perform a cross product with the build vector. It constructs a single-row vector, applies the filter to it, and then copies this single-row vector into the final output vector. This row-by-row approach significantly degrades performance.
Issue Number: close #401
Type of Change
Description
This PR addresses the OOM and performance issues in NLJ when handling wide probe/build rows with a filter.
Batch Size Estimator
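As a rough illustration of the estimator this section describes, the sketch below derives a row count from a byte budget and the average probe and build row sizes. The function name, the byte budget parameter, and the row cap are hypothetical and are not taken from the PR's code; only the idea of dividing by the sum of the two average row sizes comes from the description.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper, not Bolt's actual API: picks an output batch size
// (in rows) so that rows * (probe row bytes + build row bytes) stays
// within an assumed byte budget.
inline uint32_t estimateOutputBatchSize(
    uint64_t avgProbeRowBytes,
    uint64_t avgBuildRowBytes,
    uint64_t targetBatchBytes, // assumed memory budget per output batch
    uint32_t maxBatchRows) {   // assumed cap (>= 1), e.g. the old fixed size
  const uint64_t avgOutputRowBytes = avgProbeRowBytes + avgBuildRowBytes;
  if (avgOutputRowBytes == 0) {
    return maxBatchRows; // no size information: keep the configured cap
  }
  uint64_t rows = targetBatchBytes / avgOutputRowBytes;
  rows = std::max<uint64_t>(rows, 1);            // always emit at least one row
  rows = std::min<uint64_t>(rows, maxBatchRows); // never exceed the cap
  return static_cast<uint32_t>(rows);
}
```

Under these assumptions, a 64 MiB budget with probe and build rows averaging 1 MiB each gives a batch of 32 rows rather than the fixed 32768, while narrow rows still get the full cap.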
During prepareOutput, the probe creates a dictionary vector based on outputBatchSize_. Before vector creation, we now update outputBatchSize_ using the average row size (sum of probe row size and build row size). This keeps the output vector's memory consumption under control and avoids OOM in downstream operators.
Single Build Row and Single Build Vector Optimization for NLJ with Filter
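The batching idea in this section, (batchSize / buildSize probe rows) × (build vector), can be pictured as generating two index lists that a pair of dictionary vectors would wrap. The sketch below is only a model: CrossProductIndices and nextCrossProductBatch are hypothetical names, plain std::vector stands in for Bolt's index buffers, and taking at least one probe row when buildSize exceeds batchSize is an assumption made here so the loop always makes progress.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a pair of dictionary index buffers; the real
// vectors and buffers are not modeled here.
struct CrossProductIndices {
  std::vector<int32_t> probe; // probe row referenced by each output row
  std::vector<int32_t> build; // build row referenced by each output row
  int32_t nextProbeRow;       // where the next batch should resume
};

// Emits (batchSize / buildSize probe rows) x (all build rows), starting at
// probeStart. Assumption: at least one probe row is taken per call.
inline CrossProductIndices nextCrossProductBatch(
    int32_t probeStart,
    int32_t probeSize,
    int32_t buildSize,
    int32_t batchSize) {
  CrossProductIndices out{{}, {}, probeStart};
  if (buildSize <= 0 || probeStart >= probeSize) {
    return out; // nothing to emit
  }
  const int32_t probeRows =
      std::min(std::max(batchSize / buildSize, 1), probeSize - probeStart);
  const std::size_t total = static_cast<std::size_t>(probeRows) * buildSize;
  out.probe.reserve(total);
  out.build.reserve(total);
  for (int32_t p = probeStart; p < probeStart + probeRows; ++p) {
    for (int32_t b = 0; b < buildSize; ++b) {
      out.probe.push_back(p); // repeat the probe row for each build row
      out.build.push_back(b); // cycle through the build rows
    }
  }
  out.nextProbeRow = probeStart + probeRows;
  return out;
}
```

With a single build row, one call covers up to batchSize probe rows, so the filter sees a full batch instead of a one-row vector; because both sides are expressed as indices, no build values need to be copied in this model.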
In the current NLJ implementation, when a filter is present, getNextCrossProductBatch returns (1 probe row) × (build vector) for subsequent filter processing. However, when the build vector is small (or even a single row), this results in returning a single-row vector for filtering, causing a performance issue.
To address this, we now return (batchSize / buildSize probe rows) × (build vector) from getNextCrossProductBatch. In the subsequent evaluation loop, we replace the single probe row loop with one that supports iterating over a probe × build dictionary. Since there is only one probe vector and one build vector, we can also avoid the copy in CopyBuildValues and simply return a dictionary vector instead.
Performance Impact
No Impact: This change does not affect the critical path (e.g., build system, doc, error handling).
Positive Impact: I have run benchmarks.
Negative Impact: Explained below (e.g., trade-off for correctness).
Release Note
Please describe the changes in this PR
Release Note:
Checklist (For Author)
Breaking Changes
No
Yes (Description: ...)