Skip to content

Commit 28c52af

Browse files
committed
Prevent race coditions betwee ncreating and setting stopFuture
1 parent 3fc0b98 commit 28c52af

File tree

1 file changed

+27
-24
lines changed

1 file changed

+27
-24
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -300,31 +300,34 @@ private void cleanUpScheduler() {
300300
final var input = new ProtocolAdapterStopInputImpl();
301301
final var output = new ProtocolAdapterStopOutputImpl();
302302

303-
final var stopFuture = CompletableFuture.supplyAsync(() -> {
304-
stopPolling(protocolAdapterPollingService);
305-
stopWriting(protocolAdapterWritingService);
306-
try {
307-
adapter.stop(input, output);
308-
} catch (final Throwable throwable) {
309-
output.getOutputFuture().completeExceptionally(throwable);
310-
}
311-
return output.getOutputFuture();
312-
}).thenCompose(Function.identity()).whenComplete((result, throwable) -> {
313-
if (destroy) {
314-
log.info("Destroying adapter with id '{}'", getId());
315-
adapter.destroy();
316-
}
317-
if (throwable == null) {
318-
log.info("Stopped adapter with id {}", adapter.getId());
319-
} else {
320-
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
321-
}
322-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
323-
operationState.set(OperationState.IDLE);
324-
stopFutureRef.set(null);
325-
});
326-
303+
final var stopFuture = new CompletableFuture<Void>();
327304
stopFutureRef.set(stopFuture);
305+
CompletableFuture.supplyAsync(() -> {
306+
stopPolling(protocolAdapterPollingService);
307+
stopWriting(protocolAdapterWritingService);
308+
try {
309+
adapter.stop(input, output);
310+
} catch (final Throwable throwable) {
311+
output.getOutputFuture().completeExceptionally(throwable);
312+
}
313+
return output.getOutputFuture();
314+
}).thenCompose(Function.identity())
315+
.whenComplete((result, throwable) -> {
316+
if (destroy) {
317+
log.info("Destroying adapter with id '{}'", getId());
318+
adapter.destroy();
319+
}
320+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
321+
operationState.set(OperationState.IDLE);
322+
stopFutureRef.set(null);
323+
if (throwable == null) {
324+
log.info("Stopped adapter with id {}", adapter.getId());
325+
stopFuture.complete(null);
326+
} else {
327+
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
328+
stopFuture.completeExceptionally(throwable);
329+
}
330+
});
328331

329332
return stopFuture;
330333
}

0 commit comments

Comments
 (0)