Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* JetStream Management context for creation and access to streams and consumers in NATS.
Expand Down Expand Up @@ -325,48 +324,6 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a list containing {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a queue used to asynchronously receive {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @param handler the handler used for receiving {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;

/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
Expand Down
44 changes: 0 additions & 44 deletions src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
*/
public class MessageInfo extends ApiResponse<MessageInfo> {

/**
* Message returned as a response in {@link MessageBatchGetRequest} to signal end of data.
*/
public static final MessageInfo EOD = new MessageInfo(null, false);

private final boolean direct;
private final String subject;
private final long seq;
Expand All @@ -45,7 +40,6 @@ public class MessageInfo extends ApiResponse<MessageInfo> {
private final Headers headers;
private final String stream;
private final long lastSeq;
private final long numPending;

/**
* Create a Message Info
Expand All @@ -57,25 +51,6 @@ public MessageInfo(Message msg) {
this(msg, null, false);
}

/**
* Create a Message Info
* This signature is public for testing purposes and is not intended to be used externally.
* @param error the error
* @param direct true if the object is being created from a get direct api call instead of the standard get message
*/
public MessageInfo(Error error, boolean direct) {
super(error);
this.direct = direct;
subject = null;
data = null;
seq = -1;
time = null;
headers = null;
stream = null;
lastSeq = -1;
numPending = -1;
}

/**
* Create a Message Info
* This signature is public for testing purposes and is not intended to be used externally.
Expand All @@ -102,14 +77,6 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
else {
lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1);
}
String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING);
if (tempNumPending == null) {
numPending = -1;
}
else {
// Num pending is +1 since it includes EOB message, correct that here.
numPending = Long.parseLong(tempNumPending) - 1;
}
// these are control headers, not real headers so don't give them to the user.
headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
}
Expand All @@ -121,7 +88,6 @@ else if (hasError()) {
headers = null;
stream = null;
lastSeq = -1;
numPending = -1;
}
else {
JsonValue mjv = readValue(jv, MESSAGE);
Expand All @@ -133,7 +99,6 @@ else if (hasError()) {
headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders();
stream = streamName;
lastSeq = -1;
numPending = -1;
}
}

Expand Down Expand Up @@ -193,14 +158,6 @@ public long getLastSeq() {
return lastSeq;
}

/**
* Amount of pending messages that can be requested with a subsequent batch request.
* @return number of pending messages
*/
public long getNumPending() {
return numPending;
}

@Override
public String toString() {
StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":");
Expand All @@ -217,7 +174,6 @@ public String toString() {
JsonUtils.addField(sb, TIME, time);
JsonUtils.addField(sb, STREAM, stream);
JsonUtils.addField(sb, LAST_SEQ, lastSeq);
JsonUtils.addField(sb, NUM_PENDING, numPending);
JsonUtils.addField(sb, SUBJECT, subject);
JsonUtils.addField(sb, HDRS, headers);
return JsonUtils.endJson(sb).toString();
Expand Down
108 changes: 0 additions & 108 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@
import io.nats.client.*;
import io.nats.client.api.Error;
import io.nats.client.api.*;
import io.nats.client.support.Status;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable;
import static io.nats.client.support.Validator.*;

public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
Expand Down Expand Up @@ -345,109 +340,6 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
}
}

/**
* {@inheritDoc}
*/
@Override
public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, msg -> {
if (msg != MessageInfo.EOD) {
results.add(msg);
}
});
return results;
}

/**
* {@inheritDoc}
*/
@Override
public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>();
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add));
return q;
}

/**
* {@inheritDoc}
*/
@Override
public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
_requestMessageBatch(streamName, messageBatchGetRequest, handler);
}

public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) {
Subscription sub = null;
try {
String replyTo = conn.createInbox();
sub = conn.subscribe(replyTo);

String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize());

long maxTimeMillis = getTimeout().toMillis();
long timeLeft = maxTimeMillis;
long start = System.currentTimeMillis();
while (true) {
Message msg = sub.nextMessage(timeLeft);
if (msg == null) {
break;
}
if (msg.isStatusMessage()) {
Status status = msg.getStatus();
// Report error, otherwise successful status.
if (status.getCode() < 200 || status.getCode() > 299) {
MessageInfo messageInfo = new MessageInfo(Error.convert(status), true);
handler.onMessageInfo(messageInfo);
}
break;
}

Headers headers = msg.getHeaders();
if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) {
throw JsDirectBatchGet211NotAvailable.instance();
}

MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
handler.onMessageInfo(messageInfo);
timeLeft = maxTimeMillis - (System.currentTimeMillis() - start);
}
} catch (InterruptedException e) {
// sub.nextMessage was fetching one message
// and data is not completely read
// so it seems like this is an error condition
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
try {
handler.onMessageInfo(MessageInfo.EOD);
} catch (Exception ignore) {
}
try {
//noinspection DataFlowIssue
sub.unsubscribe();
} catch (Exception ignore) {
}
}
}

private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateNotNull(messageBatchGetRequest, "Message Batch Get Request");

if (!directBatchGet211Available) {
throw JsDirectBatchGet211NotAvailable.instance();
}

CachedStreamInfo csi = getCachedStreamInfo(streamName);
if (!csi.allowDirect) {
throw JsAllowDirectRequired.instance();
}
}

/**
* {@inheritDoc}
*/
Expand Down
Loading