Open
Labels
bug
Description
Describe the bug
When creating a logical plan that involves a sequence of Sort -> Aggregate -> Sort -> Aggregate, and the input table is multi-partitioned, the physical planner fails during the SanityCheckPlan phase.
It appears that the optimizer fails to insert a necessary RepartitionExec node between the two Aggregate operations, so the plan violates a distribution requirement. The resulting error surfaces as a panic through unwrap().
To Reproduce
Here is a minimal reproduction script (using the datafusion and tokio crates):
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::col;
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::default();
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));

    // create an empty but multi-partitioned MemTable
    let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap();
    ctx.register_table("metrics", Arc::new(mem_table)).unwrap();

    // aggregate and sort twice
    let data_frame = ctx
        .table("metrics")
        .await
        .unwrap()
        .aggregate(
            vec![col("region"), col("ts")],
            vec![count_udaf().call(vec![col("value")])],
        )
        .unwrap()
        .sort(vec![
            col("region").sort(true, true),
            col("ts").sort(true, true),
        ])
        .unwrap()
        .aggregate(
            vec![col("ts")],
            vec![count_udaf().call(vec![col("count(metrics.value)")])],
        )
        .unwrap()
        .sort(vec![col("ts").sort(true, true)])
        .unwrap();

    println!(
        "Logical Plan:\n{}",
        data_frame.logical_plan().display_indent()
    );
    data_frame.show().await.unwrap();
}
Expected behavior
The query should execute successfully, but it panics:
Logical Plan:
Sort: metrics.ts ASC NULLS FIRST
Aggregate: groupBy=[[metrics.ts]], aggr=[[count(count(metrics.value))]]
Sort: metrics.region ASC NULLS FIRST, metrics.ts ASC NULLS FIRST
Aggregate: groupBy=[[metrics.region, metrics.ts]], aggr=[[count(metrics.value)]]
TableScan: metrics
thread 'main' panicked at src/main.rs:51:29:
called `Result::unwrap()` on an `Err` value: Context("SanityCheckPlan", Plan("Plan: [\"AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]\", \" ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]\", \" AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]\", \" CoalesceBatchesExec: target_batch_size=8192\", \" RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2\", \" AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]\", \" DataSourceExec: partitions=2, partition_sizes=[0, 0]\"] does not satisfy distribution requirements: HashPartitioned[[ts@0]]). Child-0 output partitioning: Hash([region@0, ts@0], 64)"))
Additional context
If the input table is single-partitioned, or if either sort operation is removed, the query executes successfully.
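For readers unfamiliar with why the sanity check rejects this plan: data hash-partitioned on (region, ts) is not, in general, partitioned on ts alone, because rows sharing a ts but differing in region can land in different partitions. The following is a toy, standard-library-only sketch of that idea. It is not DataFusion's implementation: `partition_of` and `partitions_per_ts` are hypothetical helpers, the hash function is Rust's `DefaultHasher` rather than whatever RepartitionExec uses, and the region names are made up.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

/// Toy stand-in for hash repartitioning: route a key to one of `n` partitions.
/// (Assumption: this only models the idea, not DataFusion's actual hashing.)
fn partition_of<K: Hash>(key: &K, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % n
}

/// For each `ts`, collect the set of partitions its rows land in when each
/// row is routed by the given `route` function.
fn partitions_per_ts<F>(rows: &[(String, i64)], route: F) -> HashMap<i64, HashSet<u64>>
where
    F: Fn(&str, i64) -> u64,
{
    let mut seen: HashMap<i64, HashSet<u64>> = HashMap::new();
    for (region, ts) in rows {
        let p = route(region.as_str(), *ts);
        seen.entry(*ts).or_default().insert(p);
    }
    seen
}

fn main() {
    // 1000 hypothetical regions that all share the same ts value.
    let rows: Vec<(String, i64)> = (0..1000).map(|i| (format!("region-{i}"), 42)).collect();

    // Routing by the composite key (region, ts), as in Hash([region, ts], 64).
    let composite = partitions_per_ts(&rows, |r, t| partition_of(&(r, t), 64));
    // Routing by ts alone, as required by HashPartitioned[[ts]].
    let by_ts = partitions_per_ts(&rows, |_, t| partition_of(&t, 64));

    println!(
        "routed by (region, ts): ts=42 spans {} partitions",
        composite[&42].len()
    );
    println!("routed by ts: ts=42 spans {} partition(s)", by_ts[&42].len());
}
```

In this model, a per-partition aggregate grouped by ts over the composite-key layout would emit the same ts group from several partitions, which is the situation the distribution requirement in the error message guards against.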