53 changes: 48 additions & 5 deletions native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
@@ -22,9 +22,10 @@ use std::{

use arrow::{
array::{
Array, ArrayAccessor, ArrayRef, BinaryArray, BinaryBuilder, StructArray, as_struct_array,
make_array,
Array, ArrayAccessor, ArrayData, ArrayRef, BinaryArray, BinaryBuilder,
BooleanBufferBuilder, StructArray, as_struct_array, make_array,
},
buffer::Buffer,
datatypes::{DataType, Field, Schema, SchemaRef},
ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi, from_ffi_and_data_type},
record_batch::{RecordBatch, RecordBatchOptions},
@@ -403,12 +404,54 @@ impl AccColumn for AccUDAFBufferRowsColumn {

fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
assert_eq!(self.num_records(), 0, "expect empty AccColumn");
let num_rows = arrays[0].len();
let ffi_imported_rows = FFI_ArrowArray::new(&arrays[0].to_data());
let array = &arrays[0];
let binary_array = downcast_any!(array, BinaryArray)?;

Copilot AI · Feb 27, 2026

The condition checks both binary_array.offset() == 0 and binary_array.value_offsets()[0] == 0, but these checks serve different purposes: offset() verifies that the array itself is not sliced at the array level, while value_offsets()[0] == 0 verifies that the values buffer starts at offset 0. Consider adding a comment explaining why both conditions are necessary for determining that the array is "clean".
Suggested change
// "Clean" means neither the logical array nor its values buffer are sliced:
// - offset() == 0 ensures the array itself has no logical offset (not sliced at array level)
// - value_offsets()[0] == 0 ensures the values buffer also starts at 0 (not sliced in the buffer)

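The rebasing that follows in the diff can be sketched in isolation. The example below is a hypothetical standalone version using plain vectors rather than arrow buffers: it shifts a sliced offsets buffer so the first offset becomes 0 and trims the values buffer to the referenced range, mirroring the `offsets.iter().map(|&x| x - start_offset_val)` and `slice_with_length` steps in the changed code.

```rust
// Standalone sketch (hypothetical helper, not part of the PR) of the
// offset-rebasing logic, using plain vectors instead of arrow buffers.
// Assumes a sliced binary column whose value offsets no longer start at 0.
fn rebase_offsets(offsets: &[i32], values: &[u8]) -> (Vec<i32>, Vec<u8>) {
    let start = offsets[0];
    let end = *offsets.last().unwrap();
    // Shift every offset so the first one becomes 0.
    let new_offsets: Vec<i32> = offsets.iter().map(|&x| x - start).collect();
    // Keep only the byte range the sliced offsets actually reference.
    let new_values = values[start as usize..end as usize].to_vec();
    (new_offsets, new_values)
}

fn main() {
    // A "dirty" slice: offsets start at 3, values buffer is larger than needed.
    let offsets = vec![3, 5, 8]; // two entries: bytes 3..5 and 5..8
    let values = b"xxxabcdeyyy".to_vec(); // only bytes 3..8 ("abcde") are live
    let (o, v) = rebase_offsets(&offsets, &values);
    println!("{:?} {:?}", o, String::from_utf8(v).unwrap()); // [0, 2, 5] "abcde"
}
```

After rebasing, both cleanliness conditions hold: the rebuilt array has no array-level offset and its first value offset is 0, so it can be exported over FFI without carrying the parent buffers.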
let is_clean = binary_array.offset() == 0 && binary_array.value_offsets()[0] == 0;
let array_to_export = if is_clean {
array.clone()
} else {
let len = binary_array.len();
let offsets = binary_array.value_offsets();
let start_offset_val = offsets[0];
let end_offset_val = offsets[len];
let value_slice_len = (end_offset_val - start_offset_val) as usize;

let new_offsets: Vec<i32> = offsets.iter().map(|&x| x - start_offset_val).collect();
let new_offsets_buf = Buffer::from_vec(new_offsets);

let array_data = binary_array.to_data();
let buffers = array_data.buffers();
let new_values_buf =
buffers[1].slice_with_length(start_offset_val as usize, value_slice_len);

let new_null_buf = if binary_array.null_count() == 0 {
None
} else {
let mut buffer_builder = BooleanBufferBuilder::new(len);
for i in 0..len {
buffer_builder.append(binary_array.is_valid(i));
}
Some(buffer_builder.finish().into_inner())
};
Comment on lines +428 to +436
Copilot AI · Feb 27, 2026

The null buffer reconstruction iterates through all elements calling is_valid, which can be inefficient for large arrays. Consider reusing the existing null buffer from array_data.nulls() and slicing it, similar to how the value buffer is sliced. If slicing the null buffer directly is not straightforward due to bit alignment, the current approach is acceptable but could be optimized.
Suggested change
let new_null_buf = if binary_array.null_count() == 0 {
None
} else {
let mut buffer_builder = BooleanBufferBuilder::new(len);
for i in 0..len {
buffer_builder.append(binary_array.is_valid(i));
}
Some(buffer_builder.finish().into_inner())
};
let new_null_buf = array_data.nulls().cloned();

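The bit-alignment caveat in the comment above can be made concrete. Arrow validity bitmaps pack one bit per row in LSB order, so a slice that starts at row 3 does not begin on a byte boundary; reading validity then needs a bit offset, which is why slicing the null buffer is less straightforward than slicing the byte-addressed values buffer. The following is an illustrative sketch (a hypothetical helper, not the arrow-rs API):

```rust
// Illustrative sketch: Arrow validity bitmaps pack one bit per row in LSB
// order, so a logical slice may start mid-byte and needs a bit offset.
fn is_valid(bitmap: &[u8], bit_offset: usize, i: usize) -> bool {
    let bit = bit_offset + i;
    bitmap[bit / 8] & (1 << (bit % 8)) != 0
}

fn main() {
    // Bitmap 0b0101_1011: rows 0,1,3,4,6 valid; rows 2,5,7 null.
    let bitmap = [0b0101_1011u8];
    // A slice starting at row 3 sees parent bits 3..8.
    let valid: Vec<bool> = (0..5).map(|i| is_valid(&bitmap, 3, i)).collect();
    println!("{:?}", valid); // [true, true, false, true, false]
}
```

This is what makes the element-by-element rebuild in the diff a correct, if slower, fallback: it re-packs the bits starting from a fresh byte boundary.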

let mut data_builder = ArrayData::builder(DataType::Binary)
.len(len)
.add_buffer(new_offsets_buf)
.add_buffer(new_values_buf);

if let Some(buf) = new_null_buf {
data_builder = data_builder.null_bit_buffer(Some(buf));
}

let data = unsafe { data_builder.build_unchecked() };
Copilot AI · Feb 27, 2026

Using build_unchecked bypasses ArrayData validation, which could lead to undefined behavior if the constructed data is invalid. Consider using the safe builder API instead: replace unsafe { data_builder.build_unchecked() } with data_builder.build()? to ensure the ArrayData is valid before proceeding.
Suggested change
let data = unsafe { data_builder.build_unchecked() };
let data = data_builder.build()?;

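To see what is at stake, here is a hedged sketch of the kind of layout invariants `ArrayData::build()` checks and `build_unchecked` skips, written against plain slices rather than the arrow-rs types (the function name is an assumption for illustration). For a binary array: offsets must be non-negative, monotonically non-decreasing, and the last offset must not exceed the values buffer length.

```rust
// Hypothetical validator mirroring (a subset of) what ArrayData::build()
// verifies for a variable-size binary layout and build_unchecked() skips.
fn validate_binary_layout(offsets: &[i32], values_len: usize) -> Result<(), String> {
    if offsets.is_empty() {
        return Err("offsets buffer must hold at least one entry".into());
    }
    if offsets[0] < 0 {
        return Err("first offset is negative".into());
    }
    if offsets.windows(2).any(|w| w[1] < w[0]) {
        return Err("offsets are not monotonically non-decreasing".into());
    }
    if *offsets.last().unwrap() as usize > values_len {
        return Err("last offset exceeds values buffer length".into());
    }
    Ok(())
}

fn main() {
    assert!(validate_binary_layout(&[0, 2, 5], 5).is_ok());
    // An out-of-bounds last offset is exactly the kind of error that an
    // unchecked build would let through, risking UB on later reads.
    assert!(validate_binary_layout(&[0, 2, 9], 5).is_err());
}
```

In this PR the buffers are derived from an already-valid BinaryArray, so the invariants should hold by construction; the safe build trades a one-time validation pass for that guarantee being checked rather than assumed.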
make_array(data)
};

let ffi_imported_rows = FFI_ArrowArray::new(&array_to_export.to_data());
let rows = jni_call!(SparkUDAFWrapperContext(self.jcontext.as_obj())
.importRows(&ffi_imported_rows as *const FFI_ArrowArray as i64) -> JObject)?;
self.obj = jni_new_global_ref!(rows.as_obj())?;
Copilot AI · Feb 27, 2026

The assertion that validated the number of imported rows against the expected count has been removed. Consider restoring a validation check after the importRows call to ensure the Java side processed the correct number of rows; array.len() (or binary_array.len(), since it is still in scope) can be compared against self.num_records() after line 454.
Suggested change
self.obj = jni_new_global_ref!(rows.as_obj())?;
self.obj = jni_new_global_ref!(rows.as_obj())?;
let expected_rows = array_to_export.len();
let imported_rows = self.num_records();
assert_eq!(
imported_rows,
expected_rows,
"SparkUDAFBufferRowsColumn::unfreeze_from_arrays: Java imported {imported_rows} rows, expected {expected_rows}"
);

assert_eq!(self.num_records(), num_rows, "unfreeze rows count mismatch");
Ok(())
}
