
Sanity check failed when sorting and aggregating a multi-partitioned table #18989

@Ke-Wng

Describe the bug

When a logical plan applies Aggregate -> Sort -> Aggregate -> Sort (in execution order; the indented plan below shows it top-down as Sort, Aggregate, Sort, Aggregate) and the input table has multiple partitions, physical planning fails during the SanityCheckPlan phase.

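It appears that the optimizer does not insert the RepartitionExec needed between the two aggregations. The outer AggregateExec (mode=SinglePartitioned) requires its input to be hash-partitioned on [ts], but its child is hash-partitioned on [region, ts]. SanityCheckPlan rejects this distribution mismatch with an error, which the `unwrap()` in the reproduction turns into a panic.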
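To make the mismatch concrete, here is a conceptual model of the requirement check. It is an assumption for illustration, not DataFusion's actual `Partitioning::satisfy` code, and `hash_satisfies` is a hypothetical name. The idea: rows that agree on the required keys also agree on any subset of them, so hashing on a non-empty subset keeps each required group together, while hashing on an extra key such as `region` can split a `ts` group across partitions. A single partition trivially co-locates everything.

```rust
/// Conceptual model (hypothetical, not DataFusion's implementation):
/// a child hash-partitioned on `output_keys` across `partition_count`
/// partitions satisfies a parent's HashPartitioned(`required_keys`)
/// requirement if rows with equal required keys always share a partition.
fn hash_satisfies(output_keys: &[&str], partition_count: usize, required_keys: &[&str]) -> bool {
    // With a single partition, every row is already co-located.
    if partition_count == 1 {
        return true;
    }
    // Hashing on a non-empty subset of the required keys keeps each group
    // together; any key outside the requirement (e.g. `region`) may split it.
    !output_keys.is_empty() && output_keys.iter().all(|k| required_keys.contains(k))
}

fn main() {
    // The failing case from this issue: child is Hash([region, ts], 64),
    // the outer aggregate requires HashPartitioned([ts]).
    println!("{}", hash_satisfies(&["region", "ts"], 64, &["ts"])); // false
    // What a RepartitionExec on `ts` would provide.
    println!("{}", hash_satisfies(&["ts"], 64, &["ts"])); // true
    // A single-partition input satisfies the requirement trivially.
    println!("{}", hash_satisfies(&["region", "ts"], 1, &["ts"])); // true
}
```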

To Reproduce

Here is a minimal reproduction (using the `datafusion` and `tokio` crates):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::col;
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::default();

    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));

    // create an empty but multi-partitioned MemTable
    let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap();
    ctx.register_table("metrics", Arc::new(mem_table)).unwrap();

    // aggregate and sort twice
    let data_frame = ctx
        .table("metrics")
        .await
        .unwrap()
        .aggregate(
            vec![col("region"), col("ts")],
            vec![count_udaf().call(vec![col("value")])],
        )
        .unwrap()
        .sort(vec![
            col("region").sort(true, true),
            col("ts").sort(true, true),
        ])
        .unwrap()
        .aggregate(
            vec![col("ts")],
            vec![count_udaf().call(vec![col("count(metrics.value)")])],
        )
        .unwrap()
        .sort(vec![col("ts").sort(true, true)])
        .unwrap();

    println!(
        "Logical Plan:\n{}",
        data_frame.logical_plan().display_indent()
    );

    data_frame.show().await.unwrap();
}
```

Expected behavior

The query should execute successfully. Instead, it prints the logical plan below and then panics when `show()` plans and executes the query:

```
Logical Plan:
Sort: metrics.ts ASC NULLS FIRST
  Aggregate: groupBy=[[metrics.ts]], aggr=[[count(count(metrics.value))]]
    Sort: metrics.region ASC NULLS FIRST, metrics.ts ASC NULLS FIRST
      Aggregate: groupBy=[[metrics.region, metrics.ts]], aggr=[[count(metrics.value)]]
        TableScan: metrics

thread 'main' panicked at src/main.rs:51:29:
called `Result::unwrap()` on an `Err` value: Context("SanityCheckPlan", Plan("Plan: [\"AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]\", \"  ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]\", \"    AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]\", \"      CoalesceBatchesExec: target_batch_size=8192\", \"        RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2\", \"          AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]\", \"            DataSourceExec: partitions=2, partition_sizes=[0, 0]\"] does not satisfy distribution requirements: HashPartitioned[[ts@0]]). Child-0 output partitioning: Hash([region@0, ts@0], 64)"))
```
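The requirement matters because a SinglePartitioned aggregate, as we understand it, runs the whole aggregation on each input partition independently, so every row for a given `ts` must arrive in the same partition. The sketch below illustrates this with a hypothetical toy hash (`partition_of`, not DataFusion's hashing), integer-coded `(region, ts)` rows, and 4 partitions instead of 64. It shows how Hash([region, ts]) can spread one `ts` value across partitions, while repartitioning on `ts` (what the missing RepartitionExec would do) keeps each `ts` in one partition.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical toy hash over integer-coded keys, for illustration only
/// (DataFusion uses a real hash function). Deterministic, so results repeat.
fn partition_of(key: &[i64], partitions: i64) -> usize {
    let h = key.iter().fold(0i64, |acc, k| acc * 31 + k);
    h.rem_euclid(partitions) as usize
}

/// For each `ts`, the set of partitions holding at least one of its rows
/// when rows are placed by hashing the key returned from `hash_key`.
fn partitions_per_ts<F>(rows: &[(i64, i64)], hash_key: F, partitions: i64) -> BTreeMap<i64, BTreeSet<usize>>
where
    F: Fn(i64, i64) -> Vec<i64>,
{
    let mut placement: BTreeMap<i64, BTreeSet<usize>> = BTreeMap::new();
    for &(region, ts) in rows {
        let p = partition_of(&hash_key(region, ts), partitions);
        placement.entry(ts).or_default().insert(p);
    }
    placement
}

fn main() {
    // (region id, ts): regions 0 and 1 both report at ts = 5.
    let rows = [(0, 5), (1, 5), (0, 6)];

    // Like the child's Hash([region, ts]) output partitioning.
    let before = partitions_per_ts(&rows, |region, ts| vec![region, ts], 4);
    // Like the Hash([ts]) that a RepartitionExec on `ts` would produce.
    let after = partitions_per_ts(&rows, |_region, ts| vec![ts], 4);

    println!("{:?}", before); // {5: {0, 1}, 6: {2}}: ts = 5 is split
    println!("{:?}", after); // {5: {1}, 6: {2}}: each ts in one partition
}
```

With the `before` placement, a per-partition count grouped by `ts` would emit a separate row for `ts = 5` from each of the two partitions, which is the kind of incorrect result the sanity check is there to prevent.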

Additional context

If the input table has a single partition, or if either Sort is removed, the query executes successfully.

Labels: bug