diff --git a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala index 9a152eca9..7f3febf8f 100644 --- a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala +++ b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala @@ -121,47 +121,53 @@ class AuronUniffleShuffleReader[K, C]( val emptyPartitionIds = new util.ArrayList[Int] for (partition <- startPartition until endPartition) { - if (partitionToExpectBlocks.get(partition).isEmpty) { - logInfo(s"$partition is a empty partition") + val expectedBlocks = partitionToExpectBlocks.get(partition) + if (expectedBlocks == null || expectedBlocks.isEmpty) { + logDebug(s"$partition is a empty partition") emptyPartitionIds.add(partition) } else { val shuffleServerInfoList: util.List[ShuffleServerInfo] = partitionToShuffleServers.get(partition) - // This mechanism of expectedTaskIdsBitmap filter is to filter out the most of data. - // especially for AQE skew optimization - val expectedTaskIdsBitmapFilterEnable: Boolean = - !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) || shuffleServerInfoList.size > 1 - val retryMax: Int = rssConf.getInteger( - RssClientConfig.RSS_CLIENT_RETRY_MAX, - RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE) - val retryIntervalMax: Long = rssConf.getLong( - RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, - RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE) - val shuffleReadClient: ShuffleReadClient = - ShuffleClientFactory.getInstance.createShuffleReadClient( - ShuffleClientFactory.newReadBuilder - .appId(appId) - .shuffleId(shuffleId) - .partitionId(partition) - .basePath(basePath) - .partitionNumPerRange(1) - .partitionNum(partitionNum) - .blockIdBitmap(partitionToExpectBlocks.get(partition)) - .taskIdBitmap(taskIdBitmap) - .shuffleServerInfoList(shuffleServerInfoList) - .hadoopConf(hadoopConf) - .shuffleDataDistributionType(dataDistributionType) - .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable) - .retryMax(retryMax) - .retryIntervalMax(retryIntervalMax) - .rssConf(rssConf)) - val iterator: RssShuffleDataIterWrapper[K, C] = - new RssShuffleDataIterWrapper[K, C]( - dependency, - shuffleReadClient, - readMetrics, - rssConf) - shuffleDataIterList.add(iterator) + if (shuffleServerInfoList == null || shuffleServerInfoList.isEmpty) { + logWarning(s"$partition has no shuffle servers, treat as empty partition") + emptyPartitionIds.add(partition) + } else { + // This mechanism of expectedTaskIdsBitmap filter is to filter out the most of data. + // especially for AQE skew optimization + val expectedTaskIdsBitmapFilterEnable: Boolean = + !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) || shuffleServerInfoList.size > 1 + val retryMax: Int = rssConf.getInteger( + RssClientConfig.RSS_CLIENT_RETRY_MAX, + RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE) + val retryIntervalMax: Long = rssConf.getLong( + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE) + val shuffleReadClient: ShuffleReadClient = + ShuffleClientFactory.getInstance.createShuffleReadClient( + ShuffleClientFactory.newReadBuilder + .appId(appId) + .shuffleId(shuffleId) + .partitionId(partition) + .basePath(basePath) + .partitionNumPerRange(1) + .partitionNum(partitionNum) + .blockIdBitmap(expectedBlocks) + .taskIdBitmap(taskIdBitmap) + .shuffleServerInfoList(shuffleServerInfoList) + .hadoopConf(hadoopConf) + .shuffleDataDistributionType(dataDistributionType) + .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable) + .retryMax(retryMax) + .retryIntervalMax(retryIntervalMax) + .rssConf(rssConf)) + val iterator: RssShuffleDataIterWrapper[K, C] = + new RssShuffleDataIterWrapper[K, C]( + dependency, + shuffleReadClient, + readMetrics, + rssConf) + shuffleDataIterList.add(iterator) + } } } if (!emptyPartitionIds.isEmpty) {