Skip to content

[SQL] dropDuplicates and Window dedup produce incorrect results with SPJ partiallyClusteredDistribution #54378

@paryoja

Description

@paryoja

What type of issue is this?

Bug

Spark version

4.0.1 (with Iceberg 1.10.1)

Describe the bug

When using Storage-Partitioned Join (SPJ) with spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true, both dropDuplicates() and Window-based dedup (row_number()) produce incorrect results — duplicate rows that should have been removed survive in the output.

Root cause

Partial clustering splits a partition with many files across multiple tasks to improve parallelism. However, downstream dedup operations (dropDuplicates, row_number() OVER (PARTITION BY ...)) rely on the assumption that all rows with the same partition key are co-located in a single task. Since SPJ eliminates the Exchange (shuffle), each split independently deduplicates, and duplicate partition keys survive across splits.

Steps to reproduce

  1. Create two Iceberg tables partitioned by part_key:
    • Big table: 20 separate appends to partition p1 (= 20 data files), plus 1 append to p2
    • Small table: 1 append containing both p1 and p2
  2. Enable SPJ with partial clustering:
spark.sql.sources.v2.bucketing.enabled = true
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true
spark.sql.iceberg.planning.preserve-data-grouping = true
spark.sql.autoBroadcastJoinThreshold = -1
  3. Perform a leftsemi join on part_key, then dropDuplicates(["part_key"])
  4. The physical plan contains no Exchange node, confirming SPJ is active
import shutil
import tempfile

from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
import pytest


# Maven coordinates of the Iceberg Spark runtime matching Spark 4.0 / Scala 2.13.
ICEBERG_PKG = "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1"

# Row schema shared by both test tables. `padding` inflates row size so every
# append produces a non-trivial data file, which matters for triggering
# partially clustered distribution on the big table.
SCHEMA = StructType([
    StructField("id", StringType()),
    StructField("part_key", StringType()),
    StructField("value", IntegerType()),
    StructField("padding", StringType()),
])


@pytest.fixture(scope="module")
def warehouse_dir():
    """Yield a fresh temp directory used as the Iceberg warehouse root.

    The directory is removed on teardown. Cleanup is placed in a ``finally``
    so the directory is not leaked even if an exception is thrown into the
    generator (e.g. a dependent fixture fails during setup/teardown) — the
    original bare ``yield`` would skip the rmtree in that case.
    """
    d = tempfile.mkdtemp(prefix="iceberg_spj_test_")
    try:
        yield d
    finally:
        # ignore_errors: best-effort cleanup; leftover files must not fail the run
        shutil.rmtree(d, ignore_errors=True)


@pytest.fixture(scope="module")
def spark_iceberg(warehouse_dir):
    """Module-scoped local SparkSession wired to a Hadoop-catalog Iceberg warehouse."""
    # All session options in one place; dicts preserve insertion order, so the
    # configs are applied in exactly this sequence.
    session_conf = {
        "spark.jars.packages": ICEBERG_PKG,
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.local.type": "hadoop",
        "spark.sql.catalog.local.warehouse": warehouse_dir,
        "spark.ui.enabled": "false",
        "spark.driver.memory": "2g",
        "spark.sql.shuffle.partitions": "4",
        "spark.sql.adaptive.enabled": "false",
    }
    builder = SparkSession.builder.master("local[4]").appName("spj_window_dedup_bug")
    for key, value in session_conf.items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    yield spark
    spark.stop()


@pytest.fixture()
def asymmetric_tables(spark_iceberg):
    """Two Iceberg tables with asymmetric file counts to trigger partial clustering."""
    big, small = "local.db.big", "local.db.small"

    # Recreate both tables from scratch so every test sees identical data.
    for table in (big, small):
        spark_iceberg.sql(f"DROP TABLE IF EXISTS {table}")
        spark_iceberg.sql(f"""
            CREATE TABLE {table} (id STRING, part_key STRING, value INT, padding STRING)
            USING iceberg PARTITIONED BY (part_key)
        """)

    # Big table: 20 appends = 20 files for partition p1
    for batch in range(20):
        rows = [(f"big_{batch}_{j}", "p1", batch, "X" * 1000) for j in range(200)]
        spark_iceberg.createDataFrame(rows, SCHEMA).writeTo(big).append()
    spark_iceberg.createDataFrame([("big_other", "p2", 99, "Y")], SCHEMA).writeTo(big).append()

    # Small table: 1 append = 1 file
    small_rows = [("small_0", "p1", 0, "Z"), ("small_1", "p2", 1, "Z")]
    spark_iceberg.createDataFrame(small_rows, SCHEMA).writeTo(small).append()

    # Enable SPJ with partially clustered distribution; broadcast joins are
    # disabled so the planner actually picks a storage-partitioned join.
    spj_conf = {
        "spark.sql.sources.v2.bucketing.enabled": "true",
        "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
        "spark.sql.iceberg.planning.preserve-data-grouping": "true",
        "spark.sql.autoBroadcastJoinThreshold": "-1",
    }
    for key, value in spj_conf.items():
        spark_iceberg.conf.set(key, value)

    return spark_iceberg.table(big), spark_iceberg.table(small)


def _get_plan(df) -> str:
    return df._jdf.queryExecution().executedPlan().toString()


class TestSPJDedupBug:
    """Reproduction for dedup-after-SPJ returning duplicates.

    The first two tests assert the *buggy* behavior (count() > 2, i.e.
    duplicates survive) together with the absence of an Exchange node,
    proving SPJ is active; the last test shows that disabling partial
    clustering restores correct results.
    """

    def test_drop_duplicates_after_join_produces_duplicates(self, asymmetric_tables):
        """dropDuplicates after SPJ join → duplicates survive (expected 2, gets >2)."""
        big, small = asymmetric_tables

        joined = big.join(small, on="part_key", how="leftsemi")
        deduped = joined.dropDuplicates(["part_key"])

        plan = _get_plan(deduped)
        assert "Exchange" not in plan
        assert deduped.count() > 2

    def test_window_dedup_after_join_produces_duplicates(self, asymmetric_tables):
        """row_number() Window dedup after SPJ join → duplicates survive."""
        big, small = asymmetric_tables

        window = Window.partitionBy("part_key").orderBy(F.col("value").desc())
        ranked = (
            big.join(small, on="part_key", how="leftsemi")
            .withColumn("_r", F.row_number().over(window))
        )
        deduped = ranked.filter("_r = 1").drop("_r")

        assert "Exchange" not in _get_plan(deduped)
        assert deduped.count() > 2

    def test_disabling_partial_clustering_fixes_it(self, asymmetric_tables, spark_iceberg):
        """Setting partiallyClusteredDistribution.enabled=false → dedup works."""
        big, small = asymmetric_tables
        spark_iceberg.conf.set(
            "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false"
        )

        joined = big.join(small, on="part_key", how="leftsemi")
        deduped = joined.dropDuplicates(["part_key"])

        assert deduped.count() == 2

Related

SPARK-38166: Duplicates after task failure in dropDuplicates and repartition

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions