Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,24 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
return stpe;
}

/**
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
* @return the executor
*/
public ExecutorService getCallbackExecutor() {
return this.callbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
}

/**
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
* @return the executor
*/
public ExecutorService getConnectExecutor() {
return this.connectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
}

/**
* whether the general executor is the internal one versus a user supplied one
* @return true if the executor is internal
Expand All @@ -2251,21 +2269,19 @@ public boolean scheduledExecutorIsInternal() {
}

/**
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
* @return the executor
* whether the callback executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public ExecutorService getCallbackExecutor() {
return this.callbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
public boolean callbackExecutorIsInternal() {
return this.callbackThreadFactory == null;
}

/**
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
* @return the executor
* whether the connect executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public ExecutorService getConnectExecutor() {
return this.connectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
public boolean connectExecutorIsInternal() {
return this.connectThreadFactory == null;
}

/**
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -875,22 +875,26 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
statusLock.unlock();
}

// Stop the error handling and connect executors
callbackRunner.shutdown();
try {
// At this point in the flow, the connection is shutting down.
// There is really no use in giving this information to the developer,
// It's fair to say that an exception here anyway will practically never happen
// and if it did, the app is probably already frozen.
//noinspection ResultOfMethodCallIgnored
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
finally {
callbackRunner.shutdownNow();
if (options.callbackExecutorIsInternal()) {
// Stop the error handling and connect executors
callbackRunner.shutdown();
try {
// At this point in the flow, the connection is shutting down.
// There is really no use in giving this information to the developer,
// It's fair to say that an exception here anyway will practically never happen
// and if it did, the app is probably already frozen.
//noinspection ResultOfMethodCallIgnored
callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
finally {
callbackRunner.shutdownNow();
}
}

// There's no need to wait for running tasks since we're told to close
connectExecutor.shutdownNow();
if (options.connectExecutorIsInternal()) {
// There's no need to wait for running tasks since we're told to close
connectExecutor.shutdownNow();
}

// The callbackRunner and connectExecutor always come from a factory,
// so we always shut them down.
Expand Down
Loading