diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 40daca1df68..f5bd4b0d779 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -19,7 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage import java.io.IOException import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit} -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLongArray} +import java.util.function.IntUnaryOperator import scala.util.Random @@ -48,7 +49,10 @@ abstract private[worker] class Flusher( protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]() protected val workers = new Array[ExecutorService](threadCount) - protected var nextWorkerIndex: Int = 0 + protected val nextWorkerIndex: AtomicInteger = new AtomicInteger(0) + private val workerIndexUpdater: IntUnaryOperator = new IntUnaryOperator { + override def applyAsInt(i: Int): Int = (i + 1) % threadCount + } val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount) val stopFlag = new AtomicBoolean(false) @@ -104,9 +108,8 @@ abstract private[worker] class Flusher( ThreadPoolSource.registerSource(s"$this", workers) } - def getWorkerIndex: Int = synchronized { - nextWorkerIndex = (nextWorkerIndex + 1) % threadCount - nextWorkerIndex + def getWorkerIndex: Int = { + nextWorkerIndex.updateAndGet(workerIndexUpdater) } def takeBuffer(): CompositeByteBuf = {