Skip to content

Commit 2167aa5

Browse files
committed
Ordering getting wild
1 parent 3c27b0e commit 2167aa5

File tree

3 files changed

+113
-97
lines changed

3 files changed

+113
-97
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,9 @@ public int getDepth() {
389389
log.trace("Adapter '{}' was stopped successfully.", adapterId);
390390
}
391391
});
392-
case RESTART -> protocolAdapterManager.stopAsync(adapterId, false)
393-
.thenCompose(ignore -> protocolAdapterManager.startAsync(adapterId))
392+
case RESTART -> protocolAdapterManager
393+
.stopAsync(adapterId, false)
394+
.thenCompose(ignore -> protocolAdapterManager.startAsync(adapterId))
394395
.whenComplete((result, throwable) -> {
395396
if (throwable != null) {
396397
log.error("Failed to restart adapter '{}'.", adapterId, throwable);

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ public boolean protocolAdapterFactoryExists(final @NotNull String protocolAdapte
277277

278278
public @NotNull CompletableFuture<Void> startAsync(final @NotNull String protocolAdapterId) {
279279
Preconditions.checkNotNull(protocolAdapterId);
280-
return getProtocolAdapterWrapperByAdapterId(protocolAdapterId).map(this::startAsync)
280+
return getProtocolAdapterWrapperByAdapterId(protocolAdapterId)
281+
.map(this::startAsync)
281282
.orElseGet(() -> CompletableFuture.failedFuture(new ProtocolAdapterException("Adapter '" +
282283
protocolAdapterId +
283284
"'not found.")));
@@ -379,21 +380,23 @@ private void deleteAdapterInternal(final @NotNull String adapterId) {
379380
Preconditions.checkNotNull(wrapper);
380381
final String wid = wrapper.getId();
381382
log.info("Starting protocol-adapter '{}'.", wid);
382-
return wrapper.startAsync(writingEnabled(), moduleServices).whenComplete((result, throwable) -> {
383-
if (throwable == null) {
384-
log.info("Protocol-adapter '{}' started successfully.", wid);
385-
fireEvent(wrapper,
386-
HiveMQEdgeRemoteEvent.EVENT_TYPE.ADAPTER_STARTED,
387-
Event.SEVERITY.INFO,
388-
"Adapter '" + wid + "' started OK.");
389-
} else {
390-
log.warn("Protocol-adapter '{}' could not be started, reason: {}", wid, "unknown");
391-
fireEvent(wrapper,
392-
HiveMQEdgeRemoteEvent.EVENT_TYPE.ADAPTER_ERROR,
393-
Event.SEVERITY.CRITICAL,
394-
"Error starting adapter '" + wid + "'.");
395-
}
396-
});
383+
return wrapper
384+
.startAsync(writingEnabled(), moduleServices)
385+
.whenComplete((result, throwable) -> {
386+
if (throwable == null) {
387+
log.info("Protocol-adapter '{}' started successfully.", wid);
388+
fireEvent(wrapper,
389+
HiveMQEdgeRemoteEvent.EVENT_TYPE.ADAPTER_STARTED,
390+
Event.SEVERITY.INFO,
391+
"Adapter '" + wid + "' started OK.");
392+
} else {
393+
log.warn("Protocol-adapter '{}' could not be started, reason: {}", wid, "unknown");
394+
fireEvent(wrapper,
395+
HiveMQEdgeRemoteEvent.EVENT_TYPE.ADAPTER_ERROR,
396+
Event.SEVERITY.CRITICAL,
397+
"Error starting adapter '" + wid + "'.");
398+
}
399+
});
397400
}
398401

399402
private void fireEvent(
@@ -413,23 +416,28 @@ private void fireEvent(
413416
Preconditions.checkNotNull(wrapper);
414417
log.info("Stopping protocol-adapter '{}'.", wrapper.getId());
415418

416-
return wrapper.stopAsync(destroy).whenComplete((result, throwable) -> {
417-
final Event.SEVERITY severity;
418-
final String message;
419-
final String wid = wrapper.getId();
420-
final String protocolId = wrapper.getProtocolAdapterInformation().getProtocolId();
421-
if (throwable == null) {
422-
log.info("Protocol-adapter '{}' stopped successfully.", wid);
423-
severity = Event.SEVERITY.INFO;
424-
message = "Adapter '" + wid + "' stopped OK.";
425-
} else {
426-
log.warn("Protocol-adapter '{}' was unable to stop cleanly", wrapper.getId());
427-
severity = Event.SEVERITY.CRITICAL;
428-
message = "Error stopping adapter '" + wid + "'.";
429-
}
430-
wrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
431-
eventService.createAdapterEvent(wid, protocolId).withSeverity(severity).withMessage(message).fire();
432-
});
419+
return wrapper
420+
.stopAsync(destroy)
421+
.thenApply(v -> {
422+
wrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
423+
return v;
424+
})
425+
.whenComplete((result, throwable) -> {
426+
final Event.SEVERITY severity;
427+
final String message;
428+
final String wid = wrapper.getId();
429+
final String protocolId = wrapper.getProtocolAdapterInformation().getProtocolId();
430+
if (throwable == null) {
431+
log.info("Protocol-adapter '{}' stopped successfully.", wid);
432+
severity = Event.SEVERITY.INFO;
433+
message = "Adapter '" + wid + "' stopped OK.";
434+
} else {
435+
log.warn("Protocol-adapter '{}' was unable to stop cleanly", wrapper.getId());
436+
severity = Event.SEVERITY.CRITICAL;
437+
message = "Error stopping adapter '" + wid + "'.";
438+
}
439+
eventService.createAdapterEvent(wid, protocolId).withSeverity(severity).withMessage(message).fire();
440+
});
433441
}
434442

435443
private void updateAdapter(final ProtocolAdapterConfig protocolAdapterConfig) {

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 69 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -121,46 +121,48 @@ public ProtocolAdapterWrapper(
121121
initStartAttempt();
122122
final var output = new ProtocolAdapterStartOutputImpl();
123123
final var input = new ProtocolAdapterStartInputImpl(moduleServices);
124-
final var startFuture = CompletableFuture.supplyAsync(() -> {
125-
try {
126-
adapter.start(input, output);
127-
} catch (final Throwable throwable) {
128-
output.getStartFuture().completeExceptionally(throwable);
129-
}
130-
return output.getStartFuture();
131-
}).thenCompose(Function.identity()).handle((ignored, error) -> {
132-
if (error != null) {
133-
log.error("Error starting adapter", error);
134-
stopAfterFailedStart();
135-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
136-
//we still return the initial error since that's the most significant information
137-
return CompletableFuture.failedFuture(error);
138-
} else {
139-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
140-
return attemptStartingConsumers(writingEnabled,
141-
moduleServices.eventService()).handle((success, startException) -> {
142-
if (startException == null) {
143-
if (success) {
144-
log.debug("Successfully started adapter with id {}", adapter.getId());
145-
} else {
146-
log.debug("Partially started adapter with id {}", adapter.getId());
147-
}
148-
} else {
149-
log.error("Failed to start adapter with id {}", adapter.getId(), startException);
124+
final var startFuture = CompletableFuture
125+
.supplyAsync(() -> {
126+
try {
127+
adapter.start(input, output);
128+
} catch (final Throwable throwable) {
129+
output.getStartFuture().completeExceptionally(throwable);
130+
}
131+
return output.getStartFuture();
132+
})
133+
.thenCompose(Function.identity()).handle((ignored, error) -> {
134+
if (error != null) {
135+
log.error("Error starting adapter", error);
150136
stopAfterFailedStart();
151-
//we still return the initial error since that's the most significant information
152137
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
153-
throw new RuntimeException("Failed to start adapter with id " + adapter.getId(),
154-
startException);
138+
//we still return the initial error since that's the most significant information
139+
return CompletableFuture.failedFuture(error);
140+
} else {
141+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED);
142+
return attemptStartingConsumers(writingEnabled,
143+
moduleServices.eventService()).handle((success, startException) -> {
144+
if (startException == null) {
145+
if (success) {
146+
log.debug("Successfully started adapter with id {}", adapter.getId());
147+
} else {
148+
log.debug("Partially started adapter with id {}", adapter.getId());
149+
}
150+
} else {
151+
log.error("Failed to start adapter with id {}", adapter.getId(), startException);
152+
stopAfterFailedStart();
153+
//we still return the initial error since that's the most significant information
154+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
155+
throw new RuntimeException("Failed to start adapter with id " + adapter.getId(),
156+
startException);
157+
}
158+
return success;
159+
});
155160
}
156-
return success;
161+
}).thenApply(ignored -> (Void) null).whenComplete((result, throwable) -> {
162+
//always clean up state
163+
startFutureRef.set(null);
164+
operationState.set(OperationState.IDLE);
157165
});
158-
}
159-
}).thenApply(ignored -> (Void) null).whenComplete((result, throwable) -> {
160-
//always clean up state
161-
startFutureRef.set(null);
162-
operationState.set(OperationState.IDLE);
163-
});
164166

165167
startFutureRef.set(startFuture);
166168
return startFuture;
@@ -302,33 +304,38 @@ private void cleanUpScheduler() {
302304

303305
final var stopFuture = new CompletableFuture<Void>();
304306
stopFutureRef.set(stopFuture);
305-
CompletableFuture.supplyAsync(() -> {
306-
stopPolling(protocolAdapterPollingService);
307-
stopWriting(protocolAdapterWritingService);
308-
try {
309-
adapter.stop(input, output);
310-
} catch (final Throwable throwable) {
311-
output.getOutputFuture().completeExceptionally(throwable);
312-
}
313-
return output.getOutputFuture();
314-
}).thenCompose(Function.identity())
315-
.whenComplete((result, throwable) -> {
316-
if (destroy) {
317-
log.info("Destroying adapter with id '{}'", getId());
318-
adapter.destroy();
319-
}
320-
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
321-
operationState.set(OperationState.IDLE);
322-
stopFutureRef.set(null);
323-
if (throwable == null) {
324-
log.info("Stopped adapter with id {}", adapter.getId());
325-
stopFuture.complete(null);
326-
} else {
327-
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
328-
stopFuture.completeExceptionally(throwable);
329-
}
330-
});
331307

308+
final var actualFuture = CompletableFuture
309+
.supplyAsync(() -> {
310+
stopPolling(protocolAdapterPollingService);
311+
stopWriting(protocolAdapterWritingService);
312+
try {
313+
adapter.stop(input, output);
314+
} catch (final Throwable throwable) {
315+
output.getOutputFuture().completeExceptionally(throwable);
316+
}
317+
return output.getOutputFuture();
318+
})
319+
.thenApply(v -> {
320+
stopFutureRef.set(null);
321+
protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
322+
operationState.set(OperationState.IDLE);
323+
if (destroy) {
324+
log.info("Destroying adapter with id '{}'", getId());
325+
adapter.destroy();
326+
}
327+
return v;
328+
})
329+
.thenCompose(Function.identity())
330+
.whenComplete((result, throwable) -> {
331+
if (throwable == null) {
332+
log.info("Stopped adapter with id {}", adapter.getId());
333+
} else {
334+
log.error("Error stopping adapter with id {}", adapter.getId(), throwable);
335+
}
336+
});
337+
338+
stopFuture.thenCompose(v -> actualFuture);
332339
return stopFuture;
333340
}
334341

0 commit comments

Comments
 (0)