Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use super::SendableRecordBatchStream;
use crate::stream::RecordBatchReceiverStream;
use crate::{ColumnStatistics, Statistics};

use arrow::array::Array;
use arrow::array::{Array, StringViewArray};
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, plan_err};
Expand Down Expand Up @@ -190,7 +191,7 @@ pub fn can_project(
.max()
.is_some_and(|&i| i >= schema.fields().len())
{
Err(arrow::error::ArrowError::SchemaError(format!(
Err(ArrowError::SchemaError(format!(
"project index {} out of bounds, max field {}",
columns.iter().max().unwrap(),
schema.fields().len()
Expand All @@ -204,6 +205,31 @@ pub fn can_project(
}
}

/// Return a new `RecordBatch` in which every `StringViewArray` column has
/// been compacted via [`StringViewArray::gc`]; all other columns are shared
/// unchanged (only the `Arc` is cloned).
///
/// If the batch contains no `StringViewArray` columns, the input batch is
/// returned as-is without constructing a new `RecordBatch`.
pub(crate) fn gc_stringview_arrays(
    batch: RecordBatch,
) -> Result<RecordBatch, ArrowError> {
    // Track whether any column was actually compacted so we can avoid
    // rebuilding the batch in the common no-op case.
    let mut any_compacted = false;

    let columns: Vec<Arc<dyn Array>> = batch
        .columns()
        .iter()
        .map(|column| match column.as_any().downcast_ref::<StringViewArray>() {
            Some(view_array) => {
                any_compacted = true;
                Arc::new(view_array.gc()) as Arc<dyn Array>
            }
            None => Arc::clone(column),
        })
        .collect();

    if any_compacted {
        RecordBatch::try_new(batch.schema(), columns)
    } else {
        Ok(batch)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
67 changes: 63 additions & 4 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::common::gc_stringview_arrays;
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::hash_utils::create_hashes;
use crate::metrics::{BaselineMetrics, SpillMetrics};
Expand All @@ -44,7 +45,7 @@ use crate::{
check_if_same_properties,
};

use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -617,8 +618,14 @@ impl BatchPartitioner {
batch.schema(),
columns,
&options,
)
.unwrap();
)?;

// When `StringViewArray`s are present, the `take_arrays` call above
// re-uses data buffers from the original array. This causes the memory
// pool to count the same data buffers multiple times, once for each
// consumer of the repartition.
// So we gc the output arrays, which creates new data buffers.
let batch = gc_stringview_arrays(batch)?;

partitioned_batches.push(Ok((partition, batch)));

Expand Down Expand Up @@ -1770,7 +1777,7 @@ mod tests {
{collect, expressions::col},
};

use arrow::array::{ArrayRef, StringArray, UInt32Array};
use arrow::array::{ArrayRef, StringArray, StringViewArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::exec_err;
Expand Down Expand Up @@ -2544,6 +2551,33 @@ mod tests {
.collect()
}

/// Schema for the StringView compaction tests: a `UInt32` partition key
/// plus a `Utf8View` payload column.
fn test_schema_string_view() -> Arc<Schema> {
    let key = Field::new("key", DataType::UInt32, false);
    let val = Field::new("val", DataType::Utf8View, false);
    Arc::new(Schema::new(vec![key, val]))
}

/// Create a batch with StringViewArray data for compaction tests.
/// Strings are >12 bytes to force out-of-line storage in the buffer
/// (strings <=12 bytes are inlined in the view and don't reference the buffer).
fn create_string_view_batch(num_rows: usize, num_partitions: usize) -> RecordBatch {
    let schema = test_schema_string_view();

    // Round-robin keys so every hash partition receives some rows.
    let keys = UInt32Array::from_iter_values(
        (0..num_rows).map(|i| (i % num_partitions) as u32),
    );

    // Zero-padded index keeps every value well past the 12-byte inline limit.
    let vals = StringViewArray::from_iter_values(
        (0..num_rows).map(|i| format!("string_value_{i:0>20}")),
    );

    RecordBatch::try_new(
        schema,
        vec![Arc::new(keys) as ArrayRef, Arc::new(vals) as ArrayRef],
    )
    .unwrap()
}

#[tokio::test]
async fn test_repartition_ordering_with_spilling() -> Result<()> {
// Test that repartition preserves ordering when spilling occurs
Expand Down Expand Up @@ -2617,6 +2651,31 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn hash_repartition_string_view_compaction() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not actually exercise the regression it is meant to cover.
It only checks that repartition returns all rows. That would also pass before the gc() change.

As a result, we still do not have a test that would catch the over-counting bug if this logic regresses.

Please add an assertion that observes the compaction or accounting behavior directly. For example:

  • Check that the total get_array_memory_size() across the repartitioned outputs stays close to the original batch, instead of scaling with the number of output partitions.
  • Test spill behavior under a tight memory limit (e.g., spilled bytes).
  • Verify StringViewArray buffer ownership after repartition, so outputs no longer all retain the original shared payload buffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did try to add a test that checks for memory size specifically, but it seemed a bit fragile to assert on those numbers. Let me try the other approaches, thanks!

let schema = test_schema_string_view();
let num_partitions = 8;
let batch = create_string_view_batch(800, num_partitions);
let partitions = vec![vec![batch]];

let output_partitions = repartition(
&schema,
partitions,
Partitioning::Hash(vec![col("key", &schema)?], num_partitions),
)
.await?;

assert_eq!(num_partitions, output_partitions.len());

let total_rows: usize = output_partitions
.iter()
.map(|x| x.iter().map(|x| x.num_rows()).sum::<usize>())
.sum();
assert_eq!(total_rows, 800);

Ok(())
}
}

#[cfg(test)]
Expand Down
33 changes: 5 additions & 28 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;

use parking_lot::RwLock;

use crate::common::spawn_buffered;
use crate::common::{gc_stringview_arrays, spawn_buffered};
use crate::execution_plan::{
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
};
Expand Down Expand Up @@ -483,33 +483,10 @@ impl ExternalSorter {
fn organize_stringview_arrays(
globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

for batch in globally_sorted_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());

let mut arr_mutated = false;
for array in batch.columns() {
if let Some(string_view_array) =
array.as_any().downcast_ref::<StringViewArray>()
{
let new_array = string_view_array.gc();
new_columns.push(Arc::new(new_array));
arr_mutated = true;
} else {
new_columns.push(Arc::clone(array));
}
}

let organized_batch = if arr_mutated {
RecordBatch::try_new(batch.schema(), new_columns)?
} else {
batch
};

organized_batches.push(organized_batch);
}
let organized_batches = globally_sorted_batches
.drain(..)
.map(gc_stringview_arrays)
.collect::<Result<_, _>>()?;

*globally_sorted_batches = organized_batches;

Expand Down
Loading