Skip to content

Commit f818418

Browse files
authored
Merge pull request #1483 from nats-io/dispatcher-unsub-subject
Properly unsubscribe from dispatcher subject
2 parents 9e57748 + cad858d commit f818418

File tree

3 files changed

+55
-38
lines changed

3 files changed

+55
-38
lines changed

src/main/java/io/nats/client/impl/NatsDispatcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,11 @@ public Dispatcher unsubscribe(String subject, int after) {
338338
connection.unsubscribe(defaultHandlerSub, after);
339339
}
340340

341-
subWithNonDefaultHandlerBySid.forEach((sid, sub) -> connection.unsubscribe(sub, after));
341+
subWithNonDefaultHandlerBySid.forEach((sid, sub) -> {
342+
if (subject.equals(sub.getSubject())) {
343+
connection.unsubscribe(sub, after);
344+
}
345+
});
342346

343347
return this;
344348
}

src/test/java/io/nats/client/SubscriberTests.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import org.junit.jupiter.api.Test;
1717

1818
import java.time.Duration;
19-
import java.util.ArrayList;
20-
import java.util.Collections;
2119
import java.util.HashSet;
22-
import java.util.List;
2320
import java.util.concurrent.CompletableFuture;
2421
import java.util.concurrent.TimeoutException;
2522

@@ -491,38 +488,6 @@ public void testInvalidSubjectsAndQueueNames() throws Exception {
491488
}
492489
}
493490

494-
@Test
495-
public void testDispatcherMultipleSubscriptionsBySubject() throws Exception {
496-
try (NatsTestServer ts = new NatsTestServer(false);
497-
Connection nc = Nats.connect(ts.getURI())) {
498-
standardConnectionWait(nc);
499-
String subject = subject();
500-
501-
List<Integer> dflt = Collections.synchronizedList(new ArrayList<>());
502-
List<Integer> nd1 = Collections.synchronizedList(new ArrayList<>());
503-
List<Integer> nd2 = Collections.synchronizedList(new ArrayList<>());
504-
Dispatcher d = nc.createDispatcher(m -> dflt.add(getDataId(m)));
505-
d.subscribe(subject);
506-
d.subscribe(subject, m -> nd1.add(getDataId(m)));
507-
d.subscribe(subject, m -> nd2.add(getDataId(m)));
508-
509-
nc.publish(subject, "1".getBytes());
510-
Thread.sleep(1000);
511-
d.unsubscribe(subject);
512-
nc.publish(subject, "2".getBytes());
513-
Thread.sleep(1000);
514-
515-
assertTrue(dflt.contains(1));
516-
assertTrue(nd1.contains(1));
517-
assertTrue(nd2.contains(1));
518-
519-
assertFalse(dflt.contains(2));
520-
assertFalse(nd1.contains(2));
521-
assertFalse(nd2.contains(2));
522-
523-
}
524-
}
525-
526491
private static int getDataId(Message m) {
527492
return Integer.parseInt(new String(m.getData()));
528493
}

src/test/java/io/nats/client/impl/DispatcherTests.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
import java.io.IOException;
2020
import java.time.Duration;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
2124
import java.util.concurrent.*;
2225
import java.util.concurrent.atomic.AtomicInteger;
2326
import java.util.concurrent.atomic.AtomicReference;
2427

25-
import static io.nats.client.utils.TestBase.sleep;
26-
import static io.nats.client.utils.TestBase.waitUntilStatus;
28+
import static io.nats.client.utils.TestBase.*;
2729
import static org.junit.jupiter.api.Assertions.*;
2830

2931

@@ -33,6 +35,52 @@
3335
// the done message (or should) - wanted to note that somewhere
3436

3537
public class DispatcherTests {
38+
@Test
39+
public void testDispatcherMultipleSubscriptionsBySubject() throws Exception {
40+
try (NatsTestServer ts = new NatsTestServer(false);
41+
Connection nc = Nats.connect(ts.getURI())) {
42+
standardConnectionWait(nc);
43+
String subject1 = subject();
44+
String subject2 = subject();
45+
46+
List<Integer> dflt = Collections.synchronizedList(new ArrayList<>());
47+
List<Integer> sub21 = Collections.synchronizedList(new ArrayList<>());
48+
List<Integer> sub22 = Collections.synchronizedList(new ArrayList<>());
49+
List<Integer> sub31 = Collections.synchronizedList(new ArrayList<>());
50+
List<Integer> sub32 = Collections.synchronizedList(new ArrayList<>());
51+
Dispatcher d1 = nc.createDispatcher(m -> dflt.add(getDataId(m)));
52+
d1.subscribe(subject1);
53+
d1.subscribe(subject1, m -> sub21.add(getDataId(m)));
54+
d1.subscribe(subject1, m -> sub22.add(getDataId(m)));
55+
d1.subscribe(subject2, m -> sub31.add(getDataId(m)));
56+
d1.subscribe(subject2, m -> sub32.add(getDataId(m)));
57+
58+
nc.publish(subject1, "1".getBytes());
59+
nc.publish(subject2, "1".getBytes());
60+
Thread.sleep(1000);
61+
d1.unsubscribe(subject1);
62+
nc.publish(subject1, "2".getBytes());
63+
nc.publish(subject2, "2".getBytes());
64+
Thread.sleep(1000);
65+
66+
assertTrue(dflt.contains(1));
67+
assertTrue(sub21.contains(1));
68+
assertTrue(sub22.contains(1));
69+
assertTrue(sub31.contains(1));
70+
assertTrue(sub32.contains(1));
71+
72+
assertFalse(dflt.contains(2));
73+
assertFalse(sub21.contains(2));
74+
assertFalse(sub22.contains(2));
75+
assertTrue(sub31.contains(2));
76+
assertTrue(sub32.contains(2));
77+
}
78+
}
79+
80+
private static int getDataId(Message m) {
81+
return Integer.parseInt(new String(m.getData()));
82+
}
83+
3684
@Test
3785
public void testSingleMessage() throws Exception {
3886
try (NatsTestServer ts = new NatsTestServer(false);

0 commit comments

Comments
 (0)