From 00a8a0256f303865fa345f3f8e7460ebb76150e9 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 18 Sep 2024 17:13:07 +0200 Subject: [PATCH 01/10] Add batch direct get Signed-off-by: Maurice van Veen --- .../io/nats/client/JetStreamManagement.java | 15 + .../client/api/MessageBatchGetRequest.java | 347 ++++++++++++++++++ .../io/nats/client/api/MessageBatchInfo.java | 117 ++++++ .../nats/client/impl/NatsJetStreamImpl.java | 3 + .../client/impl/NatsJetStreamManagement.java | 68 ++++ .../io/nats/client/support/ApiConstants.java | 3 + .../support/NatsJetStreamClientError.java | 2 + .../support/NatsJetStreamConstants.java | 3 +- .../io/nats/client/support/Validator.java | 7 + .../client/impl/JetStreamManagementTests.java | 147 +++++++- 10 files changed, 707 insertions(+), 5 deletions(-) create mode 100644 src/main/java/io/nats/client/api/MessageBatchGetRequest.java create mode 100644 src/main/java/io/nats/client/api/MessageBatchInfo.java diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 499c8ef4d..8c7d64d5c 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.List; +import java.util.function.Consumer; /** * JetStream Management context for creation and access to streams and consumers in NATS. @@ -324,6 +325,20 @@ public interface JetStreamManagement { */ MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException; + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @param consumer the handler used to process messages in the batch + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException; + /** * Deletes a message, overwriting the message data with garbage * This can be considered an expensive (time-consuming) operation, but is more secure. diff --git a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java new file mode 100644 index 000000000..3be679f43 --- /dev/null +++ b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java @@ -0,0 +1,347 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static io.nats.client.support.ApiConstants.*; +import static io.nats.client.support.JsonUtils.*; +import static io.nats.client.support.Validator.*; + +/** + * Object used to make a request for message batch get requests. + */ +public class MessageBatchGetRequest implements JsonSerializable { + + private final Duration timeout; + private final int batch; + private final int maxBytes; + private final long sequence; + private final ZonedDateTime startTime; + private final String nextBySubject; + private final List multiLastFor; + private final long upToSequence; + private final ZonedDateTime upToTime; + + MessageBatchGetRequest(Builder b) { + this.timeout = b.timeout; + this.batch = b.batch; + this.maxBytes = b.maxBytes; + this.sequence = b.sequence; + this.startTime = b.startTime; + this.nextBySubject = b.nextBySubject; + this.multiLastFor = b.multiLastFor; + this.upToSequence = b.upToSequence; + this.upToTime = b.upToTime; + } + + /** + * Timeout used for the request. + * + * @return Duration + */ + public Duration getTimeout() { + return timeout; + } + + /** + * Maximum amount of messages to be returned for this request. + * + * @return batch size + */ + public int getBatch() { + return batch; + } + + /** + * Maximum amount of returned bytes for this request. + * Limits the amount of returned messages to not exceed this. + * + * @return maximum bytes + */ + public int getMaxBytes() { + return maxBytes; + } + + /** + * Minimum sequence for returned messages. + * All returned messages will have a sequence equal to or higher than this. + * + * @return minimum message sequence + */ + public long getSequence() { + return sequence; + } + + /** + * Minimum start time for returned messages. + * All returned messages will have a start time equal to or higher than this. + * + * @return minimum message start time + */ + public ZonedDateTime getStartTime() { + return startTime; + } + + /** + * Subject used to filter messages that should be returned. + * + * @return the subject to filter + */ + public String getSubject() { + return nextBySubject; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @return the subjects to get the last messages for + */ + public List getMultiLastForSubjects() { + return multiLastFor; + } + + /** + * Only return messages up to this sequence. + * + * @return the maximum message sequence to return results for + */ + public long getUpToSequence() { + return upToSequence; + } + + /** + * Only return messages up to this time. + * + * @return the maximum message time to return results for + */ + public ZonedDateTime getUpToTime() { + return upToTime; + } + + @Override + public String toJson() { + StringBuilder sb = beginJson(); + addField(sb, BATCH, batch); + addField(sb, MAX_BYTES, maxBytes); + addField(sb, SEQ, sequence); + addField(sb, START_TIME, startTime); + addField(sb, NEXT_BY_SUBJECT, nextBySubject); + addStrings(sb, MULTI_LAST, multiLastFor); + addField(sb, UP_TO_SEQ, upToSequence); + addField(sb, UP_TO_TIME, upToTime); + return endJson(sb).toString(); + } + + /** + * Creates a builder for the request. + * + * @return Builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a builder for the request. + * + * @param req the {@link MessageBatchGetRequest} + * @return Builder + */ + public static Builder builder(MessageBatchGetRequest req) { + return req == null ? new Builder() : new Builder(req); + } + + /** + * {@link MessageBatchGetRequest} is created using a Builder. The builder supports chaining and will + * create a default set of options if no methods are calls. + * + *

{@code MessageBatchGetRequest.builder().build()} will create a default {@link MessageBatchGetRequest}. + */ + public static class Builder { + private Duration timeout = Duration.ofSeconds(5); + private int batch = -1; + private int maxBytes = -1; + private long sequence = -1; + private ZonedDateTime startTime = null; + private String nextBySubject = null; + private List multiLastFor = new ArrayList<>(); + private long upToSequence = -1; + private ZonedDateTime upToTime = null; + + /** + * Construct the builder + */ + public Builder() { + } + + /** + * Construct the builder and initialize values with the existing {@link MessageBatchGetRequest} + * + * @param req the {@link MessageBatchGetRequest} to clone + */ + public Builder(MessageBatchGetRequest req) { + if (req != null) { + this.timeout = req.timeout; + this.batch = req.batch; + this.maxBytes = req.maxBytes; + this.sequence = req.sequence; + this.startTime = req.startTime; + this.nextBySubject = req.nextBySubject; + this.multiLastFor = req.multiLastFor; + this.upToSequence = req.upToSequence; + this.upToTime = req.upToTime; + } + } + + /** + * Set the timeout used for the request. + * + * @param timeout the timeout + * @return Builder + */ + public Builder timeout(Duration timeout) { + validateDurationRequired(timeout); + this.timeout = timeout; + return this; + } + + /** + * Set the maximum amount of messages to be returned for this request. + * + * @param batch the batch size + * @return Builder + */ + public Builder batch(int batch) { + validateGtZero(batch, "Request batch size"); + this.batch = batch; + return this; + } + + /** + * Maximum amount of returned bytes for this request. + * Limits the amount of returned messages to not exceed this. + * + * @param maxBytes the maximum bytes + * @return Builder + */ + public Builder maxBytes(int maxBytes) { + this.maxBytes = maxBytes; + return this; + } + + /** + * Minimum sequence for returned messages. + * All returned messages will have a sequence equal to or higher than this. + * + * @param sequence the minimum message sequence + * @return Builder + */ + public Builder sequence(long sequence) { + validateGtEqZero(sequence, "Sequence"); + this.sequence = sequence; + return this; + } + + /** + * Minimum start time for returned messages. + * All returned messages will have a start time equal to or higher than this. + * + * @param startTime the minimum message start time + * @return Builder + */ + public Builder startTime(ZonedDateTime startTime) { + this.startTime = startTime; + return this; + } + + /** + * Subject used to filter messages that should be returned. + * + * @param subject the subject to filter + * @return Builder + */ + public Builder subject(String subject) { + this.nextBySubject = subject; + return this; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @param subjects the subjects to get the last messages for + * @return Builder + */ + public Builder multiLastForSubjects(String... subjects) { + this.multiLastFor.clear(); + this.multiLastFor.addAll(Arrays.asList(subjects)); + return this; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @param subjects the subjects to get the last messages for + * @return Builder + */ + public Builder multiLastForSubjects(Collection subjects) { + this.multiLastFor.clear(); + this.multiLastFor.addAll(subjects); + return this; + } + + /** + * Only return messages up to this sequence. + * If not set, will be last sequence for the stream. + * + * @param upToSequence the maximum message sequence to return results for + * @return Builder + */ + public Builder upToSequence(long upToSequence) { + validateGtZero(upToSequence, "Up to sequence"); + this.upToSequence = upToSequence; + return this; + } + + /** + * Only return messages up to this time. + * + * @param upToTime the maximum message time to return results for + * @return Builder + */ + public Builder upToTime(ZonedDateTime upToTime) { + this.upToTime = upToTime; + return this; + } + + /** + * Build the {@link MessageBatchGetRequest}. + * + * @return MessageBatchGetRequest + */ + public MessageBatchGetRequest build() { + return new MessageBatchGetRequest(this); + } + } +} diff --git a/src/main/java/io/nats/client/api/MessageBatchInfo.java b/src/main/java/io/nats/client/api/MessageBatchInfo.java new file mode 100644 index 000000000..00cd1ffb0 --- /dev/null +++ b/src/main/java/io/nats/client/api/MessageBatchInfo.java @@ -0,0 +1,117 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.Message; +import io.nats.client.impl.Headers; +import io.nats.client.support.DateTimeUtils; + +import java.time.ZonedDateTime; + +import static io.nats.client.support.NatsJetStreamConstants.*; + +/** + * The {@link MessageBatchInfo} class contains information about messages returned by a batch request. + */ +public class MessageBatchInfo { + + private final String subject; + private final long seq; + private final byte[] data; + private final ZonedDateTime time; + private final Headers headers; + private final long lastSeq; + private final long numPending; + + public MessageBatchInfo(Message msg) { + Headers msgHeaders = msg.getHeaders(); + this.subject = msgHeaders.getLast(NATS_SUBJECT); + this.data = msg.getData(); + this.seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE)); + this.time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP)); + long tmpLastSeq = Long.parseLong(msgHeaders.getLast(NATS_LAST_SEQUENCE)); + this.lastSeq = tmpLastSeq == 0 ? -1 : tmpLastSeq; + + // Num pending is +1 since it includes EOB message, correct that here. + this.numPending = Long.parseLong(msgHeaders.getLast(NATS_NUM_PENDING)) - 1; + + // these are control headers, not real headers so don't give them to the user. + headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS); + } + + /** + * Get the message subject + * + * @return the subject + */ + public String getSubject() { + return subject; + } + + /** + * Get the message sequence. + * Can be used in a subsequent batch request if there are {@link #getNumPending()} messages. + * In which case {@code getSeq() + 1} is used as the next minimum sequence number. + * + * @return the sequence number + */ + public long getSeq() { + return seq; + } + + /** + * Get the message data + * + * @return the data bytes + */ + public byte[] getData() { + return data; + } + + /** + * Get the time the message was received + * + * @return the time + */ + public ZonedDateTime getTime() { + return time; + } + + /** + * Get the headers + * + * @return the headers object or null if there were no headers + */ + public Headers getHeaders() { + return headers; + } + + /** + * Get the sequence number of the last message in the stream. Not always set. + * + * @return the last sequence or -1 if the value is not known. + */ + public long getLastSeq() { + return lastSeq; + } + + /** + * Amount of pending messages that can be requested with a subsequent batch request. + * + * @return number of pending messages + */ + public long getNumPending() { + return numPending; + } +} diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 853bfc3ab..b712cff27 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -47,6 +47,7 @@ public CachedStreamInfo(StreamInfo si) { final JetStreamOptions jso; final boolean consumerCreate290Available; final boolean multipleSubjectFilter210Available; + final boolean directBatchGet211Available; // ---------------------------------------------------------------------------------------------------- // Create / Init @@ -63,6 +64,7 @@ public CachedStreamInfo(StreamInfo si) { consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate(); multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99"); + directBatchGet211Available = conn.getInfo().isNewerVersionThan("2.10.99"); } NatsJetStreamImpl(NatsJetStreamImpl impl) { @@ -70,6 +72,7 @@ public CachedStreamInfo(StreamInfo si) { jso = impl.jso; consumerCreate290Available = impl.consumerCreate290Available; multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available; + directBatchGet211Available = impl.directBatchGet211Available; } // ---------------------------------------------------------------------------------------------------- diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index b6fe2c7cd..fc6a951b3 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -16,12 +16,17 @@ import io.nats.client.*; import io.nats.client.api.Error; import io.nats.client.api.*; +import io.nats.client.support.Status; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; +import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; +import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; +import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; import static io.nats.client.support.Validator.*; public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement { @@ -340,6 +345,69 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR } } + /** + * {@inheritDoc} + */ + @Override + public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException { + validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); + + if (!directBatchGet211Available) { + throw JsDirectBatchGet211NotAvailable.instance(); + } + + CachedStreamInfo csi = getCachedStreamInfo(streamName); + if (!csi.allowDirect) { + throw JsAllowDirectRequired.instance(); + } + + Subscription sub = null; + try { + String replyTo = conn.createInbox(); + sub = conn.subscribe(replyTo); + + String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName)); + conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize()); + + long start = System.currentTimeMillis(); + long maxTimeMillis = messageBatchGetRequest.getTimeout().toMillis(); + long timeLeft = maxTimeMillis; + while (true) { + Message msg = responseRequired(sub.nextMessage(timeLeft)); + if (msg.isStatusMessage()) { + Status status = msg.getStatus(); + if (status.getCode() < 200 || status.getCode() > 299) { + throw new JetStreamApiException(Error.convert(msg.getStatus())); + } + return; + } + + Headers headers = msg.getHeaders(); + if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) { + throw JsDirectBatchGet211NotAvailable.instance(); + } + + MessageBatchInfo messageBatchInfo = new MessageBatchInfo(msg); + consumer.accept(messageBatchInfo); + timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); + } + } + catch (InterruptedException e) { + // sub.nextMessage was fetching one message + // and data is not completely read + // so it seems like this is an error condition + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + finally { + try { + //noinspection DataFlowIssue + sub.unsubscribe(); + } + catch (Exception ignore) {} + } + } + /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 16ff26f17..e3479b4b8 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -126,6 +126,7 @@ public interface ApiConstants { String MTIME = "mtime"; String MIRROR = "mirror"; String MSGS = "msgs"; + String MULTI_LAST = "multi_last"; String NAME = "name"; String NEXT_BY_SUBJECT = "next_by_subj"; String NO_ACK = "no_ack"; @@ -202,5 +203,7 @@ public interface ApiConstants { String TLS_AVAILABLE = "tls_available"; String TOTAL = "total"; String TYPE = "type"; + String UP_TO_SEQ = "up_to_seq"; + String UP_TO_TIME = "up_to_time"; String VERSION = "version"; } diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java index 6bda9e345..5da56cdff 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java @@ -70,6 +70,8 @@ public class NatsJetStreamClientError { public static final NatsJetStreamClientError JsConsumerCreate290NotAvailable = new NatsJetStreamClientError(CON, 90301, "Name field not valid when v2.9.0 consumer create api is not available."); public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied."); public static final NatsJetStreamClientError JsMultipleFilterSubjects210NotAvailable = new NatsJetStreamClientError(CON, 90303, "Multiple filter subjects not available until server version 2.10.0."); + public static final NatsJetStreamClientError JsAllowDirectRequired = new NatsJetStreamClientError(CON, 90304, "Stream must have allow direct set."); + public static final NatsJetStreamClientError JsDirectBatchGet211NotAvailable = new NatsJetStreamClientError(CON, 90305, "Batch direct get not available until server version 2.11.0."); @Deprecated // Fixed spelling error public static final NatsJetStreamClientError JsSubFcHbHbNotValidQueue = new NatsJetStreamClientError(SUB, 90006, "Flow Control and/or heartbeat is not valid in queue mode."); diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java index 45aed12b2..c7e4d4a3b 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java @@ -100,7 +100,8 @@ public interface NatsJetStreamConstants { String NATS_TIMESTAMP = "Nats-Time-Stamp"; String NATS_SUBJECT = "Nats-Subject"; String NATS_LAST_SEQUENCE = "Nats-Last-Sequence"; - String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE}; + String NATS_NUM_PENDING = "Nats-Num-Pending"; + String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE, NATS_NUM_PENDING}; String NATS_PENDING_MESSAGES = "Nats-Pending-Messages"; String NATS_PENDING_BYTES = "Nats-Pending-Bytes"; diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index 517832637..e8dcbf109 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -383,6 +383,13 @@ public static int validateGtZero(int i, String label) { return i; } + public static long validateGtZero(long l, String label) { + if (l < 1) { + throw new IllegalArgumentException(label + " must be greater than zero"); + } + return l; + } + public static long validateGtZeroOrMinus1(long l, String label) { if (zeroOrLtMinus1(l)) { throw new IllegalArgumentException(label + " must be greater than zero or -1 for unlimited"); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 47f25d989..5a2f8891c 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -23,11 +23,12 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Collectors; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; @@ -1548,4 +1549,142 @@ public void testCreateConsumerUpdateConsumer() throws Exception { assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); }); } + + @Test + public void testBatchDirectGet() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(nc); + assertFalse(tsc.si.getConfiguration().getAllowDirect()); + + List expected = Arrays.asList("foo", "bar", "baz"); + for (String data : expected) { + js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); + } + + List batch = new ArrayList<>(); + Consumer handler = batch::add; + + // Stream doesn't have AllowDirect enabled, will error. + assertThrows(IllegalArgumentException.class, () -> { + MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); + jsm.getMessageBatch(tsc.stream, request, handler); + }); + + // Enable AllowDirect. + StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); + StreamInfo si = jsm.updateStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); + + MessageBatchGetRequest request = MessageBatchGetRequest.builder() + .batch(2) + .subject(tsc.subject()) + .build(); + + // First batch gets first two messages. + jsm.getMessageBatch(tsc.stream, request, handler); + MessageBatchInfo last = batch.get(batch.size() - 1); + assertEquals(1, last.getNumPending()); + assertEquals(2, last.getSeq()); + assertEquals(1, last.getLastSeq()); + + // Second batch gets last message. + request = MessageBatchGetRequest.builder(request) + .sequence(last.getSeq() + 1) + .build(); + jsm.getMessageBatch(tsc.stream, request, handler); + + List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); + assertEquals(expected, actual); + + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(-1, last.getLastSeq()); + }); + } + + @Test + public void testBatchDirectGetMultiLast() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + String stream = stream(); + jsm.addStream(StreamConfiguration.builder() + .name(stream) + .subjects(stream + ".a.>") + .allowDirect(true) + .build()); + + String subjectAFoo = stream + ".a.foo"; + String subjectABar = stream + ".a.bar"; + String subjectABaz = stream + ".a.baz"; + js.publish(subjectAFoo, "foo".getBytes(StandardCharsets.UTF_8)); + js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8)); + js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); + + List keys = new ArrayList<>(); + Consumer handler = msg -> keys.add(msg.getSubject()); + + MessageBatchGetRequest request = MessageBatchGetRequest.builder() + .multiLastForSubjects(subjectAFoo, subjectABaz) + .build(); + + // First batch gets first two messages. + jsm.getMessageBatch(stream, request, handler); + assertEquals(2, keys.size()); + assertEquals(subjectAFoo, keys.get(0)); + assertEquals(subjectABaz, keys.get(1)); + }); + } + + @Test + public void testBatchDirectGetBuilder() { + // Default timeout + assertEquals(Duration.ofSeconds(5), MessageBatchGetRequest.builder().build().getTimeout()); + + // Request options. + MessageBatchGetRequest requestOptions = MessageBatchGetRequest.builder() + .timeout(Duration.ofSeconds(1)) + .maxBytes(1234) + .batch(2) + .build(); + assertEquals(Duration.ofSeconds(1), requestOptions.getTimeout()); + assertEquals(1234, requestOptions.getMaxBytes()); + assertEquals(2, requestOptions.getBatch()); + assertEquals("{\"batch\":2,\"max_bytes\":1234}", requestOptions.toJson()); + + // Batch direct get - simple + ZonedDateTime time = Instant.EPOCH.atZone(ZoneOffset.UTC); + MessageBatchGetRequest simple = MessageBatchGetRequest.builder() + .sequence(1) + .startTime(time) + .subject("subject") + .build(); + assertEquals(1, simple.getSequence()); + assertEquals(time, simple.getStartTime()); + assertEquals("subject", simple.getSubject()); + assertEquals("{\"seq\":1,\"start_time\":\"1970-01-01T00:00:00.000000000Z\",\"next_by_subj\":\"subject\"}", simple.toJson()); + + // Batch direct get - multi last + List multiLastFor = Collections.singletonList("multi.last"); + MessageBatchGetRequest multiLast = MessageBatchGetRequest.builder() + .multiLastForSubjects("multi.last") + .upToSequence(1) + .upToTime(time) + .build(); + assertEquals(Collections.singletonList("multi.last"), multiLast.getMultiLastForSubjects()); + assertEquals(1, multiLast.getUpToSequence()); + assertEquals(time, multiLast.getUpToTime()); + assertEquals("{\"multi_last\":[\"multi.last\"],\"up_to_seq\":1,\"up_to_time\":\"1970-01-01T00:00:00.000000000Z\"}", multiLast.toJson()); + + MessageBatchGetRequest multiLastAlternative = MessageBatchGetRequest.builder() + .multiLastForSubjects(multiLastFor) + .build(); + assertEquals(multiLastFor, multiLastAlternative.getMultiLastForSubjects()); + assertEquals("{\"multi_last\":[\"multi.last\"]}", multiLastAlternative.toJson()); + } } From c93fa00998c378c3790ce7b18b85fb146131f435 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 23 Sep 2024 18:14:45 +0200 Subject: [PATCH 02/10] Remove MessageBatchInfo Signed-off-by: Maurice van Veen --- .../io/nats/client/JetStreamManagement.java | 2 +- .../io/nats/client/api/MessageBatchInfo.java | 117 ------------------ .../java/io/nats/client/api/MessageInfo.java | 26 +++- .../client/impl/NatsJetStreamManagement.java | 6 +- .../client/impl/JetStreamManagementTests.java | 10 +- 5 files changed, 32 insertions(+), 129 deletions(-) delete mode 100644 src/main/java/io/nats/client/api/MessageBatchInfo.java diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 8c7d64d5c..da3abc7fa 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -337,7 +337,7 @@ public interface JetStreamManagement { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException; + void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/api/MessageBatchInfo.java b/src/main/java/io/nats/client/api/MessageBatchInfo.java deleted file mode 100644 index 00cd1ffb0..000000000 --- a/src/main/java/io/nats/client/api/MessageBatchInfo.java +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2024 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package io.nats.client.api; - -import io.nats.client.Message; -import io.nats.client.impl.Headers; -import io.nats.client.support.DateTimeUtils; - -import java.time.ZonedDateTime; - -import static io.nats.client.support.NatsJetStreamConstants.*; - -/** - * The {@link MessageBatchInfo} class contains information about messages returned by a batch request. - */ -public class MessageBatchInfo { - - private final String subject; - private final long seq; - private final byte[] data; - private final ZonedDateTime time; - private final Headers headers; - private final long lastSeq; - private final long numPending; - - public MessageBatchInfo(Message msg) { - Headers msgHeaders = msg.getHeaders(); - this.subject = msgHeaders.getLast(NATS_SUBJECT); - this.data = msg.getData(); - this.seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE)); - this.time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP)); - long tmpLastSeq = Long.parseLong(msgHeaders.getLast(NATS_LAST_SEQUENCE)); - this.lastSeq = tmpLastSeq == 0 ? -1 : tmpLastSeq; - - // Num pending is +1 since it includes EOB message, correct that here. - this.numPending = Long.parseLong(msgHeaders.getLast(NATS_NUM_PENDING)) - 1; - - // these are control headers, not real headers so don't give them to the user. - headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS); - } - - /** - * Get the message subject - * - * @return the subject - */ - public String getSubject() { - return subject; - } - - /** - * Get the message sequence. - * Can be used in a subsequent batch request if there are {@link #getNumPending()} messages. - * In which case {@code getSeq() + 1} is used as the next minimum sequence number. - * - * @return the sequence number - */ - public long getSeq() { - return seq; - } - - /** - * Get the message data - * - * @return the data bytes - */ - public byte[] getData() { - return data; - } - - /** - * Get the time the message was received - * - * @return the time - */ - public ZonedDateTime getTime() { - return time; - } - - /** - * Get the headers - * - * @return the headers object or null if there were no headers - */ - public Headers getHeaders() { - return headers; - } - - /** - * Get the sequence number of the last message in the stream. Not always set. - * - * @return the last sequence or -1 if the value is not known. - */ - public long getLastSeq() { - return lastSeq; - } - - /** - * Amount of pending messages that can be requested with a subsequent batch request. - * - * @return number of pending messages - */ - public long getNumPending() { - return numPending; - } -} diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index 206cd8b92..97e0680c4 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -40,6 +40,7 @@ public class MessageInfo extends ApiResponse { private final Headers headers; private final String stream; private final long lastSeq; + private final long numPending; /** * Create a Message Info @@ -70,12 +71,20 @@ public MessageInfo(Message msg, String streamName, boolean direct) { seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE)); time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP)); stream = msgHeaders.getLast(NATS_STREAM); - String temp = msgHeaders.getLast(NATS_LAST_SEQUENCE); - if (temp == null) { + String tempLastSeq = msgHeaders.getLast(NATS_LAST_SEQUENCE); + if (tempLastSeq == null) { lastSeq = -1; } else { - lastSeq = JsonUtils.safeParseLong(temp, -1); + lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1); + } + String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING); + if (tempNumPending == null) { + numPending = -1; + } + else { + // Num pending is +1 since it includes EOB message, correct that here. + numPending = Long.parseLong(tempNumPending) - 1; } // these are control headers, not real headers so don't give them to the user. headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS); @@ -88,6 +97,7 @@ else if (hasError()) { headers = null; stream = null; lastSeq = -1; + numPending = -1; } else { JsonValue mjv = readValue(jv, MESSAGE); @@ -99,6 +109,7 @@ else if (hasError()) { headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders(); stream = streamName; lastSeq = -1; + numPending = -1; } } @@ -158,6 +169,14 @@ public long getLastSeq() { return lastSeq; } + /** + * Amount of pending messages that can be requested with a subsequent batch request. + * @return number of pending messages + */ + public long getNumPending() { + return numPending; + } + @Override public String toString() { StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":"); @@ -174,6 +193,7 @@ public String toString() { JsonUtils.addField(sb, TIME, time); JsonUtils.addField(sb, STREAM, stream); JsonUtils.addField(sb, "last_seq", lastSeq); + JsonUtils.addField(sb, NUM_PENDING, numPending); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, HDRS, headers); return JsonUtils.endJson(sb).toString(); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index fc6a951b3..0589b1845 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -349,7 +349,7 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR * {@inheritDoc} */ @Override - public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException { + public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException { validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); if (!directBatchGet211Available) { @@ -387,8 +387,8 @@ public void getMessageBatch(String streamName, MessageBatchGetRequest messageBat throw JsDirectBatchGet211NotAvailable.instance(); } - MessageBatchInfo messageBatchInfo = new MessageBatchInfo(msg); - consumer.accept(messageBatchInfo); + MessageInfo messageInfo = new MessageInfo(msg, streamName, true); + consumer.accept(messageInfo); timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 5a2f8891c..cc7193d92 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1564,8 +1564,8 @@ public void testBatchDirectGet() throws Exception { js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); } - List batch = new ArrayList<>(); - Consumer handler = batch::add; + List batch = new ArrayList<>(); + Consumer handler = batch::add; // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { @@ -1585,7 +1585,7 @@ public void testBatchDirectGet() throws Exception { // First batch gets first two messages. jsm.getMessageBatch(tsc.stream, request, handler); - MessageBatchInfo last = batch.get(batch.size() - 1); + MessageInfo last = batch.get(batch.size() - 1); assertEquals(1, last.getNumPending()); assertEquals(2, last.getSeq()); assertEquals(1, last.getLastSeq()); @@ -1602,7 +1602,7 @@ public void testBatchDirectGet() throws Exception { last = batch.get(batch.size() - 1); assertEquals(0, last.getNumPending()); assertEquals(3, last.getSeq()); - assertEquals(-1, last.getLastSeq()); + assertEquals(0, last.getLastSeq()); }); } @@ -1627,7 +1627,7 @@ public void testBatchDirectGetMultiLast() throws Exception { js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); List keys = new ArrayList<>(); - Consumer handler = msg -> keys.add(msg.getSubject()); + Consumer handler = msg -> keys.add(msg.getSubject()); MessageBatchGetRequest request = MessageBatchGetRequest.builder() .multiLastForSubjects(subjectAFoo, subjectABaz) From 7c3f4ea24956ffe914e75c35ee2989b1ce978efa Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 24 Sep 2024 11:51:59 +0200 Subject: [PATCH 03/10] Use pre-existing ApiConstants Signed-off-by: Maurice van Veen --- src/main/java/io/nats/client/api/MessageInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index 97e0680c4..d52f1229b 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -181,7 +181,7 @@ public long getNumPending() { public String toString() { StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":"); JsonUtils.addField(sb, "direct", direct); - JsonUtils.addField(sb, "error", getError()); + JsonUtils.addField(sb, ERROR, getError()); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, SEQ, seq); if (data == null) { @@ -192,7 +192,7 @@ public String toString() { } JsonUtils.addField(sb, TIME, time); JsonUtils.addField(sb, STREAM, stream); - JsonUtils.addField(sb, "last_seq", lastSeq); + JsonUtils.addField(sb, LAST_SEQ, lastSeq); JsonUtils.addField(sb, NUM_PENDING, numPending); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, HDRS, headers); From 63f3fceff1e513d3352a930d98a1b05812985c0b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 24 Sep 2024 11:55:53 +0200 Subject: [PATCH 04/10] Add MessageInfoHandler Signed-off-by: Maurice van Veen --- .../io/nats/client/JetStreamManagement.java | 5 ++-- .../io/nats/client/MessageInfoHandler.java | 29 +++++++++++++++++++ .../client/impl/NatsJetStreamManagement.java | 6 ++-- .../client/impl/JetStreamManagementTests.java | 5 ++-- 4 files changed, 35 insertions(+), 10 deletions(-) create mode 100644 src/main/java/io/nats/client/MessageInfoHandler.java diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index da3abc7fa..0400a0967 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.List; -import java.util.function.Consumer; /** * JetStream Management context for creation and access to streams and consumers in NATS. @@ -332,12 +331,12 @@ public interface JetStreamManagement { * * @param streamName the name of the stream * @param messageBatchGetRequest the request details - * @param consumer the handler used to process messages in the batch + * @param handler the handler used to process messages in the batch * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException; + void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java new file mode 100644 index 000000000..5b69412d8 --- /dev/null +++ b/src/main/java/io/nats/client/MessageInfoHandler.java @@ -0,0 +1,29 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.api.MessageInfo; + +/** + * Handler used to process {@link MessageInfo}. + */ +public interface MessageInfoHandler { + /** + * Called to deliver a {@link MessageInfo} to the handler. + * + * @param messageInfo the received {@link MessageInfo} + * @throws InterruptedException if the handling thread is interrupted + */ + void onMessageInfo(MessageInfo messageInfo) throws InterruptedException; +} diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 0589b1845..4232e0696 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; -import java.util.Iterator; import java.util.List; -import java.util.function.Consumer; import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; @@ -349,7 +347,7 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR * {@inheritDoc} */ @Override - public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer consumer) throws IOException, JetStreamApiException { + public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); if (!directBatchGet211Available) { @@ -388,7 +386,7 @@ public void getMessageBatch(String streamName, MessageBatchGetRequest messageBat } MessageInfo messageInfo = new MessageInfo(msg, streamName, true); - consumer.accept(messageInfo); + handler.onMessageInfo(messageInfo); timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index cc7193d92..6ba6c6c4d 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -27,7 +27,6 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.*; -import java.util.function.Consumer; import java.util.stream.Collectors; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; @@ -1565,7 +1564,7 @@ public void testBatchDirectGet() throws Exception { } List batch = new ArrayList<>(); - Consumer handler = batch::add; + MessageInfoHandler handler = batch::add; // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { @@ -1627,7 +1626,7 @@ public void testBatchDirectGetMultiLast() throws Exception { js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); List keys = new ArrayList<>(); - Consumer handler = msg -> keys.add(msg.getSubject()); + MessageInfoHandler handler = msg -> keys.add(msg.getSubject()); MessageBatchGetRequest request = MessageBatchGetRequest.builder() .multiLastForSubjects(subjectAFoo, subjectABaz) From ca5e800ea73fa32976de0cf2c96bd8d3c0cf5b89 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 24 Sep 2024 11:59:30 +0200 Subject: [PATCH 05/10] Add new errors to README Signed-off-by: Maurice van Veen --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 192b2782b..907d8ab1c 100644 --- a/README.md +++ b/README.md @@ -980,6 +980,8 @@ You can however set the deliver policy which will be used to start the subscript | JsConsumerCreate290NotAvailable | CON-90301 | Name field not valid when v2.9.0 consumer create api is not available. | | JsConsumerNameDurableMismatch | CON-90302 | Name must match durable if both are supplied. | | JsMultipleFilterSubjects210NotAvailable | CON-90303 | Multiple filter subjects not available until server version 2.10.0. | +| JsAllowDirectRequired | CON-90304 | Stream must have allow direct set. | +| JsDirectBatchGet211NotAvailable | CON-90305 | Batch direct get not available until server version 2.11.0. | | OsObjectNotFound | OS-90201 | The object was not found. | | OsObjectIsDeleted | OS-90202 | The object is deleted. | | OsObjectAlreadyExists | OS-90203 | An object with that name already exists. | From cda1e2e458997da61aa39ccefed91fa0d2b5bfc7 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 24 Sep 2024 13:52:00 +0200 Subject: [PATCH 06/10] Update method signature to return queue Signed-off-by: Maurice van Veen --- .../io/nats/client/JetStreamManagement.java | 5 ++-- .../io/nats/client/MessageInfoHandler.java | 29 ------------------- .../client/impl/NatsJetStreamManagement.java | 9 ++++-- .../client/impl/JetStreamManagementTests.java | 18 +++++------- 4 files changed, 16 insertions(+), 45 deletions(-) delete mode 100644 src/main/java/io/nats/client/MessageInfoHandler.java diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 0400a0967..331cf1232 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; /** * JetStream Management context for creation and access to streams and consumers in NATS. @@ -331,12 +332,12 @@ public interface JetStreamManagement { * * @param streamName the name of the stream * @param messageBatchGetRequest the request details - * @param handler the handler used to process messages in the batch + * @return queue that will be populated with {@link MessageInfo} * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; + LinkedBlockingQueue getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java deleted file mode 100644 index 5b69412d8..000000000 --- a/src/main/java/io/nats/client/MessageInfoHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package io.nats.client; - -import io.nats.client.api.MessageInfo; - -/** - * Handler used to process {@link MessageInfo}. - */ -public interface MessageInfoHandler { - /** - * Called to deliver a {@link MessageInfo} to the handler. - * - * @param messageInfo the received {@link MessageInfo} - * @throws InterruptedException if the handling thread is interrupted - */ - void onMessageInfo(MessageInfo messageInfo) throws InterruptedException; -} diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 4232e0696..9fa07c4e2 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; @@ -347,9 +348,11 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR * {@inheritDoc} */ @Override - public void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { + public LinkedBlockingQueue getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); + LinkedBlockingQueue q = new LinkedBlockingQueue<>(); + if (!directBatchGet211Available) { throw JsDirectBatchGet211NotAvailable.instance(); } @@ -377,7 +380,7 @@ public void getMessageBatch(String streamName, MessageBatchGetRequest messageBat if (status.getCode() < 200 || status.getCode() > 299) { throw new JetStreamApiException(Error.convert(msg.getStatus())); } - return; + return q; } Headers headers = msg.getHeaders(); @@ -386,7 +389,7 @@ public void getMessageBatch(String streamName, MessageBatchGetRequest messageBat } MessageInfo messageInfo = new MessageInfo(msg, streamName, true); - handler.onMessageInfo(messageInfo); + q.add(messageInfo); timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 6ba6c6c4d..949c7620b 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1563,13 +1563,10 @@ public void testBatchDirectGet() throws Exception { js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); } - List batch = new ArrayList<>(); - MessageInfoHandler handler = batch::add; - // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.getMessageBatch(tsc.stream, request, handler); + jsm.getMessageBatch(tsc.stream, request); }); // Enable AllowDirect. @@ -1583,7 +1580,7 @@ public void testBatchDirectGet() throws Exception { .build(); // First batch gets first two messages. - jsm.getMessageBatch(tsc.stream, request, handler); + List batch = new ArrayList<>(jsm.getMessageBatch(tsc.stream, request)); MessageInfo last = batch.get(batch.size() - 1); assertEquals(1, last.getNumPending()); assertEquals(2, last.getSeq()); @@ -1593,7 +1590,7 @@ public void testBatchDirectGet() throws Exception { request = MessageBatchGetRequest.builder(request) .sequence(last.getSeq() + 1) .build(); - jsm.getMessageBatch(tsc.stream, request, handler); + batch.addAll(jsm.getMessageBatch(tsc.stream, request)); List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); assertEquals(expected, actual); @@ -1625,15 +1622,14 @@ public void testBatchDirectGetMultiLast() throws Exception { js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8)); js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); - List keys = new ArrayList<>(); - MessageInfoHandler handler = msg -> keys.add(msg.getSubject()); - MessageBatchGetRequest request = MessageBatchGetRequest.builder() .multiLastForSubjects(subjectAFoo, subjectABaz) .build(); - // First batch gets first two messages. - jsm.getMessageBatch(stream, request, handler); + List keys = new ArrayList<>(); + for (MessageInfo info : jsm.getMessageBatch(stream, request)) { + keys.add(info.getSubject()); + } assertEquals(2, keys.size()); assertEquals(subjectAFoo, keys.get(0)); assertEquals(subjectABaz, keys.get(1)); From a24177526bfab572016f8a158a3f13d434a91bc2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 25 Sep 2024 16:05:52 +0200 Subject: [PATCH 07/10] Support fetch/iterate/consume semantics & add EOD Signed-off-by: Maurice van Veen --- .../io/nats/client/JetStreamManagement.java | 32 +++++- .../io/nats/client/MessageInfoHandler.java | 29 +++++ .../java/io/nats/client/api/ApiResponse.java | 6 + .../java/io/nats/client/api/MessageInfo.java | 24 ++++ .../client/impl/NatsJetStreamManagement.java | 79 ++++++++++---- .../client/impl/JetStreamManagementTests.java | 103 ++++++++++++++++-- 6 files changed, 242 insertions(+), 31 deletions(-) create mode 100644 src/main/java/io/nats/client/MessageInfoHandler.java diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 331cf1232..ee8f2ce86 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -332,12 +332,40 @@ public interface JetStreamManagement { * * @param streamName the name of the stream * @param messageBatchGetRequest the request details - * @return queue that will be populated with {@link MessageInfo} + * @return a list containing {@link MessageInfo} * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - LinkedBlockingQueue getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @return a queue used to asynchronously receive {@link MessageInfo} + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + LinkedBlockingQueue iterateMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @param handler the handler used for receiving {@link MessageInfo} + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java new file mode 100644 index 000000000..6a9697a31 --- /dev/null +++ b/src/main/java/io/nats/client/MessageInfoHandler.java @@ -0,0 +1,29 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.api.MessageInfo; + +/** + * Handler for {@link MessageInfo}. + */ +public interface MessageInfoHandler { + /** + * Called to deliver a {@link MessageInfo} to the handler. + * + * @param messageInfo the received {@link MessageInfo} + * @throws InterruptedException if the thread for this handler is interrupted + */ + void onMessageInfo(MessageInfo messageInfo) throws InterruptedException; +} diff --git a/src/main/java/io/nats/client/api/ApiResponse.java b/src/main/java/io/nats/client/api/ApiResponse.java index 2dbe3a348..e97a42a41 100644 --- a/src/main/java/io/nats/client/api/ApiResponse.java +++ b/src/main/java/io/nats/client/api/ApiResponse.java @@ -76,6 +76,12 @@ public ApiResponse() { type = NO_TYPE; } + public ApiResponse(Error error) { + jv = null; + this.error = error; + type = NO_TYPE; + } + @SuppressWarnings("unchecked") public T throwOnHasError() throws JetStreamApiException { if (hasError()) { diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index d52f1229b..5ded6746f 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -32,6 +32,11 @@ */ public class MessageInfo extends ApiResponse { + /** + * Message returned as a response in {@link MessageBatchGetRequest} to signal end of data. + */ + public static final MessageInfo EOD = new MessageInfo(null, false); + private final boolean direct; private final String subject; private final long seq; @@ -52,6 +57,25 @@ public MessageInfo(Message msg) { this(msg, null, false); } + /** + * Create a Message Info + * This signature is public for testing purposes and is not intended to be used externally. + * @param error the error + * @param direct true if the object is being created from a get direct api call instead of the standard get message + */ + public MessageInfo(Error error, boolean direct) { + super(error); + this.direct = direct; + subject = null; + data = null; + seq = -1; + time = null; + headers = null; + stream = null; + lastSeq = -1; + numPending = -1; + } + /** * Create a Message Info * This signature is public for testing purposes and is not intended to be used externally. diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 9fa07c4e2..c0a3e9014 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -348,20 +349,38 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR * {@inheritDoc} */ @Override - public LinkedBlockingQueue getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { - validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); - - LinkedBlockingQueue q = new LinkedBlockingQueue<>(); + public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + List results = new ArrayList<>(); + _consumeMessageBatch(streamName, messageBatchGetRequest, msg -> { + if (msg != MessageInfo.EOD) { + results.add(msg); + } + }); + return results; + } - if (!directBatchGet211Available) { - throw JsDirectBatchGet211NotAvailable.instance(); - } + /** + * {@inheritDoc} + */ + @Override + public LinkedBlockingQueue iterateMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); + conn.getOptions().getExecutor().submit(() -> _consumeMessageBatch(streamName, messageBatchGetRequest, q::add)); + return q; + } - CachedStreamInfo csi = getCachedStreamInfo(streamName); - if (!csi.allowDirect) { - throw JsAllowDirectRequired.instance(); - } + /** + * {@inheritDoc} + */ + @Override + public void consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + _consumeMessageBatch(streamName, messageBatchGetRequest, handler); + } + public void _consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { Subscription sub = null; try { String replyTo = conn.createInbox(); @@ -374,13 +393,18 @@ public LinkedBlockingQueue getMessageBatch(String streamName, Messa long maxTimeMillis = messageBatchGetRequest.getTimeout().toMillis(); long timeLeft = maxTimeMillis; while (true) { - Message msg = responseRequired(sub.nextMessage(timeLeft)); + Message msg = sub.nextMessage(timeLeft); + if (msg == null) { + break; + } if (msg.isStatusMessage()) { Status status = msg.getStatus(); + // Report error, otherwise successful status. if (status.getCode() < 200 || status.getCode() > 299) { - throw new JetStreamApiException(Error.convert(msg.getStatus())); + MessageInfo messageInfo = new MessageInfo(Error.convert(status), true); + handler.onMessageInfo(messageInfo); } - return q; + break; } Headers headers = msg.getHeaders(); @@ -389,23 +413,38 @@ public LinkedBlockingQueue getMessageBatch(String streamName, Messa } MessageInfo messageInfo = new MessageInfo(msg, streamName, true); - q.add(messageInfo); + handler.onMessageInfo(messageInfo); timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { // sub.nextMessage was fetching one message // and data is not completely read // so it seems like this is an error condition Thread.currentThread().interrupt(); throw new RuntimeException(e); - } - finally { + } finally { + try { + handler.onMessageInfo(MessageInfo.EOD); + } catch (Exception ignore) { + } try { //noinspection DataFlowIssue sub.unsubscribe(); + } catch (Exception ignore) { } - catch (Exception ignore) {} + } + } + + private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); + + if (!directBatchGet211Available) { + throw JsDirectBatchGet211NotAvailable.instance(); + } + + CachedStreamInfo csi = getCachedStreamInfo(streamName); + if (!csi.allowDirect) { + throw JsAllowDirectRequired.instance(); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 949c7620b..1ddff9dad 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -27,6 +27,8 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; @@ -1563,10 +1565,17 @@ public void testBatchDirectGet() throws Exception { js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); } + List batch = new ArrayList<>(); + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + batch.add(msg); + } + }; + // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.getMessageBatch(tsc.stream, request); + jsm.consumeMessageBatch(tsc.stream, request, handler); }); // Enable AllowDirect. @@ -1574,13 +1583,21 @@ public void testBatchDirectGet() throws Exception { StreamInfo si = jsm.updateStream(sc); assertTrue(si.getConfiguration().getAllowDirect()); - MessageBatchGetRequest request = MessageBatchGetRequest.builder() + // Empty request errors. + AtomicBoolean hasError = new AtomicBoolean(); + MessageInfoHandler errorHandler = msg -> { + hasError.compareAndSet(false, msg.hasError()); + }; + MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); + jsm.consumeMessageBatch(tsc.stream, request, errorHandler); + assertTrue(hasError.get()); + + // First batch gets first two messages. + request = MessageBatchGetRequest.builder() .batch(2) .subject(tsc.subject()) .build(); - - // First batch gets first two messages. - List batch = new ArrayList<>(jsm.getMessageBatch(tsc.stream, request)); + jsm.consumeMessageBatch(tsc.stream, request, handler); MessageInfo last = batch.get(batch.size() - 1); assertEquals(1, last.getNumPending()); assertEquals(2, last.getSeq()); @@ -1590,7 +1607,7 @@ public void testBatchDirectGet() throws Exception { request = MessageBatchGetRequest.builder(request) .sequence(last.getSeq() + 1) .build(); - batch.addAll(jsm.getMessageBatch(tsc.stream, request)); + jsm.consumeMessageBatch(tsc.stream, request, handler); List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); assertEquals(expected, actual); @@ -1602,6 +1619,71 @@ public void testBatchDirectGet() throws Exception { }); } + @Test + public void testBatchDirectGetAlternatives() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(nc); + assertFalse(tsc.si.getConfiguration().getAllowDirect()); + + // Enable AllowDirect. + StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); + StreamInfo si = jsm.updateStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); + + List expected = Arrays.asList("foo", "bar", "baz"); + for (String data : expected) { + js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); + } + + // Request stays the same for all options. + MessageBatchGetRequest request = MessageBatchGetRequest.builder() + .batch(3) + .subject(tsc.subject()) + .build(); + + // Get using handler. + List batch = new ArrayList<>(); + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + batch.add(msg); + } + }; + jsm.consumeMessageBatch(tsc.stream, request, handler); + assertEquals(3, batch.size()); + MessageInfo last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + + // Get using queue. + batch.clear(); + LinkedBlockingQueue queue = jsm.iterateMessageBatch(tsc.stream, request); + MessageInfo msg; + while ((msg = queue.take()) != MessageInfo.EOD) { + if (!msg.hasError()) { + batch.add(msg); + } + } + assertEquals(3, batch.size()); + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + + // Get using fetch. + batch.clear(); + batch.addAll(jsm.fetchMessageBatch(tsc.stream, request)); + assertEquals(3, batch.size()); + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + }); + } + @Test public void testBatchDirectGetMultiLast() throws Exception { jsServer.run(TestBase::atLeast2_11, nc -> { @@ -1627,9 +1709,12 @@ public void testBatchDirectGetMultiLast() throws Exception { .build(); List keys = new ArrayList<>(); - for (MessageInfo info : jsm.getMessageBatch(stream, request)) { - keys.add(info.getSubject()); - } + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + keys.add(msg.getSubject()); + } + }; + jsm.consumeMessageBatch(stream, request, handler); assertEquals(2, keys.size()); assertEquals(subjectAFoo, keys.get(0)); assertEquals(subjectABaz, keys.get(1)); From 0e498c216c50e96c55d8fe659cee018a76cf2854 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 25 Sep 2024 16:54:32 +0200 Subject: [PATCH 08/10] Rename consume to gather Signed-off-by: Maurice van Veen --- .../java/io/nats/client/JetStreamManagement.java | 2 +- .../io/nats/client/impl/NatsJetStreamManagement.java | 10 +++++----- .../nats/client/impl/JetStreamManagementTests.java | 12 ++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index ee8f2ce86..906a27475 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -365,7 +365,7 @@ public interface JetStreamManagement { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - void consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; + void gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index c0a3e9014..1240ee608 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -352,7 +352,7 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); List results = new ArrayList<>(); - _consumeMessageBatch(streamName, messageBatchGetRequest, msg -> { + _gatherMessageBatch(streamName, messageBatchGetRequest, msg -> { if (msg != MessageInfo.EOD) { results.add(msg); } @@ -367,7 +367,7 @@ public List fetchMessageBatch(String streamName, MessageBatchGetReq public LinkedBlockingQueue iterateMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - conn.getOptions().getExecutor().submit(() -> _consumeMessageBatch(streamName, messageBatchGetRequest, q::add)); + conn.getOptions().getExecutor().submit(() -> _gatherMessageBatch(streamName, messageBatchGetRequest, q::add)); return q; } @@ -375,12 +375,12 @@ public LinkedBlockingQueue iterateMessageBatch(String streamName, M * {@inheritDoc} */ @Override - public void consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { + public void gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - _consumeMessageBatch(streamName, messageBatchGetRequest, handler); + _gatherMessageBatch(streamName, messageBatchGetRequest, handler); } - public void _consumeMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { + public void _gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { Subscription sub = null; try { String replyTo = conn.createInbox(); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 1ddff9dad..5649169eb 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1575,7 +1575,7 @@ public void testBatchDirectGet() throws Exception { // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.consumeMessageBatch(tsc.stream, request, handler); + jsm.gatherMessageBatch(tsc.stream, request, handler); }); // Enable AllowDirect. @@ -1589,7 +1589,7 @@ public void testBatchDirectGet() throws Exception { hasError.compareAndSet(false, msg.hasError()); }; MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.consumeMessageBatch(tsc.stream, request, errorHandler); + jsm.gatherMessageBatch(tsc.stream, request, errorHandler); assertTrue(hasError.get()); // First batch gets first two messages. @@ -1597,7 +1597,7 @@ public void testBatchDirectGet() throws Exception { .batch(2) .subject(tsc.subject()) .build(); - jsm.consumeMessageBatch(tsc.stream, request, handler); + jsm.gatherMessageBatch(tsc.stream, request, handler); MessageInfo last = batch.get(batch.size() - 1); assertEquals(1, last.getNumPending()); assertEquals(2, last.getSeq()); @@ -1607,7 +1607,7 @@ public void testBatchDirectGet() throws Exception { request = MessageBatchGetRequest.builder(request) .sequence(last.getSeq() + 1) .build(); - jsm.consumeMessageBatch(tsc.stream, request, handler); + jsm.gatherMessageBatch(tsc.stream, request, handler); List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); assertEquals(expected, actual); @@ -1651,7 +1651,7 @@ public void testBatchDirectGetAlternatives() throws Exception { batch.add(msg); } }; - jsm.consumeMessageBatch(tsc.stream, request, handler); + jsm.gatherMessageBatch(tsc.stream, request, handler); assertEquals(3, batch.size()); MessageInfo last = batch.get(batch.size() - 1); assertEquals(0, last.getNumPending()); @@ -1714,7 +1714,7 @@ public void testBatchDirectGetMultiLast() throws Exception { keys.add(msg.getSubject()); } }; - jsm.consumeMessageBatch(stream, request, handler); + jsm.gatherMessageBatch(stream, request, handler); assertEquals(2, keys.size()); assertEquals(subjectAFoo, keys.get(0)); assertEquals(subjectABaz, keys.get(1)); From b5c4c0b72a80c259f855dd28c8437c8d12993719 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 3 Oct 2024 09:33:19 +0200 Subject: [PATCH 09/10] Rename to request and queue Signed-off-by: Maurice van Veen --- .../java/io/nats/client/JetStreamManagement.java | 4 ++-- .../nats/client/impl/NatsJetStreamManagement.java | 12 ++++++------ .../nats/client/impl/JetStreamManagementTests.java | 14 +++++++------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 906a27475..6755e49d0 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -351,7 +351,7 @@ public interface JetStreamManagement { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - LinkedBlockingQueue iterateMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; /** * Request a batch of messages using a {@link MessageBatchGetRequest}. @@ -365,7 +365,7 @@ public interface JetStreamManagement { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - void gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; + void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; /** * Deletes a message, overwriting the message data with garbage diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 1240ee608..7ac235f55 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -352,7 +352,7 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); List results = new ArrayList<>(); - _gatherMessageBatch(streamName, messageBatchGetRequest, msg -> { + _requestMessageBatch(streamName, messageBatchGetRequest, msg -> { if (msg != MessageInfo.EOD) { results.add(msg); } @@ -364,10 +364,10 @@ public List fetchMessageBatch(String streamName, MessageBatchGetReq * {@inheritDoc} */ @Override - public LinkedBlockingQueue iterateMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - conn.getOptions().getExecutor().submit(() -> _gatherMessageBatch(streamName, messageBatchGetRequest, q::add)); + conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add)); return q; } @@ -375,12 +375,12 @@ public LinkedBlockingQueue iterateMessageBatch(String streamName, M * {@inheritDoc} */ @Override - public void gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { + public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { validateMessageBatchGetRequest(streamName, messageBatchGetRequest); - _gatherMessageBatch(streamName, messageBatchGetRequest, handler); + _requestMessageBatch(streamName, messageBatchGetRequest, handler); } - public void _gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { + public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { Subscription sub = null; try { String replyTo = conn.createInbox(); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 5649169eb..b0586401e 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1575,7 +1575,7 @@ public void testBatchDirectGet() throws Exception { // Stream doesn't have AllowDirect enabled, will error. assertThrows(IllegalArgumentException.class, () -> { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.gatherMessageBatch(tsc.stream, request, handler); + jsm.requestMessageBatch(tsc.stream, request, handler); }); // Enable AllowDirect. @@ -1589,7 +1589,7 @@ public void testBatchDirectGet() throws Exception { hasError.compareAndSet(false, msg.hasError()); }; MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); - jsm.gatherMessageBatch(tsc.stream, request, errorHandler); + jsm.requestMessageBatch(tsc.stream, request, errorHandler); assertTrue(hasError.get()); // First batch gets first two messages. @@ -1597,7 +1597,7 @@ public void testBatchDirectGet() throws Exception { .batch(2) .subject(tsc.subject()) .build(); - jsm.gatherMessageBatch(tsc.stream, request, handler); + jsm.requestMessageBatch(tsc.stream, request, handler); MessageInfo last = batch.get(batch.size() - 1); assertEquals(1, last.getNumPending()); assertEquals(2, last.getSeq()); @@ -1607,7 +1607,7 @@ public void testBatchDirectGet() throws Exception { request = MessageBatchGetRequest.builder(request) .sequence(last.getSeq() + 1) .build(); - jsm.gatherMessageBatch(tsc.stream, request, handler); + jsm.requestMessageBatch(tsc.stream, request, handler); List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); assertEquals(expected, actual); @@ -1651,7 +1651,7 @@ public void testBatchDirectGetAlternatives() throws Exception { batch.add(msg); } }; - jsm.gatherMessageBatch(tsc.stream, request, handler); + jsm.requestMessageBatch(tsc.stream, request, handler); assertEquals(3, batch.size()); MessageInfo last = batch.get(batch.size() - 1); assertEquals(0, last.getNumPending()); @@ -1660,7 +1660,7 @@ public void testBatchDirectGetAlternatives() throws Exception { // Get using queue. batch.clear(); - LinkedBlockingQueue queue = jsm.iterateMessageBatch(tsc.stream, request); + LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); MessageInfo msg; while ((msg = queue.take()) != MessageInfo.EOD) { if (!msg.hasError()) { @@ -1714,7 +1714,7 @@ public void testBatchDirectGetMultiLast() throws Exception { keys.add(msg.getSubject()); } }; - jsm.gatherMessageBatch(stream, request, handler); + jsm.requestMessageBatch(stream, request, handler); assertEquals(2, keys.size()); assertEquals(subjectAFoo, keys.get(0)); assertEquals(subjectABaz, keys.get(1)); From 38f3c539809e30558f93834141ebba4446c175a5 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 3 Oct 2024 09:33:44 +0200 Subject: [PATCH 10/10] Add error handling test for fetch and queue Signed-off-by: Maurice van Veen --- .../java/io/nats/client/impl/JetStreamManagementTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index b0586401e..e05f21434 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1591,6 +1591,12 @@ public void testBatchDirectGet() throws Exception { MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); jsm.requestMessageBatch(tsc.stream, request, errorHandler); assertTrue(hasError.get()); + List list = jsm.fetchMessageBatch(tsc.stream, request); + assertEquals(1, list.size()); + assertTrue(list.get(0).hasError()); + LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); + assertTrue(queue.take().hasError()); + assertEquals(MessageInfo.EOD, queue.take()); // First batch gets first two messages. request = MessageBatchGetRequest.builder()