diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 8a4196687..0aa0a0127 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -2234,6 +2234,24 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { return stpe; } + /** + * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc + * @return the executor + */ + public ExecutorService getCallbackExecutor() { + return this.callbackThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + } + + /** + * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc + * @return the executor + */ + public ExecutorService getConnectExecutor() { + return this.connectThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + } + /** * whether the general executor is the internal one versus a user supplied one * @return true if the executor is internal @@ -2251,21 +2269,19 @@ public boolean scheduledExecutorIsInternal() { } /** - * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc - * @return the executor + * whether the callback executor is the internal one versus a user supplied one + * @return true if the executor is internal */ - public ExecutorService getCallbackExecutor() { - return this.callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + public boolean callbackExecutorIsInternal() { + return this.callbackThreadFactory == null; } /** - * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc - * @return the executor + * whether the connect executor is the internal one versus a user supplied one + * @return true if the executor is internal */ - public ExecutorService getConnectExecutor() { - return this.connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + public boolean connectExecutorIsInternal() { + return this.connectThreadFactory == null; } /** diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 5edd1ab63..8dbe3bbd6 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -875,22 +875,26 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep statusLock.unlock(); } - // Stop the error handling and connect executors - callbackRunner.shutdown(); - try { - // At this point in the flow, the connection is shutting down. - // There is really no use in giving this information to the developer, - // It's fair to say that an exception here anyway will practically never happen - // and if it did, the app is probably already frozen. - //noinspection ResultOfMethodCallIgnored - callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - finally { - callbackRunner.shutdownNow(); + if (options.callbackExecutorIsInternal()) { + // Stop the error handling and connect executors + callbackRunner.shutdown(); + try { + // At this point in the flow, the connection is shutting down. + // There is really no use in giving this information to the developer, + // It's fair to say that an exception here anyway will practically never happen + // and if it did, the app is probably already frozen. + //noinspection ResultOfMethodCallIgnored + callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + finally { + callbackRunner.shutdownNow(); + } } - // There's no need to wait for running tasks since we're told to close - connectExecutor.shutdownNow(); + if (options.connectExecutorIsInternal()) { + // There's no need to wait for running tasks since we're told to close + connectExecutor.shutdownNow(); + } // The callbackRunner and connectExecutor always come from a factory, // so we always shut them down.