diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java index 722f5f7bc98..d13f8f070d4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java @@ -122,6 +122,7 @@ public Set> optionalOptions() { options.add(IcebergDataSinkOptions.PARTITION_KEY); options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED); options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL); + options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java index 02d693aba3b..2d3269f55d5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java @@ -136,7 +136,7 @@ public void addPostCommitTopology( // Shuffle by different table id. DataStream> keyedStream = committableMessageDataStream.partitionCustom( - (bucket, numPartitions) -> bucket % numPartitions, + Math::floorMod, (committableMessage) -> { if (committableMessage instanceof CommittableWithLineage) { WriteResultWrapper multiTableCommittable = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java index 8a5e5e4d0f0..126491b73f0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java @@ -40,6 +40,7 @@ void testCreateDataSink() { Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse"); + conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java index a150e7188fa..f73a8c9217f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java @@ -150,7 +150,8 @@ public void testCompationOperator() throws IOException, InterruptedException { } CompactionOperator compactionOperator = new CompactionOperator( - catalogOptions, CompactionOptions.builder().commitInterval(1).build()); + catalogOptions, + CompactionOptions.builder().commitInterval(1).parallelism(4).build()); compactionOperator.processElement( new StreamRecord<>( new CommittableWithLineage<>(