diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs
index 590f6f09e8b9..89ee55993b8b 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -25,8 +25,9 @@ use super::SendableRecordBatchStream;
 use crate::stream::RecordBatchReceiverStream;
 use crate::{ColumnStatistics, Statistics};
 
-use arrow::array::Array;
+use arrow::array::{Array, StringViewArray};
 use arrow::datatypes::Schema;
+use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, plan_err};
@@ -190,7 +191,7 @@ pub fn can_project(
                 .max()
                 .is_some_and(|&i| i >= schema.fields().len())
             {
-                Err(arrow::error::ArrowError::SchemaError(format!(
+                Err(ArrowError::SchemaError(format!(
                     "project index {} out of bounds, max field {}",
                     columns.iter().max().unwrap(),
                     schema.fields().len()
@@ -204,6 +205,31 @@ pub fn can_project(
     }
 }
 
+/// Return a new `RecordBatch` with [`StringViewArray::gc`] called on such columns (if any).
+pub(crate) fn gc_stringview_arrays(
+    batch: RecordBatch,
+) -> Result<RecordBatch, ArrowError> {
+    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
+
+    let mut arr_mutated = false;
+    for array in batch.columns() {
+        if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>()
+        {
+            let new_array = string_view_array.gc();
+            new_columns.push(Arc::new(new_array));
+            arr_mutated = true;
+        } else {
+            new_columns.push(Arc::clone(array));
+        }
+    }
+
+    if arr_mutated {
+        RecordBatch::try_new(batch.schema(), new_columns)
+    } else {
+        Ok(batch)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 342b2f50357c..efd55ece29a0 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -31,6 +31,7 @@ use super::{
     DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
 };
 use crate::coalesce::LimitedBatchCoalescer;
+use crate::common::gc_stringview_arrays;
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::hash_utils::create_hashes;
 use crate::metrics::{BaselineMetrics, SpillMetrics};
@@ -44,7 +45,7 @@ use crate::{
     check_if_same_properties,
 };
 
-use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
+use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, StringViewArray};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
 use datafusion_common::config::ConfigOptions;
@@ -617,8 +618,14 @@ impl BatchPartitioner {
                         batch.schema(),
                         columns,
                         &options,
-                    )
-                    .unwrap();
+                    )?;
+
+                    // When `StringViewArray`s are present, the `take_arrays` call above
+                    // re-uses data buffers from the original array. This causes the memory
+                    // pool to count the same data buffers multiple times, once for each
+                    // consumer of the repartition.
+                    // So we gc the output arrays, which creates new data buffers.
+                    let batch = gc_stringview_arrays(batch)?;
 
                     partitioned_batches.push(Ok((partition, batch)));
 
@@ -1770,7 +1777,7 @@ mod tests {
         {collect, expressions::col},
     };
 
-    use arrow::array::{ArrayRef, StringArray, UInt32Array};
+    use arrow::array::{ArrayRef, StringArray, StringViewArray, UInt32Array};
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::cast::as_string_array;
     use datafusion_common::exec_err;
@@ -2544,6 +2551,33 @@ mod tests {
             .collect()
     }
 
+    fn test_schema_string_view() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("key", DataType::UInt32, false),
+            Field::new("val", DataType::Utf8View, false),
+        ]))
+    }
+
+    /// Create a batch with StringViewArray data for compaction tests.
+    /// Strings are >12 bytes to force out-of-line storage in the buffer
+    /// (strings <=12 bytes are inlined in the view and don't reference the buffer).
+    fn create_string_view_batch(num_rows: usize, num_partitions: usize) -> RecordBatch {
+        let schema = test_schema_string_view();
+        let keys: Vec<u32> = (0..num_rows).map(|i| (i % num_partitions) as u32).collect();
+        let vals: Vec<String> = (0..num_rows)
+            .map(|i| format!("string_value_{i:0>20}"))
+            .collect();
+        let val_refs: Vec<&str> = vals.iter().map(|s| s.as_str()).collect();
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(UInt32Array::from(keys)) as ArrayRef,
+                Arc::new(StringViewArray::from(val_refs)) as ArrayRef,
+            ],
+        )
+        .unwrap()
+    }
+
     #[tokio::test]
     async fn test_repartition_ordering_with_spilling() -> Result<()> {
         // Test that repartition preserves ordering when spilling occurs
@@ -2617,6 +2651,31 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn hash_repartition_string_view_compaction() -> Result<()> {
+        let schema = test_schema_string_view();
+        let num_partitions = 8;
+        let batch = create_string_view_batch(800, num_partitions);
+        let partitions = vec![vec![batch]];
+
+        let output_partitions = repartition(
+            &schema,
+            partitions,
+            Partitioning::Hash(vec![col("key", &schema)?], num_partitions),
+        )
+        .await?;
+
+        assert_eq!(num_partitions, output_partitions.len());
+
+        let total_rows: usize = output_partitions
+            .iter()
+            .map(|x| x.iter().map(|x| x.num_rows()).sum::<usize>())
+            .sum();
+        assert_eq!(total_rows, 800);
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 2c5c82e723f4..cd2f9f6c9546 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use parking_lot::RwLock;
 
-use crate::common::spawn_buffered;
+use crate::common::{gc_stringview_arrays, spawn_buffered};
 use crate::execution_plan::{
     Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
 };
@@ -483,33 +483,10 @@ impl ExternalSorter {
     fn organize_stringview_arrays(
         globally_sorted_batches: &mut Vec<RecordBatch>,
     ) -> Result<()> {
-        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
-
-        for batch in globally_sorted_batches.drain(..) {
-            let mut new_columns: Vec<Arc<dyn Array>> =
-                Vec::with_capacity(batch.num_columns());
-
-            let mut arr_mutated = false;
-            for array in batch.columns() {
-                if let Some(string_view_array) =
-                    array.as_any().downcast_ref::<StringViewArray>()
-                {
-                    let new_array = string_view_array.gc();
-                    new_columns.push(Arc::new(new_array));
-                    arr_mutated = true;
-                } else {
-                    new_columns.push(Arc::clone(array));
-                }
-            }
-
-            let organized_batch = if arr_mutated {
-                RecordBatch::try_new(batch.schema(), new_columns)?
-            } else {
-                batch
-            };
-
-            organized_batches.push(organized_batch);
-        }
+        let organized_batches = globally_sorted_batches
+            .drain(..)
+            .map(gc_stringview_arrays)
+            .collect::<Result<Vec<_>, _>>()?;
 
         *globally_sorted_batches = organized_batches;