Skip to content

Commit fbc0bc5

Browse files
authored
Merge pull request #1484 from nats-io/internal-aware
Internal Options Executor Awareness
2 parents f818418 + 96ab782 commit fbc0bc5

File tree

2 files changed

+44
-24
lines changed

2 files changed

+44
-24
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2234,6 +2234,24 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
22342234
return stpe;
22352235
}
22362236

2237+
/**
2238+
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
2239+
* @return the executor
2240+
*/
2241+
public ExecutorService getCallbackExecutor() {
2242+
return this.callbackThreadFactory == null ?
2243+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2244+
}
2245+
2246+
/**
2247+
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
2248+
* @return the executor
2249+
*/
2250+
public ExecutorService getConnectExecutor() {
2251+
return this.connectThreadFactory == null ?
2252+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2253+
}
2254+
22372255
/**
22382256
* whether the general executor is the internal one versus a user supplied one
22392257
* @return true if the executor is internal
@@ -2251,21 +2269,19 @@ public boolean scheduledExecutorIsInternal() {
22512269
}
22522270

22532271
/**
2254-
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
2255-
* @return the executor
2272+
* whether the callback executor is the internal one versus a user supplied one
2273+
* @return true if the executor is internal
22562274
*/
2257-
public ExecutorService getCallbackExecutor() {
2258-
return this.callbackThreadFactory == null ?
2259-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2275+
public boolean callbackExecutorIsInternal() {
2276+
return this.callbackThreadFactory == null;
22602277
}
22612278

22622279
/**
2263-
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
2264-
* @return the executor
2280+
* whether the connect executor is the internal one versus a user supplied one
2281+
* @return true if the executor is internal
22652282
*/
2266-
public ExecutorService getConnectExecutor() {
2267-
return this.connectThreadFactory == null ?
2268-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2283+
public boolean connectExecutorIsInternal() {
2284+
return this.connectThreadFactory == null;
22692285
}
22702286

22712287
/**

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -875,22 +875,26 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
875875
statusLock.unlock();
876876
}
877877

878-
// Stop the error handling and connect executors
879-
callbackRunner.shutdown();
880-
try {
881-
// At this point in the flow, the connection is shutting down.
882-
// There is really no use in giving this information to the developer,
883-
// It's fair to say that an exception here anyway will practically never happen
884-
// and if it did, the app is probably already frozen.
885-
//noinspection ResultOfMethodCallIgnored
886-
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
887-
}
888-
finally {
889-
callbackRunner.shutdownNow();
878+
if (options.callbackExecutorIsInternal()) {
879+
// Stop the error handling and connect executors
880+
callbackRunner.shutdown();
881+
try {
882+
// At this point in the flow, the connection is shutting down.
883+
// There is really no use in giving this information to the developer,
884+
// It's fair to say that an exception here anyway will practically never happen
885+
// and if it did, the app is probably already frozen.
886+
//noinspection ResultOfMethodCallIgnored
887+
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
888+
}
889+
finally {
890+
callbackRunner.shutdownNow();
891+
}
890892
}
891893

892-
// There's no need to wait for running tasks since we're told to close
893-
connectExecutor.shutdownNow();
894+
if (options.connectExecutorIsInternal()) {
895+
// There's no need to wait for running tasks since we're told to close
896+
connectExecutor.shutdownNow();
897+
}
894898

895899
// The callbackRunner and connectExecutor always come from a factory,
896900
// so we always shut them down.

0 commit comments

Comments
 (0)