Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/FeatureOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public abstract class FeatureOptions {

private final JetStreamOptions jso;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected FeatureOptions(Builder b) {
protected FeatureOptions(Builder<?, ?> b) {
jso = b.jsoBuilder.build();
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public abstract class SubscribeOptions {
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
protected final String name;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected SubscribeOptions(Builder builder, boolean isPull,
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
String deliverSubject, String deliverGroup,
long pendingMessageLimit, long pendingByteLimit) {

Expand Down
18 changes: 9 additions & 9 deletions src/test/java/io/nats/client/impl/JetStreamGeneralTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ public void testBindPush() throws Exception {

jsPublish(js, tsc.subject(), 1, 1);
PushSubscribeOptions pso = PushSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
Message m = s.nextMessage(DEFAULT_TIMEOUT);
Expand All @@ -474,7 +474,7 @@ public void testBindPush() throws Exception {
jsPublish(js, tsc.subject(), 2, 1);
pso = PushSubscribeOptions.builder()
.stream(tsc.stream)
.durable(tsc.name())
.durable(tsc.consumerName())
.bind(true)
.build();
s = js.subscribe(tsc.subject(), pso);
Expand All @@ -485,7 +485,7 @@ public void testBindPush() throws Exception {
unsubscribeEnsureNotBound(s);

jsPublish(js, tsc.subject(), 3, 1);
pso = PushSubscribeOptions.bind(tsc.stream, tsc.name());
pso = PushSubscribeOptions.bind(tsc.stream, tsc.consumerName());
s = js.subscribe(tsc.subject(), pso);
m = s.nextMessage(DEFAULT_TIMEOUT);
assertNotNull(m);
Expand All @@ -495,7 +495,7 @@ public void testBindPush() throws Exception {
() -> PushSubscribeOptions.builder().stream(tsc.stream).bind(true).build());

assertThrows(IllegalArgumentException.class,
() -> PushSubscribeOptions.builder().durable(tsc.name()).bind(true).build());
() -> PushSubscribeOptions.builder().durable(tsc.consumerName()).bind(true).build());

assertThrows(IllegalArgumentException.class,
() -> PushSubscribeOptions.builder().stream(EMPTY).bind(true).build());
Expand All @@ -514,7 +514,7 @@ public void testBindPull() throws Exception {
jsPublish(js, tsc.subject(), 1, 1);

PullSubscribeOptions pso = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
s.pull(1);
Expand All @@ -527,7 +527,7 @@ public void testBindPull() throws Exception {
jsPublish(js, tsc.subject(), 2, 1);
pso = PullSubscribeOptions.builder()
.stream(tsc.stream)
.durable(tsc.name())
.durable(tsc.consumerName())
.bind(true)
.build();
s = js.subscribe(tsc.subject(), pso);
Expand All @@ -539,7 +539,7 @@ public void testBindPull() throws Exception {
unsubscribeEnsureNotBound(s);

jsPublish(js, tsc.subject(), 3, 1);
pso = PullSubscribeOptions.bind(tsc.stream, tsc.name());
pso = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
s = js.subscribe(tsc.subject(), pso);
s.pull(1);
m = s.nextMessage(DEFAULT_TIMEOUT);
Expand Down Expand Up @@ -960,9 +960,9 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception {
// - consumer not found
// - stream does not exist
JetStreamSubscription sub = js.subscribe(tsc.subject());
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.name()));
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.consumerName()));
assertThrows(JetStreamApiException.class,
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.name()));
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.consumerName()));
});
}

Expand Down
24 changes: 12 additions & 12 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void testAddPausedConsumer() throws Exception {

ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2);
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.pauseUntil(pauseUntil)
.build();

Expand All @@ -849,7 +849,7 @@ public void testPauseResumeConsumer() throws Exception {
assertEquals(0, list.size());

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();

// durable and name can both be null
Expand Down Expand Up @@ -886,9 +886,9 @@ public void testPauseResumeConsumer() throws Exception {
ci = jsm.getConsumerInfo(tsc.stream, ci.getName());
assertFalse(ci.getPaused());

assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.name(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.consumerName(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(tsc.stream, name(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.name()));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.consumerName()));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(tsc.stream, name()));
});
}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ public void testConsumerMetadata() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(jsm);

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.metadata(metaData)
.build();

Expand Down Expand Up @@ -1065,14 +1065,14 @@ public void testGetConsumerInfo() throws Exception {
jsServer.run(nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name()));
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.consumerName()));
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc);
assertEquals(tsc.stream, ci.getStreamName());
assertEquals(tsc.name(), ci.getName());
ci = jsm.getConsumerInfo(tsc.stream, tsc.name());
assertEquals(tsc.consumerName(), ci.getName());
ci = jsm.getConsumerInfo(tsc.stream, tsc.consumerName());
assertEquals(tsc.stream, ci.getStreamName());
assertEquals(tsc.name(), ci.getName());
assertEquals(tsc.consumerName(), ci.getName());
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999)));
if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) {
assertNotNull(ci.getTimestamp());
Expand Down Expand Up @@ -1228,14 +1228,14 @@ public void testConsumerReplica() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

final ConsumerConfiguration cc0 = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0);
// server returns 0 when value is not set
assertEquals(0, ci.getConsumerConfiguration().getNumReplicas());

final ConsumerConfiguration cc1 = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.numReplicas(1)
.build();
ci = jsm.addOrUpdateConsumer(tsc.stream, cc1);
Expand Down
26 changes: 13 additions & 13 deletions src/test/java/io/nats/client/impl/JetStreamPullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public void testFetch() throws Exception {
.build();

PullSubscribeOptions options = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

List<Message> messages = sub.fetch(10, fetchDur);
Expand Down Expand Up @@ -139,12 +139,12 @@ public void testIterate() throws Exception {
.build();

PullSubscribeOptions options = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

Iterator<Message> iterator = sub.iterate(10, fetchDur);
Expand Down Expand Up @@ -218,11 +218,11 @@ public void testBasic() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

// publish some amount of messages, but not entire pull size
Expand Down Expand Up @@ -317,11 +317,11 @@ public void testNoWait() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

// publish 10 messages
Expand Down Expand Up @@ -391,11 +391,11 @@ public void testPullExpires() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

long expires = 500; // millis
Expand Down Expand Up @@ -574,7 +574,7 @@ public void testAckWaitTimeout() throws Exception {
.ackWait(1500)
.build();
PullSubscribeOptions pso = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

Expand Down Expand Up @@ -1108,10 +1108,10 @@ public void testReader() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).filterSubjects(tsc.subject()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).filterSubjects(tsc.subject()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.name());
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
JetStreamSubscription sub = js.subscribe(tsc.subject(), so);
JetStreamReader reader = sub.reader(500, 125);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testQueueSubWorkflow() throws Exception {
// - the PushSubscribeOptions can be re-used since all the subscribers are the same
// - use a concurrent integer to track all the messages received
// - have a list of subscribers and threads so I can track them
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.name()).build();
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.consumerName()).build();
AtomicInteger allReceived = new AtomicInteger();
List<JsQueueSubscriber> subscribers = new ArrayList<>();
List<Thread> subThreads = new ArrayList<>();
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/nats/client/impl/JetStreamTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ public String subject(Object variant) {
return subjects.computeIfAbsent(variant, TestBase::subject);
}

public String name() {
return name(defaultNameVariant);
public String consumerName() {
return consumerName(defaultNameVariant);
}

public String name(Object variant) {
public String consumerName(Object variant) {
return names.computeIfAbsent(variant, TestBase::name);
}
}
Expand Down
42 changes: 21 additions & 21 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,11 @@ public void testIterableConsumer() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());

int stopCount = 500;
// create the consumer then use it
Expand Down Expand Up @@ -355,11 +355,11 @@ public void testConsumeWithHandler() throws Exception {
jsPublish(js, tsc.subject(), 2500);

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());

int stopCount = 500;

Expand Down Expand Up @@ -428,30 +428,30 @@ public void testCoverage() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(1)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(2)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(3)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(4)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(1)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(2)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(3)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(4)).build());

// Stream[Context]
StreamContext sctx1 = nc.getStreamContext(tsc.stream);
nc.getStreamContext(tsc.stream, JetStreamOptions.DEFAULT_JS_OPTIONS);
js.getStreamContext(tsc.stream);

// Consumer[Context]
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.name(1));
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.name(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.name(3));
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.name(4));
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(5)).build());
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(6)).build());

after(cctx1.iterate(), tsc.name(1), true);
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.name(2), true);
after(cctx3.consume(m -> {}), tsc.name(3), true);
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.name(4), true);
after(cctx5.fetchMessages(1), tsc.name(5), false);
after(cctx6.fetchBytes(1000), tsc.name(6), false);
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.consumerName(1));
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.consumerName(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.consumerName(3));
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.consumerName(4));
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(5)).build());
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(6)).build());

after(cctx1.iterate(), tsc.consumerName(1), true);
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.consumerName(2), true);
after(cctx3.consume(m -> {}), tsc.consumerName(3), true);
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.consumerName(4), true);
after(cctx5.fetchMessages(1), tsc.consumerName(5), false);
after(cctx6.fetchBytes(1000), tsc.consumerName(6), false);
});
}

Expand Down
Loading
Loading