diff --git a/src/main/java/io/nats/client/impl/NatsDispatcher.java b/src/main/java/io/nats/client/impl/NatsDispatcher.java index a4f66fea3..097f1e05b 100644 --- a/src/main/java/io/nats/client/impl/NatsDispatcher.java +++ b/src/main/java/io/nats/client/impl/NatsDispatcher.java @@ -338,7 +338,11 @@ public Dispatcher unsubscribe(String subject, int after) { connection.unsubscribe(defaultHandlerSub, after); } - subWithNonDefaultHandlerBySid.forEach((sid, sub) -> connection.unsubscribe(sub, after)); + subWithNonDefaultHandlerBySid.forEach((sid, sub) -> { + if (subject.equals(sub.getSubject())) { + connection.unsubscribe(sub, after); + } + }); return this; } diff --git a/src/test/java/io/nats/client/SubscriberTests.java b/src/test/java/io/nats/client/SubscriberTests.java index 0bb63780d..da184c887 100644 --- a/src/test/java/io/nats/client/SubscriberTests.java +++ b/src/test/java/io/nats/client/SubscriberTests.java @@ -16,10 +16,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; @@ -491,38 +488,6 @@ public void testInvalidSubjectsAndQueueNames() throws Exception { } } - @Test - public void testDispatcherMultipleSubscriptionsBySubject() throws Exception { - try (NatsTestServer ts = new NatsTestServer(false); - Connection nc = Nats.connect(ts.getURI())) { - standardConnectionWait(nc); - String subject = subject(); - - List dflt = Collections.synchronizedList(new ArrayList<>()); - List nd1 = Collections.synchronizedList(new ArrayList<>()); - List nd2 = Collections.synchronizedList(new ArrayList<>()); - Dispatcher d = nc.createDispatcher(m -> dflt.add(getDataId(m))); - d.subscribe(subject); - d.subscribe(subject, m -> nd1.add(getDataId(m))); - d.subscribe(subject, m -> nd2.add(getDataId(m))); - - nc.publish(subject, "1".getBytes()); - Thread.sleep(1000); - d.unsubscribe(subject); - nc.publish(subject, "2".getBytes()); - Thread.sleep(1000); - - assertTrue(dflt.contains(1)); - assertTrue(nd1.contains(1)); - assertTrue(nd2.contains(1)); - - assertFalse(dflt.contains(2)); - assertFalse(nd1.contains(2)); - assertFalse(nd2.contains(2)); - - } - } - private static int getDataId(Message m) { return Integer.parseInt(new String(m.getData())); } diff --git a/src/test/java/io/nats/client/impl/DispatcherTests.java b/src/test/java/io/nats/client/impl/DispatcherTests.java index 22efee7c3..dc69bc526 100644 --- a/src/test/java/io/nats/client/impl/DispatcherTests.java +++ b/src/test/java/io/nats/client/impl/DispatcherTests.java @@ -18,12 +18,14 @@ import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static io.nats.client.utils.TestBase.sleep; -import static io.nats.client.utils.TestBase.waitUntilStatus; +import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; @@ -33,6 +35,52 @@ // the done message (or should) - wanted to note that somewhere public class DispatcherTests { + @Test + public void testDispatcherMultipleSubscriptionsBySubject() throws Exception { + try (NatsTestServer ts = new NatsTestServer(false); + Connection nc = Nats.connect(ts.getURI())) { + standardConnectionWait(nc); + String subject1 = subject(); + String subject2 = subject(); + + List dflt = Collections.synchronizedList(new ArrayList<>()); + List sub21 = Collections.synchronizedList(new ArrayList<>()); + List sub22 = Collections.synchronizedList(new ArrayList<>()); + List sub31 = Collections.synchronizedList(new ArrayList<>()); + List sub32 = Collections.synchronizedList(new ArrayList<>()); + Dispatcher d1 = nc.createDispatcher(m -> dflt.add(getDataId(m))); + d1.subscribe(subject1); + d1.subscribe(subject1, m -> sub21.add(getDataId(m))); + d1.subscribe(subject1, m -> sub22.add(getDataId(m))); + d1.subscribe(subject2, m -> sub31.add(getDataId(m))); + d1.subscribe(subject2, m -> sub32.add(getDataId(m))); + + nc.publish(subject1, "1".getBytes()); + nc.publish(subject2, "1".getBytes()); + Thread.sleep(1000); + d1.unsubscribe(subject1); + nc.publish(subject1, "2".getBytes()); + nc.publish(subject2, "2".getBytes()); + Thread.sleep(1000); + + assertTrue(dflt.contains(1)); + assertTrue(sub21.contains(1)); + assertTrue(sub22.contains(1)); + assertTrue(sub31.contains(1)); + assertTrue(sub32.contains(1)); + + assertFalse(dflt.contains(2)); + assertFalse(sub21.contains(2)); + assertFalse(sub22.contains(2)); + assertTrue(sub31.contains(2)); + assertTrue(sub32.contains(2)); + } + } + + private static int getDataId(Message m) { + return Integer.parseInt(new String(m.getData())); + } + @Test public void testSingleMessage() throws Exception { try (NatsTestServer ts = new NatsTestServer(false);