Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public IncomingHeadersProcessor(byte[] serialized) {
throw new IllegalArgumentException(SERIALIZED_HEADER_CANNOT_BE_NULL_OR_EMPTY);
}

// is tis the correct version
// is this the correct version
for (int x = 0; x < HEADER_VERSION_BYTES_LEN; x++) {
if (serialized[x] != HEADER_VERSION_BYTES[x]) {
throw new IllegalArgumentException(INVALID_HEADER_VERSION);
Expand All @@ -42,17 +42,15 @@ public IncomingHeadersProcessor(byte[] serialized) {
Token terminus = new Token(serialized, serializedLength, serializedLength - 2, TokenType.CRLF);
Token token = new Token(serialized, serializedLength, HEADER_VERSION_BYTES_LEN, null);

boolean hadStatus = false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable was never used in the initHeader function that it was passed to and there are no other uses, so it was removed.

if (token.isType(TokenType.SPACE)) {
token = initStatus(serialized, serializedLength, token);
if (token.samePoint(terminus)) {
return; // status only
}
hadStatus = true;
}

if (token.isType(TokenType.CRLF)) {
initHeader(serialized, serializedLength, token, hadStatus);
initHeader(serialized, serializedLength, token);
}
else {
throw new IllegalArgumentException(INVALID_HEADER_COMPOSITION);
Expand All @@ -71,7 +69,7 @@ public Status getStatus() {
return inlineStatus;
}

private void initHeader(byte[] serialized, int len, Token tCrlf, boolean hadStatus) {
private void initHeader(byte[] serialized, int len, Token tCrlf) {
// REGULAR HEADER
Token peek = new Token(serialized, len, tCrlf, null);
while (peek.isType(TokenType.TEXT)) {
Expand All @@ -90,7 +88,7 @@ private void initHeader(byte[] serialized, int len, Token tCrlf, boolean hadStat
if (headers == null) {
headers = new Headers();
}
headers.add(tKey.getValue(), tVal.getValue());
headers.add(tKey.getValueCheckKnownKeys(), tVal.getValue());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the new function to get the key, checking against known keys.

peek = new Token(serialized, len, tCrlf, null);
}
peek.mustBe(TokenType.CRLF);
Expand Down
29 changes: 24 additions & 5 deletions src/main/java/io/nats/client/support/NatsJetStreamConstants.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.nats.client.support;

import static java.nio.charset.StandardCharsets.US_ASCII;

public interface NatsJetStreamConstants {
/**
* The maximum pull size [NO LONGER ENFORCED]
Expand Down Expand Up @@ -98,9 +100,13 @@ public interface NatsJetStreamConstants {

String LAST_CONSUMER_HDR = "Nats-Last-Consumer";
String LAST_STREAM_HDR = "Nats-Last-Stream";
String CONSUMER_STALLED_HDR = "Nats-Consumer-Stalled";
String MSG_SIZE_HDR = "Nats-Msg-Size";
String NATS_MARKER_REASON_HDR = "Nats-Marker-Reason";

String CONSUMER_STALLED_HDR = "Nats-Consumer-Stalled";
String MSG_SIZE_HDR = "Nats-Msg-Size";
String NATS_MARKER_REASON_HDR = "Nats-Marker-Reason";
byte[] CONSUMER_STALLED_HDR_BYTES = CONSUMER_STALLED_HDR.getBytes(US_ASCII);
byte[] MSG_SIZE_HDR_BYTES = MSG_SIZE_HDR.getBytes(US_ASCII);
byte[] NATS_MARKER_REASON_HDR_BYTES = NATS_MARKER_REASON_HDR.getBytes(US_ASCII);

String ROLLUP_HDR = "Nats-Rollup";
String ROLLUP_HDR_SUBJECT = "sub";
Expand All @@ -114,8 +120,21 @@ public interface NatsJetStreamConstants {
String NATS_NUM_PENDING = "Nats-Num-Pending";
String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE, NATS_NUM_PENDING};

String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";
// bytes used for faster matching and less string allocation when
byte[] NATS_STREAM_BYTES = NATS_STREAM.getBytes(US_ASCII);
byte[] NATS_SEQUENCE_BYTES = NATS_SEQUENCE.getBytes(US_ASCII);
byte[] NATS_TIMESTAMP_BYTES = NATS_TIMESTAMP.getBytes(US_ASCII);
byte[] NATS_SUBJECT_BYTES = NATS_SUBJECT.getBytes(US_ASCII);
byte[] NATS_LAST_SEQUENCE_BYTES = NATS_LAST_SEQUENCE.getBytes(US_ASCII);
byte[] NATS_NUM_PENDING_BYTES = NATS_NUM_PENDING.getBytes(US_ASCII);

String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";
byte[] NATS_PENDING_MESSAGES_BYTES = NATS_PENDING_MESSAGES.getBytes(US_ASCII);
byte[] NATS_PENDING_BYTES_BYTES = NATS_PENDING_BYTES.getBytes(US_ASCII);

String KV_OPERATION_HEADER_KEY = "KV-Operation";
byte[] KV_OPERATION_HEADER_KEY_BYTES = KV_OPERATION_HEADER_KEY.getBytes(US_ASCII);

String NATS_SCHEDULE_HDR = "Nats-Schedule";
String NATS_SCHEDULE_TARGET_HDR = "Nats-Schedule-Target";
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/support/NatsKeyValueUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private NatsKeyValueUtil() {} /* ensures cannot be constructed */
public static final int KV_STREAM_PREFIX_LEN = KV_STREAM_PREFIX.length();
public static final String KV_SUBJECT_PREFIX = "$KV.";
public static final String KV_SUBJECT_SUFFIX = ".>";
public static final String KV_OPERATION_HEADER_KEY = "KV-Operation";
public static final String KV_OPERATION_HEADER_KEY = NatsJetStreamConstants.KV_OPERATION_HEADER_KEY;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting in NatsJetStreamConstants to be able to not depend on KV in non kv code


@NonNull
public static String extractBucketName(String streamName) {
Expand Down
79 changes: 53 additions & 26 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

public class Status {

public static final String FLOW_CONTROL_TEXT = "FlowControl Request";
public static final String HEARTBEAT_TEXT = "Idle Heartbeat";
public static final String FLOW_CONTROL_TEXT = "FlowControl Request";
public static final String HEARTBEAT_TEXT = "Idle Heartbeat";
public static final String NO_RESPONDERS_TEXT = "No Responders Available For Request";
public static final String EOB_TEXT = "EOB";
public static final String EOB_TEXT = "EOB";
public static final byte[] FLOW_CONTROL_TEXT_BYTES = FLOW_CONTROL_TEXT.getBytes();
public static final byte[] HEARTBEAT_TEXT_BYTES = HEARTBEAT_TEXT.getBytes();
public static final byte[] NO_RESPONDERS_TEXT_BYTES = NO_RESPONDERS_TEXT.getBytes();
public static final byte[] EOB_TEXT_BYTES = EOB_TEXT.getBytes();

public static final int FLOW_OR_HEARTBEAT_STATUS_CODE = 100;
public static final int NO_RESPONDERS_CODE = 503;
public static final int BAD_REQUEST_CODE = 400;
Expand All @@ -31,24 +36,46 @@ public class Status {
public static final int CONFLICT_CODE = 409;
public static final int EOB_CODE = 204;

public static final byte[] FLOW_OR_HEARTBEAT_STATUS_CODE_BYTES = ("" + FLOW_OR_HEARTBEAT_STATUS_CODE).getBytes();
public static final byte[] NO_RESPONDERS_CODE_BYTES = ("" + NO_RESPONDERS_CODE).getBytes();
public static final byte[] BAD_REQUEST_CODE_BYTES = ("" + BAD_REQUEST_CODE).getBytes();
public static final byte[] NOT_FOUND_CODE_BYTES = ("" + NOT_FOUND_CODE).getBytes();
public static final byte[] BAD_JS_REQUEST_CODE_BYTES = ("" + BAD_JS_REQUEST_CODE).getBytes();
public static final byte[] CONFLICT_CODE_BYTES = ("" + CONFLICT_CODE).getBytes();
public static final byte[] EOB_CODE_BYTES = ("" + EOB_CODE).getBytes();

// TODO - PINNED CONSUMER SUPPORT
// public static final int PIN_ERROR_CODE = 423;

public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
public static String CONSUMER_DELETED = "Consumer Deleted"; // 409
public static String CONSUMER_IS_PUSH_BASED = "Consumer is push based"; // 409

public static String MESSAGE_SIZE_EXCEEDS_MAX_BYTES = "Message Size Exceeds MaxBytes"; // 409
public static String EXCEEDED_MAX_PREFIX = "Exceeded Max";
public static String EXCEEDED_MAX_WAITING = "Exceeded MaxWaiting"; // 409
public static String EXCEEDED_MAX_REQUEST_BATCH = "Exceeded MaxRequestBatch"; // 409
public static String EXCEEDED_MAX_REQUEST_EXPIRES = "Exceeded MaxRequestExpires"; // 409
public static String EXCEEDED_MAX_REQUEST_MAX_BYTES = "Exceeded MaxRequestMaxBytes"; // 409

public static String BATCH_COMPLETED = "Batch Completed"; // 409 informational
public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409
// public static final byte[] PIN_ERROR_CODE_BYTES = ("" + PIN_ERROR_CODE).getBytes();

public static final String BAD_REQUEST = "Bad Request"; // 400
public static final String NO_MESSAGES = "No Messages"; // 404
public static final String CONSUMER_DELETED = "Consumer Deleted"; // 409
public static final String CONSUMER_IS_PUSH_BASED = "Consumer is push based"; // 409
public static final byte[] BAD_REQUEST_BYTES = BAD_REQUEST.getBytes();
public static final byte[] NO_MESSAGES_BYTES = NO_MESSAGES.getBytes();
public static final byte[] CONSUMER_DELETED_BYTES = CONSUMER_DELETED.getBytes();
public static final byte[] CONSUMER_IS_PUSH_BASED_BYTES = CONSUMER_IS_PUSH_BASED.getBytes();

public static final String MESSAGE_SIZE_EXCEEDS_MAX_BYTES = "Message Size Exceeds MaxBytes"; // 409
public static final String EXCEEDED_MAX_PREFIX = "Exceeded Max";
public static final String EXCEEDED_MAX_WAITING = "Exceeded MaxWaiting"; // 409
public static final String EXCEEDED_MAX_REQUEST_BATCH = "Exceeded MaxRequestBatch"; // 409
public static final String EXCEEDED_MAX_REQUEST_EXPIRES = "Exceeded MaxRequestExpires"; // 409
public static final String EXCEEDED_MAX_REQUEST_MAX_BYTES = "Exceeded MaxRequestMaxBytes"; // 409
public static final byte[] MESSAGE_SIZE_EXCEEDS_MAX_BYTES_BYTES = MESSAGE_SIZE_EXCEEDS_MAX_BYTES.getBytes();
public static final byte[] EXCEEDED_MAX_PREFIX_BYTES = EXCEEDED_MAX_PREFIX.getBytes();
public static final byte[] EXCEEDED_MAX_WAITING_BYTES = EXCEEDED_MAX_WAITING.getBytes();
public static final byte[] EXCEEDED_MAX_REQUEST_BATCH_BYTES = EXCEEDED_MAX_REQUEST_BATCH.getBytes();
public static final byte[] EXCEEDED_MAX_REQUEST_EXPIRES_BYTES = EXCEEDED_MAX_REQUEST_EXPIRES.getBytes();
public static final byte[] EXCEEDED_MAX_REQUEST_MAX_BYTES_BYTES = EXCEEDED_MAX_REQUEST_MAX_BYTES.getBytes();

public static final String BATCH_COMPLETED = "Batch Completed"; // 409 informational
public static final String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
public static final String LEADERSHIP_CHANGE = "Leadership Change"; // 409
public static final byte[] BATCH_COMPLETED_BYTES = BATCH_COMPLETED.getBytes();
public static final byte[] SERVER_SHUTDOWN_BYTES = SERVER_SHUTDOWN.getBytes();
public static final byte[] LEADERSHIP_CHANGE_BYTES = LEADERSHIP_CHANGE.getBytes();

public static final Status EOB = new Status(EOB_CODE, EOB_TEXT);
public static final Status TIMEOUT_OR_NO_MESSAGES = new Status(NOT_FOUND_CODE, "Timeout or No Messages");
Expand All @@ -69,7 +96,7 @@ public Status(int code, String message) {
}

public Status(Token codeToken, Token messageToken) {
this(extractCode(codeToken), extractMessage(messageToken));
this(extractCode(codeToken), messageToken.getValueCheckKnownStatuses());
}

public int getCode() {
Expand All @@ -84,15 +111,15 @@ public String getMessageWithCode() {
return code + " " + message;
}

private static String extractMessage(Token messageToken) {
return messageToken.hasValue() ? messageToken.getValue() : null;
}

private static int extractCode(Token codeToken) {
try {
return Integer.parseInt(codeToken.getValue());
Integer i = codeToken.getIntValue();
if (i == null) {
throw new IllegalArgumentException(NatsConstants.INVALID_HEADER_STATUS_CODE);
}
return i;
}
catch (Exception e) {
catch (NumberFormatException e) {
throw new IllegalArgumentException(NatsConstants.INVALID_HEADER_STATUS_CODE);
}
}
Expand Down
Loading
Loading