Skip to content

Commit 99c8964

Browse files
authored
Reworked ProtocolAdapterManager/Wrapper so they are the start of async operations. Removed useless CompleteableFutures (#1066)
* Reworked ProtocolAdapterManager/Wrapper so they are the start of async operations. Removed useless CompleteableFutures
1 parent e621d22 commit 99c8964

File tree

11 files changed

+539
-392
lines changed

11 files changed

+539
-392
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ public ProtocolAdaptersResourceImpl(
210210

211211
@Override
212212
public @NotNull Response getAdapter(final @NotNull String adapterId) {
213-
final Optional<ProtocolAdapterWrapper> instance = protocolAdapterManager.getProtocolAdapterWrapperByAdapterId(adapterId);
213+
final Optional<ProtocolAdapterWrapper> instance = protocolAdapterManager
214+
.getProtocolAdapterWrapperByAdapterId(adapterId);
214215
if (instance.isEmpty()) {
215216
return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'",
216217
adapterId)));
@@ -239,7 +240,8 @@ public ProtocolAdaptersResourceImpl(
239240
public @NotNull Response discoverDataPoints(
240241
final @NotNull String adapterId, final @Nullable String rootNode, final @Nullable Integer depth) {
241242

242-
final Optional<ProtocolAdapterWrapper> instance = protocolAdapterManager.getProtocolAdapterWrapperByAdapterId(adapterId);
243+
final Optional<ProtocolAdapterWrapper> instance = protocolAdapterManager
244+
.getProtocolAdapterWrapperByAdapterId(adapterId);
243245
if (instance.isEmpty()) {
244246
return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'",
245247
adapterId)));
@@ -386,22 +388,22 @@ public int getDepth() {
386388
return ErrorResponseUtil.errorResponse(new AdapterFailedSchemaValidationError(errorMessages.toErrorList()));
387389
} else {
388390
switch (command.getCommand()) {
389-
case START -> protocolAdapterManager.start(adapterId).whenComplete((result, throwable) -> {
391+
case START -> protocolAdapterManager.startAsync(adapterId).whenComplete((result, throwable) -> {
390392
if (throwable != null) {
391393
log.error("Failed to start adapter '{}'.", adapterId, throwable);
392394
} else {
393395
log.trace("Adapter '{}' was started successfully.", adapterId);
394396
}
395397
});
396-
case STOP -> protocolAdapterManager.stop(adapterId, false).whenComplete((result, throwable) -> {
398+
case STOP -> protocolAdapterManager.stopAsync(adapterId, false).whenComplete((result, throwable) -> {
397399
if (throwable != null) {
398400
log.error("Failed to stop adapter '{}'.", adapterId, throwable);
399401
} else {
400402
log.trace("Adapter '{}' was stopped successfully.", adapterId);
401403
}
402404
});
403-
case RESTART -> protocolAdapterManager.stop(adapterId, false)
404-
.thenRun(() -> protocolAdapterManager.start(adapterId))
405+
case RESTART -> protocolAdapterManager.stopAsync(adapterId, false)
406+
.thenRun(() -> protocolAdapterManager.startAsync(adapterId))
405407
.whenComplete((result, throwable) -> {
406408
if (throwable != null) {
407409
log.error("Failed to restart adapter '{}'.", adapterId, throwable);
@@ -439,7 +441,8 @@ public int getDepth() {
439441

440442
protected @NotNull Status getStatusInternal(final @NotNull String adapterId) {
441443
final Optional<ProtocolAdapterWrapper> optionalAdapterInstance =
442-
protocolAdapterManager.getProtocolAdapterWrapperByAdapterId(adapterId);
444+
protocolAdapterManager
445+
.getProtocolAdapterWrapperByAdapterId(adapterId);
443446
return optionalAdapterInstance.map(AdapterStatusModelConversionUtils::getAdapterStatus)
444447
.orElseGet(() -> unknown(Status.RuntimeEnum.STOPPED, ApiConstants.ADAPTER_TYPE, adapterId));
445448
}
@@ -722,7 +725,8 @@ protected void validateAdapterSchema(
722725
final String decodedTagName = URLDecoder.decode(tagName, StandardCharsets.UTF_8);
723726

724727
final Optional<ProtocolAdapterWrapper> optionalProtocolAdapterWrapper =
725-
protocolAdapterManager.getProtocolAdapterWrapperByAdapterId(adapterId);
728+
protocolAdapterManager
729+
.getProtocolAdapterWrapperByAdapterId(adapterId);
726730
if (optionalProtocolAdapterWrapper.isEmpty()) {
727731
log.warn("The Json Schema for an adapter '{}' was requested, but the adapter does not exist.", adapterId);
728732
return ErrorResponseUtil.errorResponse(new AdapterNotFoundError(String.format("Adapter not found '%s'",

hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,17 @@
1919
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService;
2020
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
2121
import com.hivemq.bootstrap.services.EdgeCoreFactoryService;
22-
import com.hivemq.persistence.topicfilter.TopicFilterPersistence;
23-
import org.jetbrains.annotations.NotNull;
2422
import com.hivemq.mqtt.topic.tree.LocalTopicTree;
2523
import com.hivemq.persistence.SingleWriterService;
2624
import com.hivemq.protocols.InternalProtocolAdapterWritingService;
2725
import com.hivemq.protocols.InternalWritingContext;
26+
import jakarta.inject.Inject;
27+
import jakarta.inject.Singleton;
28+
import org.jetbrains.annotations.NotNull;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

31-
import jakarta.inject.Inject;
32-
import jakarta.inject.Singleton;
3332
import java.util.List;
34-
import java.util.concurrent.CompletableFuture;
3533

3634
@Singleton
3735
public class WritingServiceProvider {
@@ -40,20 +38,17 @@ public class WritingServiceProvider {
4038
private final @NotNull ObjectMapper objectMapper;
4139
private final @NotNull LocalTopicTree localTopicTree;
4240
private final @NotNull SingleWriterService singleWriterService;
43-
private final @NotNull TopicFilterPersistence topicFilterPersistence;
4441

4542
@Inject
4643
public WritingServiceProvider(
4744
final @NotNull EdgeCoreFactoryService edgeCoreFactoryService,
4845
final @NotNull ObjectMapper objectMapper,
4946
final @NotNull LocalTopicTree localTopicTree,
50-
final @NotNull SingleWriterService singleWriterService,
51-
final @NotNull TopicFilterPersistence topicFilterPersistence) {
47+
final @NotNull SingleWriterService singleWriterService) {
5248
this.edgeCoreFactoryService = edgeCoreFactoryService;
5349
this.objectMapper = objectMapper;
5450
this.localTopicTree = localTopicTree;
5551
this.singleWriterService = singleWriterService;
56-
this.topicFilterPersistence = topicFilterPersistence;
5752
}
5853

5954
public @NotNull InternalProtocolAdapterWritingService get() {
@@ -83,19 +78,18 @@ public boolean writingEnabled() {
8378

8479

8580
@Override
86-
public @NotNull CompletableFuture<Void> startWriting(
81+
public boolean startWriting(
8782
final @NotNull WritingProtocolAdapter writingProtocolAdapter,
8883
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
8984
final @NotNull List<InternalWritingContext> southboundMappings) {
9085
log.warn("No bidirectional module is currently installed. Writing to PLCs is currently not supported.");
91-
return CompletableFuture.completedFuture(null); }
86+
return true; }
9287

9388
@Override
94-
public @NotNull CompletableFuture<Void> stopWriting(
89+
public void stopWriting(
9590
final @NotNull WritingProtocolAdapter writingProtocolAdapter,
9691
final @NotNull List<InternalWritingContext> writingContexts) {
9792
// NOOP as nothing was started.
98-
return CompletableFuture.completedFuture(null);
9993
}
10094
}
10195
}

hivemq-edge/src/main/java/com/hivemq/configuration/reader/BridgeExtractor.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,6 @@ public synchronized void removeBridge(final @NotNull String id) {
9797
configFileReaderWriter.writeConfigWithSync();
9898
}
9999

100-
private void notifyConsumer() {
101-
final var consumer = bridgeEntitiesConsumer;
102-
if(consumer != null) {
103-
consumer.accept(bridgeEntities);
104-
}
105-
}
106-
107-
108-
@Override
109-
public boolean needsRestartWithConfig(final HiveMQConfigEntity config) {
110-
return false;
111-
}
112-
113100
@Override
114101
public synchronized Configurator.ConfigResult updateConfig(final HiveMQConfigEntity config) {
115102
final var bridgeEntities = convertBridgeConfigs(config);
@@ -129,6 +116,11 @@ public synchronized Configurator.ConfigResult updateConfig(final HiveMQConfigEnt
129116
return Configurator.ConfigResult.SUCCESS;
130117
}
131118

119+
@Override
120+
public boolean needsRestartWithConfig(final HiveMQConfigEntity config) {
121+
return false;
122+
}
123+
132124
@Override
133125
public void registerConsumer(final Consumer<List<@NotNull MqttBridge>> consumer) {
134126
this.bridgeEntitiesConsumer = consumer;
@@ -502,4 +494,11 @@ protected RemoteBrokerEntity unconvertBrokerEntity(final MqttBridge from) {
502494

503495
return remoteBrokerEntity;
504496
}
497+
498+
private void notifyConsumer() {
499+
final var consumer = bridgeEntitiesConsumer;
500+
if(consumer != null) {
501+
consumer.accept(bridgeEntities);
502+
}
503+
}
505504
}

hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,12 @@
2727
public interface InternalProtocolAdapterWritingService extends ProtocolAdapterWritingService {
2828

2929

30-
@NotNull
31-
CompletableFuture<Void> startWriting(
30+
boolean startWriting(
3231
@NotNull WritingProtocolAdapter writingProtocolAdapter,
3332
@NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
3433
@NotNull List<InternalWritingContext> writingContexts);
3534

36-
@NotNull
37-
CompletableFuture<Void> stopWriting(@NotNull WritingProtocolAdapter writingProtocolAdapter, final @NotNull List<InternalWritingContext> writingContexts);
35+
void stopWriting(@NotNull WritingProtocolAdapter writingProtocolAdapter, final @NotNull List<InternalWritingContext> writingContexts);
3836

3937
void addWritingChangedCallback(@NotNull WritingChangedCallback callback);
4038

0 commit comments

Comments
 (0)