|
20 | 20 | import java.io.IOException; |
21 | 21 | import java.nio.BufferOverflowException; |
22 | 22 | import java.time.Duration; |
23 | | -import java.util.concurrent.CancellationException; |
24 | | -import java.util.concurrent.CompletableFuture; |
25 | | -import java.util.concurrent.ExecutionException; |
26 | | -import java.util.concurrent.Future; |
| 23 | +import java.util.concurrent.*; |
27 | 24 | import java.util.concurrent.atomic.AtomicBoolean; |
28 | 25 | import java.util.concurrent.atomic.AtomicInteger; |
29 | 26 | import java.util.concurrent.atomic.AtomicReference; |
@@ -177,8 +174,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st |
177 | 174 |
|
178 | 175 | stats.incrementOut(size); |
179 | 176 | if (writeListener != null) { |
180 | | - NatsMessage finalMsg = msg; |
181 | | - writeListener.submit(() -> writeListener.buffered(finalMsg, mode.get().name())); |
| 177 | + writeListener.buffered(msg, mode.get().name()); |
182 | 178 | } |
183 | 179 |
|
184 | 180 | if (msg.flushImmediatelyAfterPublish) { |
@@ -206,7 +202,12 @@ public void run() { |
206 | 202 | try { |
207 | 203 | dataPort = this.dataPortFuture.get(); // Will wait for the future to complete |
208 | 204 | StatisticsCollector stats = connection.getStatisticsCollector(); |
209 | | - WriteListener writeListener = connection.getOptions().getWriteListener(); |
| 205 | + WriteListener writeListener = null; |
| 206 | + WriteListener userWriteListener = connection.getOptions().getWriteListener(); |
| 207 | + if (userWriteListener != null) { |
| 208 | + ExecutorService executor = connection.getOptions().getExecutor(); |
| 209 | + writeListener = (m, o) -> executor.submit(() -> userWriteListener.buffered(m, o)); |
| 210 | + } |
210 | 211 |
|
211 | 212 | while (running.get() && !Thread.interrupted()) { |
212 | 213 | NatsMessage msg; |
|
0 commit comments