Skip to content

Commit dce9624

Browse files
committed
trying to stop the writer as soon as it's known that there is a communication issue
1 parent eeb1bc5 commit dce9624

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ void handleCommunicationIssue(Exception io) {
722722
statusLock.unlock();
723723
}
724724

725-
executor.submit(() -> { writer.stop(); });
725+
writer.preventWrite();
726726

727727
processException(io);
728728
if (currentServer != null) {

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ enum Mode {
4848
private Future<Boolean> stopped;
4949
private Future<DataPort> dataPortFuture;
5050
private DataPort dataPort;
51+
private final AtomicBoolean allowedToWrite;
5152
private final AtomicBoolean running;
5253
private final AtomicReference<Mode> mode;
5354
private final ReentrantLock startStopLock;
@@ -66,6 +67,7 @@ enum Mode {
6667
writerLock = new ReentrantLock();
6768

6869
this.running = new AtomicBoolean(false);
70+
this.allowedToWrite = new AtomicBoolean(false);
6971
if (sourceWriter == null) {
7072
mode = new AtomicReference<>(Mode.Normal);
7173
}
@@ -100,6 +102,7 @@ void start(Future<DataPort> dataPortFuture) {
100102
this.startStopLock.lock();
101103
try {
102104
this.dataPortFuture = dataPortFuture;
105+
this.allowedToWrite.set(true);
103106
this.running.set(true);
104107
this.normalOutgoing.resume();
105108
this.reconnectOutgoing.resume();
@@ -114,6 +117,7 @@ void start(Future<DataPort> dataPortFuture) {
114117
// method does.
115118
Future<Boolean> stop() {
116119
if (running.get()) {
120+
allowedToWrite.set(false);
117121
running.set(false);
118122
startStopLock.lock();
119123
try {
@@ -128,6 +132,10 @@ Future<Boolean> stop() {
128132
return this.stopped;
129133
}
130134

135+
void preventWrite() {
136+
allowedToWrite.set(false);
137+
}
138+
131139
boolean isRunning() {
132140
return running.get();
133141
}
@@ -219,7 +227,7 @@ public void run() {
219227
try {
220228
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
221229

222-
while (running.get() && !Thread.interrupted()) {
230+
while (allowedToWrite.get() && !Thread.interrupted()) {
223231
NatsMessage msg;
224232
if (mode.get() == Mode.Normal) {
225233
msg = this.normalOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout);
@@ -242,7 +250,8 @@ public void run() {
242250
// Exit
243251
Thread.currentThread().interrupt();
244252
} finally {
245-
this.running.set(false);
253+
allowedToWrite.set(false);
254+
running.set(false);
246255
}
247256
}
248257

0 commit comments

Comments
 (0)