Skip to content

Commit 96ab782

Browse files
committed
Internal Options Executor Awareness
Be aware of all executors being internal to know whether to shut them down or not on connection close
1 parent f818418 commit 96ab782

File tree

2 files changed

+44
-24
lines changed

2 files changed

+44
-24
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2234,6 +2234,24 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
22342234
return stpe;
22352235
}
22362236

2237+
/**
2238+
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
2239+
* @return the executor
2240+
*/
2241+
public ExecutorService getCallbackExecutor() {
2242+
return this.callbackThreadFactory == null ?
2243+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2244+
}
2245+
2246+
/**
2247+
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
2248+
* @return the executor
2249+
*/
2250+
public ExecutorService getConnectExecutor() {
2251+
return this.connectThreadFactory == null ?
2252+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2253+
}
2254+
22372255
/**
22382256
* whether the general executor is the internal one versus a user supplied one
22392257
* @return true if the executor is internal
@@ -2251,21 +2269,19 @@ public boolean scheduledExecutorIsInternal() {
22512269
}
22522270

22532271
/**
2254-
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
2255-
* @return the executor
2272+
* whether the callback executor is the internal one versus a user supplied one
2273+
* @return true if the executor is internal
22562274
*/
2257-
public ExecutorService getCallbackExecutor() {
2258-
return this.callbackThreadFactory == null ?
2259-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2275+
public boolean callbackExecutorIsInternal() {
2276+
return this.callbackThreadFactory == null;
22602277
}
22612278

22622279
/**
2263-
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
2264-
* @return the executor
2280+
* whether the connect executor is the internal one versus a user supplied one
2281+
* @return true if the executor is internal
22652282
*/
2266-
public ExecutorService getConnectExecutor() {
2267-
return this.connectThreadFactory == null ?
2268-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2283+
public boolean connectExecutorIsInternal() {
2284+
return this.connectThreadFactory == null;
22692285
}
22702286

22712287
/**

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -875,22 +875,26 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
875875
statusLock.unlock();
876876
}
877877

878-
// Stop the error handling and connect executors
879-
callbackRunner.shutdown();
880-
try {
881-
// At this point in the flow, the connection is shutting down.
882-
// There is really no use in giving this information to the developer,
883-
// It's fair to say that an exception here anyway will practically never happen
884-
// and if it did, the app is probably already frozen.
885-
//noinspection ResultOfMethodCallIgnored
886-
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
887-
}
888-
finally {
889-
callbackRunner.shutdownNow();
878+
if (options.callbackExecutorIsInternal()) {
879+
// Stop the error handling and connect executors
880+
callbackRunner.shutdown();
881+
try {
882+
// At this point in the flow, the connection is shutting down.
883+
// There is really no use in giving this information to the developer,
884+
// It's fair to say that an exception here anyway will practically never happen
885+
// and if it did, the app is probably already frozen.
886+
//noinspection ResultOfMethodCallIgnored
887+
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
888+
}
889+
finally {
890+
callbackRunner.shutdownNow();
891+
}
890892
}
891893

892-
// There's no need to wait for running tasks since we're told to close
893-
connectExecutor.shutdownNow();
894+
if (options.connectExecutorIsInternal()) {
895+
// There's no need to wait for running tasks since we're told to close
896+
connectExecutor.shutdownNow();
897+
}
894898

895899
// The callbackRunner and connectExecutor always come from a factory,
896900
// so we always shut them down.

0 commit comments

Comments
 (0)