diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a03cc36958fc..038eb96b7b45 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -155,23 +155,23 @@ fn try_create_array_map( let range = ArrayMap::calculate_range(min_val, max_val); let num_row: usize = batches.iter().map(|x| x.num_rows()).sum(); - let dense_ratio = (num_row as f64) / ((range + 1) as f64); // TODO: support create ArrayMap if num_row >= u32::MAX as usize { return Ok(None); } - if range >= perfect_hash_join_small_build_threshold as u64 - && dense_ratio <= perfect_hash_join_min_key_density - { + // When the key range spans the full integer domain (e.g. i64::MIN to i64::MAX), + // range is u64::MAX and `range + 1` below would overflow. + if range == usize::MAX as u64 { return Ok(None); } - // If range equals usize::MAX, then range + 1 would overflow to 0, which would cause - // ArrayMap to allocate an invalid zero-sized array or cause indexing issues. - // This check prevents such overflow and ensures valid array allocation. - if range == usize::MAX as u64 { + let dense_ratio = (num_row as f64) / ((range + 1) as f64); + + if range >= perfect_hash_join_small_build_threshold as u64 + && dense_ratio <= perfect_hash_join_min_key_density + { return Ok(None); } @@ -2142,7 +2142,9 @@ mod tests { test::exec::MockExec, }; - use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array}; + use arrow::array::{ + Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array, + }; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field}; use arrow_schema::Schema; @@ -5559,6 +5561,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_perfect_hash_join_overflow_full_int64_range() -> Result<()> { + let task_ctx = prepare_task_ctx(8192, true); + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![i64::MIN, i64::MAX]))], + )?; + let left = TestMemoryExec::try_new_exec( + &[vec![batch.clone()]], + Arc::clone(&schema), + None, + )?; + let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?; + let on: JoinOn = vec![( + Arc::new(Column::new_with_schema("a", &left.schema())?) as _, + Arc::new(Column::new_with_schema("a", &right.schema())?) as _, + )]; + let (_columns, batches, _metrics) = join_collect( + left, + right, + on, + &JoinType::Inner, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + Ok(()) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn test_phj_null_equals_null_build_no_nulls_probe_has_nulls(