Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,23 @@ fn try_create_array_map(

let range = ArrayMap::calculate_range(min_val, max_val);
let num_row: usize = batches.iter().map(|x| x.num_rows()).sum();
let dense_ratio = (num_row as f64) / ((range + 1) as f64);

// TODO: support create ArrayMap<u64>
if num_row >= u32::MAX as usize {
return Ok(None);
}

if range >= perfect_hash_join_small_build_threshold as u64
&& dense_ratio <= perfect_hash_join_min_key_density
{
// When the key range spans the full integer domain (e.g. i64::MIN to i64::MAX),
// range is u64::MAX and `range + 1` below would overflow.
if range == usize::MAX as u64 {
return Ok(None);
}

// If range equals usize::MAX, then range + 1 would overflow to 0, which would cause
// ArrayMap to allocate an invalid zero-sized array or cause indexing issues.
// This check prevents such overflow and ensures valid array allocation.
if range == usize::MAX as u64 {
let dense_ratio = (num_row as f64) / ((range + 1) as f64);

if range >= perfect_hash_join_small_build_threshold as u64
&& dense_ratio <= perfect_hash_join_min_key_density
{
return Ok(None);
}

Expand Down Expand Up @@ -2142,7 +2142,9 @@ mod tests {
test::exec::MockExec,
};

use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
use arrow::array::{
Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
Expand Down Expand Up @@ -5559,6 +5561,38 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> {
let task_ctx = prepare_task_ctx(8192, true);
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))],
)?;
let left = TestMemoryExec::try_new_exec(
&[vec![batch.clone()]],
Arc::clone(&schema),
None,
)?;
let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
let on: JoinOn = vec![(
Arc::new(Column::new_with_schema("a", &left.schema())?) as _,
Arc::new(Column::new_with_schema("a", &right.schema())?) as _,
)];
let (_columns, batches, _metrics) = join_collect(
left,
right,
on,
&JoinType::Inner,
NullEquality::NullEqualsNothing,
task_ctx,
)
.await?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 2);
Ok(())
}

#[apply(hash_join_exec_configs)]
#[tokio::test]
async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(
Expand Down
Loading