Skip to content

Commit e054aa6

Browse files
aepfliCopilot
andcommitted
refactor(flagd): await scheduler termination before channel shutdown
Add retryScheduler.awaitTermination(deadline, MILLISECONDS) after shutdownNow() to ensure the scheduler thread has fully stopped before the gRPC channel is torn down, preventing race conditions during shutdown. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
1 parent 0aede81 commit e054aa6

File tree

4 files changed

+39
-3
lines changed

4 files changed

+39
-3
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;
66

77
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
8+
import dev.openfeature.contrib.tools.flagd.api.Evaluator;
89
import dev.openfeature.sdk.EvaluationContext;
910
import dev.openfeature.sdk.ImmutableContext;
1011
import dev.openfeature.sdk.Structure;
@@ -203,6 +204,13 @@ public class FlagdOptions {
203204
*/
204205
private QueueSource customConnector;
205206

207+
/**
208+
* Inject a custom {@link Evaluator} for in-process flag evaluation.
209+
* If not set, {@link dev.openfeature.contrib.tools.flagd.core.FlagdCore} is used as the default.
210+
* A {@link java.util.ServiceLoader} discovery is also attempted before falling back to the default.
211+
*/
212+
private Evaluator customEvaluator;
213+
206214
/**
207215
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate
208216
* distributed tracing for flagd grpc

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import dev.openfeature.sdk.Structure;
1919
import dev.openfeature.sdk.Value;
2020
import dev.openfeature.sdk.internal.TriConsumer;
21+
import java.util.ServiceLoader;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import lombok.extern.slf4j.Slf4j;
@@ -50,9 +51,9 @@ public class InProcessResolver implements Resolver {
5051
public InProcessResolver(
5152
FlagdOptions options, TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent) {
5253
this.queueSource = getQueueSource(options);
53-
Evaluator flagdCore = new FlagdCore();
54-
this.evaluator = flagdCore;
55-
this.flagStore = new FlagStore(queueSource, flagdCore);
54+
Evaluator evaluator = getEvaluator(options);
55+
this.evaluator = evaluator;
56+
this.flagStore = new FlagStore(queueSource, evaluator);
5657
this.onConnectionEvent = onConnectionEvent;
5758
}
5859

@@ -185,4 +186,14 @@ static QueueSource getQueueSource(final FlagdOptions options) {
185186
? new FileQueueSource(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs())
186187
: new SyncStreamQueueSource(options);
187188
}
189+
190+
static Evaluator getEvaluator(final FlagdOptions options) {
191+
if (options.getCustomEvaluator() != null) {
192+
return options.getCustomEvaluator();
193+
}
194+
return ServiceLoader.load(Evaluator.class).findFirst().orElseGet(() -> {
195+
log.warn("No Evaluator found via ServiceLoader, falling back to FlagdCore");
196+
return new FlagdCore();
197+
});
198+
}
188199
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public void shutdown() throws InterruptedException {
174174
}
175175

176176
retryScheduler.shutdownNow();
177+
retryScheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS);
177178
grpcComponents.channelConnector.shutdown();
178179
}
179180

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,22 @@ public void connectorSetup() {
113113
assertInstanceOf(MockConnector.class, InProcessResolver.getQueueSource(forCustomConnectorOptions));
114114
}
115115

116+
@Test
117+
public void evaluatorSetup() {
118+
// customEvaluator takes priority over ServiceLoader
119+
MockEvaluator customEvaluator = new MockEvaluator(null);
120+
FlagdOptions withCustomEvaluator = FlagdOptions.builder()
121+
.customEvaluator(customEvaluator)
122+
.build();
123+
assertThat(InProcessResolver.getEvaluator(withCustomEvaluator)).isSameAs(customEvaluator);
124+
125+
// no customEvaluator — ServiceLoader finds FlagdCore (registered as default SPI)
126+
FlagdOptions withoutCustomEvaluator = FlagdOptions.builder().build();
127+
assertInstanceOf(
128+
dev.openfeature.contrib.tools.flagd.core.FlagdCore.class,
129+
InProcessResolver.getEvaluator(withoutCustomEvaluator));
130+
}
131+
116132
@Test
117133
void eventHandling() throws Throwable {
118134
// given

0 commit comments

Comments
 (0)