Skip to content

Commit 325c60c

Browse files
ramarajesh2112Ram Gudivada
andauthored
fix: Use thread-safe datastructure in SimultaneousExecutor (#736)
* fix: Use thread-safe datastructure in SimultaneousExecutor * Add javadocs * Add javadocs * Update javadocs * Fix formatting --------- Co-authored-by: Ram Gudivada <[email protected]>
1 parent cc1333a commit 325c60c

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

hollow/src/main/java/com/netflix/hollow/core/util/SimultaneousExecutor.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
import static com.netflix.hollow.core.util.Threads.daemonThread;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321
import java.util.concurrent.Callable;
22+
import java.util.concurrent.ConcurrentLinkedQueue;
2423
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.LinkedBlockingQueue;
@@ -30,16 +29,17 @@
3029
import java.util.concurrent.TimeUnit;
3130

3231
/**
33-
*
3432
* A convenience wrapper around ThreadPoolExecutor. Provides sane defaults to
3533
* constructor arguments and allows for awaitUninterruptibly().
36-
*
34+
*
35+
* <p><strong>Internal Use:</strong> This class is intended for internal
36+
* framework use and is not meant for external consumption.
3737
*/
3838
public class SimultaneousExecutor extends ThreadPoolExecutor {
3939

4040
private static final String DEFAULT_THREAD_NAME = "simultaneous-executor";
4141

42-
private final List<Future<?>> futures = new ArrayList<Future<?>>();
42+
private final ConcurrentLinkedQueue<Future<?>> futures = new ConcurrentLinkedQueue<>();
4343

4444
/**
4545
* Creates an executor with a thread per processor.
@@ -253,18 +253,21 @@ public void awaitSuccessfulCompletion() throws InterruptedException, ExecutionEx
253253
* if 1 or more tasks failed.
254254
*
255255
* After this call completes, the thread pool will <i>not</i> be shut down and can be reused.
256+
*
257+
* If tasks are being submitted concurrently from other threads while this method executes,
258+
* the iteration over futures is weakly consistent and may not include all concurrently submitted
259+
* tasks. Ideally this method should be called after all the tasks are submitted.
256260
*
257261
* @throws ExecutionException if a computation threw an
258262
* exception
259263
* @throws InterruptedException if the current thread was interrupted
260264
* while waiting
261265
*/
262266
public void awaitSuccessfulCompletionOfCurrentTasks() throws InterruptedException, ExecutionException {
263-
for(Future<?> f : futures) {
267+
Future<?> f;
268+
while ((f = futures.poll()) != null) {
264269
f.get();
265270
}
266-
267-
futures.clear();
268271
}
269272

270273
}

0 commit comments

Comments
 (0)