Skip to content

Commit feaf53c

Browse files
authored
Merge pull request #1473 from nats-io/enable-test
Fix and Enable ReconnectTests testForceReconnectQueueBehaviorCheck
2 parents a8e8aaf + 8ff1c37 commit feaf53c

File tree

5 files changed

+50
-25
lines changed

5 files changed

+50
-25
lines changed

src/main/java/io/nats/client/impl/ProtocolMessage.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,15 @@ class ProtocolMessage extends NatsPublishableMessage {
2929
}
3030

3131
ProtocolMessage(byte[] protocol) {
32-
super(false);
33-
protocolBab = new ByteArrayBuilder(protocol);
34-
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
35-
this.filterOnStop = true;
32+
this(new ByteArrayBuilder(protocol), true);
33+
}
34+
35+
ProtocolMessage(byte[] protocol, boolean filterOnStop) {
36+
this(new ByteArrayBuilder(protocol), filterOnStop);
3637
}
3738

3839
ProtocolMessage(ProtocolMessage pm) {
39-
super(false);
40-
protocolBab = pm.protocolBab;
41-
sizeInBytes = controlLineLength = pm.sizeInBytes;
42-
filterOnStop = pm.filterOnStop;
40+
this(pm.protocolBab, pm.filterOnStop);
4341
}
4442

4543
@Override

src/test/java/io/nats/client/impl/ForceReconnectQueueCheckDataPort.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,35 @@
1414
package io.nats.client.impl;
1515

1616
import java.io.IOException;
17+
import java.nio.charset.StandardCharsets;
1718

1819
public class ForceReconnectQueueCheckDataPort extends SocketDataPort {
19-
public static String WRITE_CHECK;
20+
private static byte[] WRITE_CHECK;
21+
private static int WC_LEN;
2022
public static long DELAY;
2123

24+
public static void setCheck(String check) {
25+
WRITE_CHECK = check.getBytes(StandardCharsets.ISO_8859_1);
26+
WC_LEN = check.length();
27+
}
28+
2229
@Override
2330
public void write(byte[] src, int toWrite) throws IOException {
24-
String s = new String(src, 0, Math.min(7, toWrite));
25-
if (s.startsWith(WRITE_CHECK)) {
26-
try {
27-
Thread.sleep(DELAY);
31+
if (src.length >= WC_LEN) {
32+
boolean check = true;
33+
for (int x = 0; x < WC_LEN; x++) {
34+
if (src[x] != WRITE_CHECK[x]) {
35+
check = false;
36+
break;
37+
}
2838
}
29-
catch (InterruptedException e) {
30-
throw new RuntimeException(e);
39+
if (check) {
40+
try {
41+
Thread.sleep(DELAY);
42+
}
43+
catch (InterruptedException e) {
44+
throw new RuntimeException(e);
45+
}
3146
}
3247
}
3348
super.write(src, toWrite);

src/test/java/io/nats/client/impl/ListenerForTesting.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
package io.nats.client.impl;
1515

1616
import io.nats.client.*;
17+
import io.nats.client.support.DateTimeUtils;
1718
import io.nats.client.support.Status;
1819

20+
import java.time.format.DateTimeFormatter;
1921
import java.util.ArrayList;
2022
import java.util.HashMap;
2123
import java.util.List;
@@ -291,8 +293,14 @@ else if (consumer instanceof NatsDispatcher) {
291293
}
292294
}
293295

296+
public static final DateTimeFormatter SIMPLE_TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
297+
298+
public static String simpleTime() {
299+
return SIMPLE_TIME_FORMATTER.format(DateTimeUtils.gmtNow());
300+
}
301+
294302
private void report(String func, Object message) {
295-
System.out.println("[" + System.currentTimeMillis() + " ListenerForTesting." + func + "] " + message);
303+
System.out.println("[" + simpleTime() + " ListenerForTesting." + func + "] " + message);
296304
}
297305

298306
private final ReentrantLock listLock = new ReentrantLock();
@@ -597,4 +605,4 @@ public String toString() {
597605
private static String extractSid(JetStreamSubscription sub) {
598606
return ((NatsJetStreamSubscription)sub).getSID();
599607
}
600-
}
608+
}

src/test/java/io/nats/client/impl/ReconnectTests.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.nats.client.*;
1717
import io.nats.client.ConnectionListener.Events;
1818
import io.nats.client.api.ServerInfo;
19-
import org.junit.jupiter.api.Disabled;
2019
import org.junit.jupiter.api.Test;
2120
import org.junit.jupiter.api.parallel.Isolated;
2221

@@ -806,7 +805,6 @@ public boolean includeAllServers() {
806805
}
807806

808807
@Test
809-
@Disabled("TODO FIGURE THIS OUT")
810808
public void testForceReconnectQueueBehaviorCheck() throws Exception {
811809
runInJsCluster((nc0, nc1, nc2) -> {
812810
if (atLeast2_9_0(nc0)) {
@@ -818,19 +816,19 @@ public void testForceReconnectQueueBehaviorCheck() throws Exception {
818816
ForceReconnectQueueCheckDataPort.DELAY = 75;
819817

820818
String subject = subject();
821-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
819+
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
822820
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0);
823821

824822
subject = subject();
825-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
823+
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
826824
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait);
827825

828826
subject = subject();
829-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
827+
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
830828
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0);
831829

832830
subject = subject();
833-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
831+
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
834832
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait);
835833
}
836834
});
@@ -972,7 +970,7 @@ public void testSocketDataPortTimeout() throws Exception {
972970
try {
973971
nc.publish(subject, ("" + pubId.incrementAndGet()).getBytes());
974972
if (pubId.get() == 10) {
975-
SocketDataPortBlockSimulator.SIMULATE_SOCKET_BLOCK.set(60000);
973+
SocketDataPortBlockSimulator.simulateBlock();
976974
}
977975
}
978976
catch (Exception e) {

src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti
7474
});
7575
}
7676

77-
public static AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong();
77+
private static final AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong();
78+
79+
public static void simulateBlock() {
80+
SIMULATE_SOCKET_BLOCK.set(60000);
81+
}
82+
7883
AtomicLong blocking = new AtomicLong();
7984
public void write(byte[] src, int toWrite) throws IOException {
8085
try {
@@ -87,6 +92,7 @@ public void write(byte[] src, int toWrite) throws IOException {
8792
blocking.addAndGet(-100);
8893
}
8994
catch (InterruptedException ignore) {
95+
Thread.currentThread().interrupt();
9096
return;
9197
}
9298
}

0 commit comments

Comments
 (0)