diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java index fbc3c27973..5a49bfd273 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java +++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java @@ -29,6 +29,8 @@ public class ShufflePartitionedData { private final ShufflePartitionedBlock[] blockList; private final long totalBlockEncodedLength; private final long totalBlockDataLength; + private int duplicateBlockCount; + private long duplicateBlockSize; public ShufflePartitionedData( int partitionId, long encodedLength, long dataLength, ShufflePartitionedBlock[] blockList) { @@ -80,4 +82,20 @@ public long getTotalBlockEncodedLength() { public long getTotalBlockDataLength() { return totalBlockDataLength; } + + public int getDuplicateBlockCount() { + return duplicateBlockCount; + } + + public void setDuplicateBlockCount(int duplicateBlockCount) { + this.duplicateBlockCount = duplicateBlockCount; + } + + public long getDuplicateBlockSize() { + return duplicateBlockSize; + } + + public void setDuplicateBlockSize(long duplicateBlockSize) { + this.duplicateBlockSize = duplicateBlockSize; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java index 7823cad041..04a18df00c 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java @@ -120,6 +120,10 @@ public long getDataLength() { return dataLength; } + public long getBlockCount() { + return shuffleBlocks.size(); + } + public String getAppId() { return appId; } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index bd35750d38..b62df9c322 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -458,8 +458,7 @@ public void sendShuffleData( // after each cacheShuffleData call, the `preAllocatedSize` is updated timely. manager.releasePreAllocatedSize(toReleasedSize); alreadyReleasedSize += toReleasedSize; - manager.updateCachedBlockIds( - appId, shuffleId, spd.getPartitionId(), spd.getBlockList()); + manager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd); } } catch (ExceedHugePartitionHardLimitException e) { String errorMsg = diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index ced8a17a01..1f4d451e52 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -99,6 +99,8 @@ public class ShuffleServerMetrics { private static final String ALLOCATED_BUFFER_SIZE = "allocated_buffer_size"; private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size"; private static final String USED_BUFFER_SIZE = "used_buffer_size"; + private static final String TOTAL_IN_MEMORY_BLOCK_COUNT = "total_in_memory_block_count"; + private static final String TOTAL_IN_FLUSH_BLOCK_COUNT = "total_in_flush_block_count"; private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size"; public static final String USED_DIRECT_MEMORY_SIZE = "used_direct_memory_size"; public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = "used_direct_memory_size_by_netty"; @@ -158,6 +160,10 @@ public class ShuffleServerMetrics { public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP = "topN_of_total_data_size_for_app"; public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP = "topN_of_in_memory_data_size_for_app"; + public static final String TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP = + "topN_of_in_memory_block_count_for_app"; + public static final String BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP = + "bottomN_of_in_memory_avg_block_size_for_app"; public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP = "topN_of_on_localfile_data_size_for_app"; public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP = @@ -239,6 +245,8 @@ public class ShuffleServerMetrics { public static Gauge.Child gaugeAllocatedBufferSize; public static Gauge.Child gaugeInFlushBufferSize; public static Gauge.Child gaugeUsedBufferSize; + public static Gauge.Child gaugeTotalInMemoryBlockCount; + public static Gauge.Child gaugeTotalInFlushBlockCount; public static Gauge.Child gaugeReadBufferUsedSize; public static Gauge.Child gaugeWriteHandler; public static Gauge.Child gaugeMergeEventQueueSize; @@ -257,6 +265,8 @@ public class ShuffleServerMetrics { public static Gauge gaugeTotalDataSizeUsage; public static Gauge gaugeInMemoryDataSizeUsage; + public static Gauge gaugeInMemoryBlockCount; + public static Gauge gaugeInMemoryAvgBlockSize; public static Gauge gaugeOnDiskDataSizeUsage; public static Gauge gaugeOnHadoopDataSizeUsage; @@ -477,6 +487,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) { gaugeAllocatedBufferSize = metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE); gaugeInFlushBufferSize = metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE); gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE); + gaugeTotalInMemoryBlockCount = metricsManager.addLabeledGauge(TOTAL_IN_MEMORY_BLOCK_COUNT); + gaugeTotalInFlushBlockCount = metricsManager.addLabeledGauge(TOTAL_IN_FLUSH_BLOCK_COUNT); gaugeReadBufferUsedSize = metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE); gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER); gaugeMergeEventQueueSize = metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE); @@ -536,6 +548,20 @@ private static void setUpMetrics(ShuffleServerConf serverConf) { .labelNames("app_id") .register(metricsManager.getCollectorRegistry()); + gaugeInMemoryBlockCount = + Gauge.build() + .name(TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP) + .help("top N of in memory shuffle block count for app level") + .labelNames("app_id") + .register(metricsManager.getCollectorRegistry()); + + gaugeInMemoryAvgBlockSize = + Gauge.build() + .name(BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP) + .help("bottom N of in memory shuffle average block size for app level") + .labelNames("app_id") + .register(metricsManager.getCollectorRegistry()); + gaugeOnDiskDataSizeUsage = Gauge.build() .name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index e748a892f8..820c763579 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -60,6 +60,7 @@ public class ShuffleTaskInfo { private final AtomicLong totalDataSize = new AtomicLong(0); private final AtomicLong inMemoryDataSize = new AtomicLong(0); + private final AtomicLong inMemoryBlockCount = new AtomicLong(0); private final AtomicLong onLocalFileNum = new AtomicLong(0); private final AtomicLong onLocalFileDataSize = new AtomicLong(0); private final AtomicLong onHadoopFileNum = new AtomicLong(0); @@ -176,6 +177,19 @@ public long getInMemoryDataSize() { return inMemoryDataSize.get(); } + public long getInMemoryBlockCount() { + return inMemoryBlockCount.get(); + } + + public void addInMemoryBlockCount(long delta) { + inMemoryBlockCount.addAndGet(delta); + } + + public long getInMemoryAvgBlockSize() { + long blockCount = getInMemoryBlockCount(); + return blockCount <= 0 ? Long.MAX_VALUE : getInMemoryDataSize() / blockCount; + } + public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) { if (isNewlyCreated) { onLocalFileNum.incrementAndGet(); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index f2c1204643..33712244b1 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -481,12 +481,14 @@ public int updateAndGetCommitCount(String appId, int shuffleId) { } // Only for tests - public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) { - updateCachedBlockIds(appId, shuffleId, 0, spbs); + public void updateCachedBlockIds( + String appId, int shuffleId, ShufflePartitionedData shufflePartitionedData) { + updateCachedBlockIds(appId, shuffleId, 0, shufflePartitionedData); } public void updateCachedBlockIds( - String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) { + String appId, int shuffleId, int partitionId, ShufflePartitionedData shufflePartitionedData) { + ShufflePartitionedBlock[] spbs = shufflePartitionedData.getBlockList(); if (spbs == null || spbs.length == 0) { return; } @@ -512,7 +514,12 @@ public void updateCachedBlockIds( size += spb.getEncodedLength(); } } - long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size); + int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount(); + shuffleBufferManager.addInMemoryBlockCount(blockCount); + shuffleTaskInfo.addInMemoryBlockCount(blockCount); + long partitionSize = + shuffleTaskInfo.addPartitionDataSize( + shuffleId, partitionId, size - shufflePartitionedData.getDuplicateBlockSize()); HugePartitionUtils.markHugePartition( shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, partitionSize); } diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java index 97ffb3cf41..b835c904a3 100644 --- a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java +++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java @@ -37,6 +37,8 @@ public class TopNShuffleDataSizeOfAppCalcTask { private final Gauge gaugeTotalDataSize; private final Gauge gaugeInMemoryDataSize; + private final Gauge gaugeInMemoryBlockCount; + private final Gauge gaugeInMemoryAvgBlockSize; private final Gauge gaugeOnLocalFileDataSize; private final Gauge gaugeOnHadoopDataSize; @@ -50,6 +52,8 @@ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleS shuffleTaskManager = taskManager; this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage; this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage; + this.gaugeInMemoryBlockCount = ShuffleServerMetrics.gaugeInMemoryBlockCount; + this.gaugeInMemoryAvgBlockSize = ShuffleServerMetrics.gaugeInMemoryAvgBlockSize; this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage; this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage; this.scheduler = @@ -72,6 +76,22 @@ private void calcTopNShuffleDataSize() { .set(taskInfo.getValue().getInMemoryDataSize()); } + topNTaskInfo = calcTopNInMemoryBlockCountTaskInfo(); + gaugeInMemoryBlockCount.clear(); + for (Map.Entry taskInfo : topNTaskInfo) { + gaugeInMemoryBlockCount + .labels(taskInfo.getKey()) + .set(taskInfo.getValue().getInMemoryBlockCount()); + } + + topNTaskInfo = calcBottomNInMemoryAvgBlockSizeTaskInfo(); + gaugeInMemoryAvgBlockSize.clear(); + for (Map.Entry taskInfo : topNTaskInfo) { + gaugeInMemoryAvgBlockSize + .labels(taskInfo.getKey()) + .set(taskInfo.getValue().getInMemoryAvgBlockSize()); + } + topNTaskInfo = calcTopNOnLocalFileDataSizeTaskInfo(); gaugeOnLocalFileDataSize.clear(); for (Map.Entry taskInfo : topNTaskInfo) { @@ -108,6 +128,27 @@ public List> calcTopNInMemoryDataSizeTaskInfo .collect(Collectors.toList()); } + public List> calcTopNInMemoryBlockCountTaskInfo() { + return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream() + .sorted( + (e1, e2) -> + Long.compare( + e2.getValue().getInMemoryBlockCount(), e1.getValue().getInMemoryBlockCount())) + .limit(topNShuffleDataNumber) + .collect(Collectors.toList()); + } + + public List> calcBottomNInMemoryAvgBlockSizeTaskInfo() { + return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream() + .sorted( + (e1, e2) -> + Long.compare( + e1.getValue().getInMemoryAvgBlockSize(), + e2.getValue().getInMemoryAvgBlockSize())) + .limit(topNShuffleDataNumber) + .collect(Collectors.toList()); + } + public List> calcTopNOnLocalFileDataSizeTaskInfo() { return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream() .sorted( diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index ed93312914..dee8418984 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -55,6 +55,7 @@ import org.apache.uniffle.server.ShuffleFlushManager; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.ShuffleTaskInfo; import org.apache.uniffle.server.ShuffleTaskManager; import org.apache.uniffle.server.buffer.lab.ChunkCreator; import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList; @@ -96,6 +97,8 @@ public class ShuffleBufferManager { protected AtomicLong inFlushSize = new AtomicLong(0L); protected AtomicLong usedMemory = new AtomicLong(0L); private AtomicLong readDataMemory = new AtomicLong(0L); + private final AtomicLong inMemoryBlockCount = new AtomicLong(0); + private final AtomicLong inFlushBlockCount = new AtomicLong(0); // appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId protected Map>> bufferPool; // appId -> shuffleId -> shuffle size in buffer @@ -496,11 +499,22 @@ protected boolean flushBuffer( shuffleFlushManager.getDataDistributionType(appId)); if (event != null) { event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(), true, false)); + event.addCleanupCallback( + () -> { + long blockCount = event.getBlockCount(); + ShuffleTaskInfo shuffleTaskInfo = shuffleTaskManager.getShuffleTaskInfo(appId); + if (shuffleTaskInfo != null) { + shuffleTaskInfo.addInMemoryBlockCount(-blockCount); + } + addInMemoryBlockCount(-blockCount); + addInFlushBlockCount(-blockCount); + }); updateShuffleSize(appId, shuffleId, -event.getEncodedLength()); inFlushSize.addAndGet(event.getEncodedLength()); if (isHugePartition) { event.markOwnedByHugePartition(); } + addInFlushBlockCount(event.getBlockCount()); ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get()); shuffleFlushManager.addToFlushQueue(event); return true; @@ -524,6 +538,16 @@ public void removeBuffer(String appId) { } } + public void addInMemoryBlockCount(long delta) { + long blockCount = inMemoryBlockCount.addAndGet(delta); + ShuffleServerMetrics.gaugeTotalInMemoryBlockCount.set(blockCount); + } + + public void addInFlushBlockCount(long delta) { + long blockCount = inFlushBlockCount.addAndGet(delta); + ShuffleServerMetrics.gaugeTotalInFlushBlockCount.set(blockCount); + } + public synchronized boolean requireMemory(long size, boolean isPreAllocated) { if (capacity - usedMemory.get() >= size) { usedMemory.addAndGet(size); @@ -891,6 +915,7 @@ public void removeBufferByShuffleId(String appId, Collection shuffleIds Collection buffers = bufferRangeMap.asMapOfRanges().values(); if (buffers != null) { for (ShuffleBuffer buffer : buffers) { + addInMemoryBlockCount(-buffer.getBlockCount()); // the actual released size by this thread long releasedSize = buffer.release(); ShuffleServerMetrics.gaugeTotalPartitionNum.dec(); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index 6485eef551..156797bef9 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -57,6 +57,8 @@ public synchronized long append(ShufflePartitionedData data) { } long currentEncodedLength = 0; long currentDataLength = 0; + int duplicateBlockCount = 0; + long duplicateBlockSize = 0; for (ShufflePartitionedBlock block : data.getBlockList()) { // If sendShuffleData retried, we may receive duplicate block. The duplicate @@ -65,11 +67,15 @@ public synchronized long append(ShufflePartitionedData data) { currentEncodedLength += block.getEncodedLength(); currentDataLength += block.getDataLength(); } else { + duplicateBlockCount++; + duplicateBlockSize += block.getEncodedLength(); releaseBlock(block); } } this.encodedLength += currentEncodedLength; this.dataLength += currentDataLength; + data.setDuplicateBlockCount(duplicateBlockCount); + data.setDuplicateBlockSize(duplicateBlockSize); return currentEncodedLength; } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index 9c65eeee08..c442c9af6d 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -64,6 +64,8 @@ public synchronized long append(ShufflePartitionedData data) { } long currentEncodedLength = 0; long currentDataLength = 0; + int duplicateBlockCount = 0; + long duplicateBlockSize = 0; for (ShufflePartitionedBlock block : data.getBlockList()) { // If sendShuffleData retried, we may receive duplicate block. The duplicate @@ -74,11 +76,15 @@ public synchronized long append(ShufflePartitionedData data) { currentEncodedLength += block.getEncodedLength(); currentDataLength += block.getDataLength(); } else { + duplicateBlockCount++; + duplicateBlockSize += block.getEncodedLength(); releaseBlock(block); } } this.encodedLength += currentEncodedLength; this.dataLength += currentDataLength; + data.setDuplicateBlockCount(duplicateBlockCount); + data.setDuplicateBlockSize(duplicateBlockSize); return currentEncodedLength; } diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java index e4000bc39f..01ba26c8aa 100644 --- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java +++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java @@ -330,7 +330,7 @@ private boolean cachedMergedBlock(ByteBuf byteBuf, long blockId, int length) { shuffle .shuffleServer .getShuffleTaskManager() - .updateCachedBlockIds(appId, shuffle.shuffleId, spd.getPartitionId(), spd.getBlockList()); + .updateCachedBlockIds(appId, shuffle.shuffleId, spd.getPartitionId(), spd); sleepTime = initSleepTime; return true; } else { diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index 0b9b5f83cd..d6489deb6d 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -317,8 +317,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData // after each cacheShuffleData call, the `preAllocatedSize` is updated timely. shuffleTaskManager.releasePreAllocatedSize(toReleasedSize); alreadyReleasedSize += toReleasedSize; - shuffleTaskManager.updateCachedBlockIds( - appId, shuffleId, spd.getPartitionId(), spd.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd); } } catch (ExceedHugePartitionHardLimitException e) { String errorMsg = diff --git a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java index c21fb0db49..0a5508d8fd 100644 --- a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java @@ -134,9 +134,9 @@ public void removeShuffleDataWithHdfsTest() throws Exception { shuffleTaskManager.requireBuffer(35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0); shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0); shuffleTaskManager.refreshAppId(appId); shuffleTaskManager.checkResourceStatus(); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index b9cdf1e248..3356cca5ae 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -156,7 +156,7 @@ public void appPurgeWithLocalfileTest() throws Exception { ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0); } assertEquals(1, shuffleTaskManager.getAppIds().size()); @@ -220,7 +220,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception { // case3 ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500); shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0); try { long requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), Arrays.asList(500), 500); @@ -232,7 +232,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception { // case4 partitionedData0 = createPartitionedData(1, 1, 500); shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0); try { shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), Arrays.asList(500), 500); fail("Should throw NoBufferForHugePartitionException"); @@ -255,7 +255,7 @@ public void hugePartitionMemoryUsageLimitTest() throws Exception { shuffleServer.getShuffleBufferManager().setBufferFlushThreshold(1024); partitionedData0 = createPartitionedData(1, 1, 500); shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0); try { shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), Arrays.asList(500), 500); fail("Should throw NoBufferForHugePartitionException"); @@ -298,14 +298,14 @@ public void partitionDataSizeSummaryTest() throws Exception { // case1 ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); long size1 = partitionedData0.getTotalBlockEncodedLength(); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0); assertEquals(size1, shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()); // case2 partitionedData0 = createPartitionedData(1, 1, 35); long size2 = partitionedData0.getTotalBlockEncodedLength(); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0); assertEquals(size1 + size2, shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()); assertEquals( size1 + size2, shuffleTaskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1)); @@ -412,7 +412,7 @@ public void writeProcessTest() throws Exception { PreAllocatedBufferInfo pabi = bufferIds.get(bufferId); assertEquals(35, pabi.getRequireSize()); StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData0); // the required id won't be removed in shuffleTaskManager, it is removed in Grpc service assertEquals(1, bufferIds.size()); assertEquals(StatusCode.SUCCESS, sc); @@ -429,7 +429,7 @@ public void writeProcessTest() throws Exception { expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList())); bufferId = shuffleTaskManager.requireBuffer(70); sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData1); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1); assertEquals(StatusCode.SUCCESS, sc); shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId); waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1); @@ -439,7 +439,7 @@ public void writeProcessTest() throws Exception { expectedBlocks1.addAll(Lists.newArrayList(partitionedData2.getBlockList())); // receive un-preAllocation data sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData2); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData2.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData2); assertEquals(StatusCode.SUCCESS, sc); // won't flush for partition 2-2 @@ -447,7 +447,7 @@ public void writeProcessTest() throws Exception { expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList())); bufferId = shuffleTaskManager.requireBuffer(30); sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData3); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3); shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId); assertEquals(StatusCode.SUCCESS, sc); @@ -456,7 +456,7 @@ public void writeProcessTest() throws Exception { expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList())); bufferId = shuffleTaskManager.requireBuffer(35); sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData4); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4); shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId); assertEquals(StatusCode.SUCCESS, sc); @@ -466,7 +466,7 @@ public void writeProcessTest() throws Exception { // flush for partition 1-1 ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5); expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList())); bufferId = shuffleTaskManager.requireBuffer(70); sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5); @@ -483,7 +483,7 @@ public void writeProcessTest() throws Exception { // flush for partition 0-1 ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7); bufferId = shuffleTaskManager.requireBuffer(70); sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7); assertEquals(StatusCode.SUCCESS, sc); @@ -544,9 +544,9 @@ public void removeShuffleDataWithHdfsTest() throws Exception { shuffleTaskManager.requireBuffer(35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0); shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0); shuffleTaskManager.refreshAppId(appId); shuffleTaskManager.checkResourceStatus(); @@ -596,7 +596,7 @@ public void removeShuffleDataWithLocalfileTest() throws Exception { ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0); } assertEquals(1, shuffleTaskManager.getAppIds().size()); @@ -666,8 +666,7 @@ public void clearTest() throws Exception { Thread.sleep(1000); ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); shuffleTaskManager.cacheShuffleData("clearTest1", shuffleId, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds( - "clearTest1", shuffleId, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds("clearTest1", shuffleId, partitionedData0); shuffleTaskManager.refreshAppId("clearTest1"); shuffleTaskManager.checkResourceStatus(); retry++; @@ -1039,7 +1038,7 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep Thread.sleep(1000); ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, shuffleData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData); shuffleTaskManager.refreshAppId(appId); shuffleTaskManager.checkResourceStatus(); @@ -1215,7 +1214,7 @@ public void testStorageRemoveResourceHang(@TempDir File tmpDir) throws Exception ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0); shuffleTaskManager.refreshAppId(appId); shuffleTaskManager.checkResourceStatus(); assertEquals(1, shuffleTaskManager.getAppIds().size()); @@ -1260,7 +1259,7 @@ public void testRegisterShuffleAfterAppIsExpired() throws Exception { ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); shuffleTaskManager.requireBuffer(35); shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); - shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0); shuffleTaskManager.refreshAppId(appId); shuffleTaskManager.checkResourceStatus(); assertEquals(1, shuffleTaskManager.getAppIds().size()); diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index 7dbba94b22..f2991b226a 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -517,7 +517,7 @@ public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir) throws E shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0); ShufflePartitionedData partitionedData = createData(0, 1); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData); assertEquals(1 + 32, shuffleBufferManager.getUsedMemory()); long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0); assertEquals(1 + 32, usedSize); @@ -532,7 +532,7 @@ public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir) throws E // case2: its partition is huge partition, its buffer will be flushed to DISK directly partitionedData = createData(0, 36); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData); assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory()); assertTrue( HugePartitionUtils.limitHugePartition( @@ -543,7 +543,7 @@ public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir) throws E shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0))); partitionedData = createData(0, 1); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData); waitForFlush(shuffleFlushManager, appId, shuffleId, 3); } @@ -845,7 +845,7 @@ public void splitPartitionTest(@TempDir File tmpDir) throws Exception { shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0); ShufflePartitionedData partitionedData = createData(0, 1); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData); long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0); assertEquals(1 + 32, usedSize); assertFalse( @@ -855,7 +855,7 @@ public void splitPartitionTest(@TempDir File tmpDir) throws Exception { // case2: its partition exceed the split limit partitionedData = createData(0, 200); shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData); - shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData.getBlockList()); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData); usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0); assertEquals(1 + 32 + 200 + 32, usedSize); assertTrue(