
[GLUTEN-11605][VL] Write per-block column statistics in shuffle writer#11769

Open
acvictor wants to merge 2 commits into apache:main from acvictor:acvictor/writerChanges

Conversation

@acvictor
Contributor

@acvictor acvictor commented Mar 16, 2026

What changes are proposed in this pull request?

This PR adds per-block column statistics (min/max/hasNull) to the shuffle writer pipeline as a prerequisite for block-level pruning with dynamic filters at the shuffle reader. When spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled is true, the shuffle writer computes per-column min/max statistics from the raw Arrow buffers during evictBuffers() and serializes them as a kStatisticsPayload block before each non-dictionary payload in the output file. This mirrors how Parquet row-group statistics enable predicate pushdown.

How was this patch tested?

Added new tests and also ran the CI with config set to true.

Was this patch authored or co-authored using generative AI tooling?

No

Related issue: #11605

@github-actions github-actions bot added the VELOX label Mar 16, 2026
@acvictor acvictor force-pushed the acvictor/writerChanges branch 6 times, most recently from cb073fd to 19b8d5a on March 16, 2026 11:46
@acvictor acvictor force-pushed the acvictor/writerChanges branch from 19b8d5a to 50e0444 on March 16, 2026 13:51
@acvictor acvictor marked this pull request as ready for review March 17, 2026 13:57
@acvictor
Contributor Author

@marin-ma @zhztheplayer this is ready for review. I will push one more commit to disable this by default again.

Contributor

@marin-ma marin-ma left a comment


@acvictor Thanks for adding this feature! Please check my comments below.

Since adding the statistics can grow the shuffle data size, have you measured the overall growth of shuffled data for the TPC-H/TPC-DS benchmarks?

}
// Check each bit — return early on first null found.
for (uint32_t i = 0; i < numRows; ++i) {
if (!arrow::bit_util::GetBit(validityBuffer->data(), i)) {

Perhaps checking each byte against 0xFF would be faster.

// Reuse the dynamic filter config to also enable block statistics collection,
// since stats are only useful when dynamic filter pushdown is active.
const auto& confMap = ctx->getConfMap();
auto it = confMap.find("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled");

Other option values are passed through function args. Can you add a new arg enableBlockStatistics?

mergedStats.merge(*append->blockStats_);
result->setBlockStats(std::move(mergedStats));
} else if (source->hasBlockStats()) {
result->setBlockStats(*source->blockStats_);

If only source or append has blockStats, should we either discard the stats or compute them for the missing side and then merge?
