Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public IncomingHeadersProcessor(byte[] serialized) {
throw new IllegalArgumentException(SERIALIZED_HEADER_CANNOT_BE_NULL_OR_EMPTY);
}

// is tis the correct version
// is this the correct version
for (int x = 0; x < HEADER_VERSION_BYTES_LEN; x++) {
if (serialized[x] != HEADER_VERSION_BYTES[x]) {
throw new IllegalArgumentException(INVALID_HEADER_VERSION);
Expand All @@ -42,17 +42,15 @@ public IncomingHeadersProcessor(byte[] serialized) {
Token terminus = new Token(serialized, serializedLength, serializedLength - 2, TokenType.CRLF);
Token token = new Token(serialized, serializedLength, HEADER_VERSION_BYTES_LEN, null);

boolean hadStatus = false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable was never used in the initHeader function that it was passed to and there are no other uses, so it was removed.

if (token.isType(TokenType.SPACE)) {
token = initStatus(serialized, serializedLength, token);
if (token.samePoint(terminus)) {
return; // status only
}
hadStatus = true;
}

if (token.isType(TokenType.CRLF)) {
initHeader(serialized, serializedLength, token, hadStatus);
initHeader(serialized, serializedLength, token);
}
else {
throw new IllegalArgumentException(INVALID_HEADER_COMPOSITION);
Expand All @@ -71,7 +69,7 @@ public Status getStatus() {
return inlineStatus;
}

private void initHeader(byte[] serialized, int len, Token tCrlf, boolean hadStatus) {
private void initHeader(byte[] serialized, int len, Token tCrlf) {
// REGULAR HEADER
Token peek = new Token(serialized, len, tCrlf, null);
while (peek.isType(TokenType.TEXT)) {
Expand All @@ -90,7 +88,7 @@ private void initHeader(byte[] serialized, int len, Token tCrlf, boolean hadStat
if (headers == null) {
headers = new Headers();
}
headers.add(tKey.getValue(), tVal.getValue());
headers.add(tKey.getValueCheckKnownKeys(), tVal.getValue());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the new function to get the key, checking against known keys.

peek = new Token(serialized, len, tCrlf, null);
}
peek.mustBe(TokenType.CRLF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ public interface NatsJetStreamConstants {
String NATS_NUM_PENDING = "Nats-Num-Pending";
String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE, NATS_NUM_PENDING};

// bytes used for faster matching and less string allocation when
byte[] NATS_STREAM_BYTES = NATS_STREAM.getBytes();
byte[] NATS_SEQUENCE_BYTES = NATS_SEQUENCE.getBytes();
byte[] NATS_TIMESTAMP_BYTES = NATS_TIMESTAMP.getBytes();
byte[] NATS_SUBJECT_BYTES = NATS_SUBJECT.getBytes();
byte[] NATS_LAST_SEQUENCE_BYTES = NATS_LAST_SEQUENCE.getBytes();
byte[] NATS_NUM_PENDING_BYTES = NATS_NUM_PENDING.getBytes();

String NATS_PENDING_MESSAGES = "Nats-Pending-Messages";
String NATS_PENDING_BYTES = "Nats-Pending-Bytes";

Expand Down
47 changes: 37 additions & 10 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

public class Status {

public static final String FLOW_CONTROL_TEXT = "FlowControl Request";
public static final String HEARTBEAT_TEXT = "Idle Heartbeat";
public static final String FLOW_CONTROL_TEXT = "FlowControl Request";
public static final String HEARTBEAT_TEXT = "Idle Heartbeat";
public static final String NO_RESPONDERS_TEXT = "No Responders Available For Request";
public static final String EOB_TEXT = "EOB";
public static final String EOB_TEXT = "EOB";
public static byte[] FLOW_CONTROL_TEXT_BYTES = FLOW_CONTROL_TEXT.getBytes();
public static byte[] HEARTBEAT_TEXT_BYTES = HEARTBEAT_TEXT.getBytes();
public static byte[] NO_RESPONDERS_TEXT_BYTES = NO_RESPONDERS_TEXT.getBytes();
public static byte[] EOB_TEXT_BYTES = EOB_TEXT.getBytes();

public static final int FLOW_OR_HEARTBEAT_STATUS_CODE = 100;
public static final int NO_RESPONDERS_CODE = 503;
public static final int BAD_REQUEST_CODE = 400;
Expand All @@ -31,24 +36,46 @@ public class Status {
public static final int CONFLICT_CODE = 409;
public static final int EOB_CODE = 204;

public static final byte[] FLOW_OR_HEARTBEAT_STATUS_CODE_BYTES = ("" + FLOW_OR_HEARTBEAT_STATUS_CODE).getBytes();
public static final byte[] NO_RESPONDERS_CODE_BYTES = ("" + NO_RESPONDERS_CODE).getBytes();
public static final byte[] BAD_REQUEST_CODE_BYTES = ("" + BAD_REQUEST_CODE).getBytes();
public static final byte[] NOT_FOUND_CODE_BYTES = ("" + NOT_FOUND_CODE).getBytes();
public static final byte[] BAD_JS_REQUEST_CODE_BYTES = ("" + BAD_JS_REQUEST_CODE).getBytes();
public static final byte[] CONFLICT_CODE_BYTES = ("" + CONFLICT_CODE).getBytes();
public static final byte[] EOB_CODE_BYTES = ("" + EOB_CODE).getBytes();

// TODO - PINNED CONSUMER SUPPORT
// public static final int PIN_ERROR_CODE = 423;
// public static final byte[] PIN_ERROR_CODE_BYTES = ("" + PIN_ERROR_CODE).getBytes();

public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
public static String CONSUMER_DELETED = "Consumer Deleted"; // 409
public static String CONSUMER_IS_PUSH_BASED = "Consumer is push based"; // 409
public static byte[] BAD_REQUEST_BYTES = BAD_REQUEST.getBytes();
public static byte[] NO_MESSAGES_BYTES = NO_MESSAGES.getBytes();
public static byte[] CONSUMER_DELETED_BYTES = CONSUMER_DELETED.getBytes();
public static byte[] CONSUMER_IS_PUSH_BASED_BYTES = CONSUMER_IS_PUSH_BASED.getBytes();

public static String MESSAGE_SIZE_EXCEEDS_MAX_BYTES = "Message Size Exceeds MaxBytes"; // 409
public static String EXCEEDED_MAX_PREFIX = "Exceeded Max";
public static String EXCEEDED_MAX_WAITING = "Exceeded MaxWaiting"; // 409
public static String EXCEEDED_MAX_REQUEST_BATCH = "Exceeded MaxRequestBatch"; // 409
public static String EXCEEDED_MAX_REQUEST_EXPIRES = "Exceeded MaxRequestExpires"; // 409
public static String EXCEEDED_MAX_REQUEST_MAX_BYTES = "Exceeded MaxRequestMaxBytes"; // 409
public static byte[] MESSAGE_SIZE_EXCEEDS_MAX_BYTES_BYTES = MESSAGE_SIZE_EXCEEDS_MAX_BYTES.getBytes();
public static byte[] EXCEEDED_MAX_PREFIX_BYTES = EXCEEDED_MAX_PREFIX.getBytes();
public static byte[] EXCEEDED_MAX_WAITING_BYTES = EXCEEDED_MAX_WAITING.getBytes();
public static byte[] EXCEEDED_MAX_REQUEST_BATCH_BYTES = EXCEEDED_MAX_REQUEST_BATCH.getBytes();
public static byte[] EXCEEDED_MAX_REQUEST_EXPIRES_BYTES = EXCEEDED_MAX_REQUEST_EXPIRES.getBytes();
public static byte[] EXCEEDED_MAX_REQUEST_MAX_BYTES_BYTES = EXCEEDED_MAX_REQUEST_MAX_BYTES.getBytes();

public static String BATCH_COMPLETED = "Batch Completed"; // 409 informational
public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409
public static byte[] BATCH_COMPLETED_BYTES = BATCH_COMPLETED.getBytes();
public static byte[] SERVER_SHUTDOWN_BYTES = SERVER_SHUTDOWN.getBytes();
public static byte[] LEADERSHIP_CHANGE_BYTES = LEADERSHIP_CHANGE.getBytes();

public static final Status EOB = new Status(EOB_CODE, EOB_TEXT);
public static final Status TIMEOUT_OR_NO_MESSAGES = new Status(NOT_FOUND_CODE, "Timeout or No Messages");
Expand All @@ -69,7 +96,7 @@ public Status(int code, String message) {
}

public Status(Token codeToken, Token messageToken) {
this(extractCode(codeToken), extractMessage(messageToken));
this(extractCode(codeToken), messageToken.getValueCheckKnownStatuses());
}

public int getCode() {
Expand All @@ -84,15 +111,15 @@ public String getMessageWithCode() {
return code + " " + message;
}

private static String extractMessage(Token messageToken) {
return messageToken.hasValue() ? messageToken.getValue() : null;
}

private static int extractCode(Token codeToken) {
try {
return Integer.parseInt(codeToken.getValue());
Integer i = codeToken.getIntValue();
if (i == null) {
throw new IllegalArgumentException(NatsConstants.INVALID_HEADER_STATUS_CODE);
}
return i;
}
catch (Exception e) {
catch (NumberFormatException e) {
throw new IllegalArgumentException(NatsConstants.INVALID_HEADER_STATUS_CODE);
}
}
Expand Down
181 changes: 177 additions & 4 deletions src/main/java/io/nats/client/support/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

package io.nats.client.support;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import static io.nats.client.support.NatsConstants.*;
import static io.nats.client.support.NatsJetStreamConstants.*;
import static io.nats.client.support.Status.*;
import static java.nio.charset.StandardCharsets.US_ASCII;

public class Token {
Expand All @@ -22,6 +27,7 @@ public class Token {
private final int start;
private int end;
private boolean hasValue;
private final int valueLength;

public Token(byte[] serialized, int len, Token prev, TokenType required) {
this(serialized, len, prev.end + (prev.type == TokenType.KEY ? 2 : 1), required);
Expand All @@ -48,11 +54,14 @@ public Token(byte[] serialized, int len, int cur, TokenType required) {
} else if (required == TokenType.CRLF || required == TokenType.SPACE) {
throw new IllegalArgumentException(INVALID_HEADER_COMPOSITION);
} else {
byte ender1 = CR;
byte ender2 = CR;
byte ender1;
byte ender2;
if (required == null || required == TokenType.TEXT) {
type = TokenType.TEXT;
} else if (required == TokenType.WORD) {
ender1 = CR;
ender2 = CR;
}
else if (required == TokenType.WORD) {
ender1 = SP;
ender2 = CR;
type = TokenType.WORD;
Expand All @@ -74,6 +83,7 @@ public Token(byte[] serialized, int len, int cur, TokenType required) {
}
hasValue = true;
}
valueLength = hasValue ? end - start + 1 : 0;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

holding length in a variable removes the need to calculate it repeatedly

}

private void mustBeCrlf(int len, int cur) {
Expand All @@ -96,8 +106,171 @@ public boolean hasValue() {
return hasValue;
}

@NonNull
public String getValue() {
return hasValue ? new String(serialized, start, end - start + 1, US_ASCII).trim() : EMPTY;
return hasValue ? valueAsString() : EMPTY;
}

private String valueAsString() {
return new String(serialized, start, valueLength, US_ASCII).trim();
}

@NonNull
public String getValueCheckKnownKeys() {
if (valueLength == 0) {
return EMPTY;
}
byte b = serialized[start];
if (b == 'N') {
if (valueEquals(NATS_STREAM_BYTES)) {
return NATS_STREAM;
}
if (valueEquals(NATS_SEQUENCE_BYTES)) {
return NATS_SEQUENCE;
}
if (valueEquals(NATS_TIMESTAMP_BYTES)) {
return NATS_TIMESTAMP;
}
if (valueEquals(NATS_SUBJECT_BYTES)) {
return NATS_SUBJECT;
}
if (valueEquals(NATS_LAST_SEQUENCE_BYTES)) {
return NATS_LAST_SEQUENCE;
}
if (valueEquals(NATS_NUM_PENDING_BYTES)) {
return NATS_NUM_PENDING;
}
}
return valueAsString();
}

@Nullable
public String getValueCheckKnownStatuses() {
if (valueLength == 0) {
return null;
}
byte b = serialized[start];
if (b == 'B') {
if (valueEquals(BATCH_COMPLETED_BYTES)) {
return BATCH_COMPLETED;
}
if (valueEquals(BAD_REQUEST_BYTES)) {
return BAD_REQUEST;
}
}
else if (b == 'E') {
if (valueEquals(EXCEEDED_MAX_PREFIX_BYTES)) {
return EXCEEDED_MAX_PREFIX;
}
if (valueEquals(EXCEEDED_MAX_WAITING_BYTES)) {
return EXCEEDED_MAX_WAITING;
}
if (valueEquals(EXCEEDED_MAX_REQUEST_BATCH_BYTES)) {
return EXCEEDED_MAX_REQUEST_BATCH;
}
if (valueEquals(EXCEEDED_MAX_REQUEST_EXPIRES_BYTES)) {
return EXCEEDED_MAX_REQUEST_EXPIRES;
}
if (valueEquals(EXCEEDED_MAX_REQUEST_MAX_BYTES_BYTES)) {
return EXCEEDED_MAX_REQUEST_MAX_BYTES;
}
if (valueEquals(EOB_TEXT_BYTES)) {
return EOB_TEXT;
}
}
else if (b == 'N') {
if (valueEquals(NO_RESPONDERS_TEXT_BYTES)) {
return NO_RESPONDERS_TEXT;
}
if (valueEquals(NO_MESSAGES_BYTES)) {
return NO_MESSAGES;
}
}
else if (b == 'F') {
if (valueEquals(FLOW_CONTROL_TEXT_BYTES)) {
return FLOW_CONTROL_TEXT;
}
}
else if (b == 'I') {
if (valueEquals(HEARTBEAT_TEXT_BYTES)) {
return HEARTBEAT_TEXT;
}
}
else if (b == 'M') {
if (valueEquals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES_BYTES)) {
return MESSAGE_SIZE_EXCEEDS_MAX_BYTES;
}
}
else if (b == 'L') {
if (valueEquals(LEADERSHIP_CHANGE_BYTES)) {
return LEADERSHIP_CHANGE;
}
}
else if (b == 'S') {
if (valueEquals(SERVER_SHUTDOWN_BYTES)) {
return SERVER_SHUTDOWN;
}
}
else if (b == 'C') {
if (valueEquals(CONSUMER_DELETED_BYTES)) {
return CONSUMER_DELETED;
}
if (valueEquals(CONSUMER_IS_PUSH_BASED_BYTES)) {
return CONSUMER_IS_PUSH_BASED;
}
}
return valueAsString();
}

public Integer getIntValue() throws NumberFormatException {
if (valueLength == 0) {
return null;
}
byte b = serialized[start];
if (b == '4') {
if (valueEquals(BAD_REQUEST_CODE_BYTES)) {
return BAD_REQUEST_CODE;
}
if (valueEquals(NOT_FOUND_CODE_BYTES)) {
return NOT_FOUND_CODE;
}
if (valueEquals(BAD_JS_REQUEST_CODE_BYTES)) {
return BAD_JS_REQUEST_CODE;
}
if (valueEquals(CONFLICT_CODE_BYTES)) {
return CONFLICT_CODE;
}
}
else if (b == '1') {
if (valueEquals(FLOW_OR_HEARTBEAT_STATUS_CODE_BYTES)) {
return FLOW_OR_HEARTBEAT_STATUS_CODE;
}
}
else if (b == '5') {
if (valueEquals(NO_RESPONDERS_CODE_BYTES)) {
return NO_RESPONDERS_CODE;
}
}
else if (b == '2') {
if (valueEquals(EOB_CODE_BYTES)) {
return EOB_CODE;
}
}
return Integer.parseInt(getValue());
}

public boolean valueEquals(byte @NonNull [] bytes) {
if (valueLength != bytes.length) {
return false;
}

for (int i = 0; i < bytes.length; i++) {
if (bytes[i] != serialized[i + start]) {
return false;
}
}

return true;
}

public boolean samePoint(Token token) {
Expand Down
Loading
Loading