Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/api/SourceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readValue;
import static io.nats.client.support.Validator.consumerFilterSubjectsAreEquivalent;
import static io.nats.client.support.Validator.listsAreEquivalent;

public abstract class SourceBase implements JsonSerializable {
private final String name;
Expand Down Expand Up @@ -194,7 +194,7 @@ public boolean equals(Object o) {
if (!Objects.equals(filterSubject, that.filterSubject))
return false;
if (!Objects.equals(external, that.external)) return false;
return consumerFilterSubjectsAreEquivalent(subjectTransforms, that.subjectTransforms);
return listsAreEquivalent(subjectTransforms, that.subjectTransforms);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ else if (!serverCC.getDeliverGroup().equals(settledDeliverGroup)) {
settledFilterSubjects = serverCC.getFilterSubjects();
}
}
else if (!consumerFilterSubjectsAreEquivalent(settledFilterSubjects, serverCC.getFilterSubjects())) {
else if (!listsAreEquivalent(settledFilterSubjects, serverCC.getFilterSubjects())) {
throw JsSubSubjectDoesNotMatchFilter.instance();
}

Expand Down Expand Up @@ -528,9 +528,9 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {
if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); }
if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); }

if (backoff != null && !consumerFilterSubjectsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); }
if (backoff != null && !listsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); }
if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); }
if (filterSubjects != null && !consumerFilterSubjectsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }
if (filterSubjects != null && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }

// do not need to check Durable because the original is retrieved by the durable name

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public interface ApiConstants {
String FIRST_TS = "first_ts";
String FLOW_CONTROL = "flow_control";
String GO = "go";
String GROUP = "group";
String HDRS = "hdrs";
String HEADERS = "headers";
String HEADERS_ONLY = "headers_only";
Expand Down Expand Up @@ -117,6 +118,8 @@ public interface ApiConstants {
String MAX_STORAGE = "max_storage";
String MAX_STREAMS = "max_streams";
String MAX_WAITING = "max_waiting"; // this is correct! the meaning name is different than the field name
String MIN_PENDING = "min_pending";
String MIN_ACK_PENDING = "min_ack_pending";
String MEM_STORAGE = "mem_storage";
String MEMORY = "memory";
String MEMORY_MAX_STREAM_BYTES = "memory_max_stream_bytes";
Expand Down Expand Up @@ -153,6 +156,8 @@ public interface ApiConstants {
String PAUSE_UNTIL = "pause_until";
String PLACEMENT = "placement";
String PORT = "port";
String PRIORITY_GROUPS = "priority_groups";
String PRIORITY_POLICY = "priority_policy";
String PROCESSING_TIME = "processing_time";
String PROTO = "proto";
String PURGED = "purged";
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class Status {
public static final int NOT_FOUND_CODE = 404;
public static final int REQUEST_TIMEOUT_CODE = 408;
public static final int CONFLICT_CODE = 409;
public static final int EOB = 204;

public static String BAD_REQUEST = "Bad Request"; // 400
public static String NO_MESSAGES = "No Messages"; // 404
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/support/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ public static boolean isSemVer(String s) {
// This function tests filter subject equivalency
// It does not care what order and also assumes that there are no duplicates.
// From the server: consumer subject filters cannot overlap [10138]
public static <T> boolean consumerFilterSubjectsAreEquivalent(List<T> l1, List<T> l2)
public static <T> boolean listsAreEquivalent(List<T> l1, List<T> l2)
{
if (l1 == null || l1.isEmpty()) {
return l2 == null || l2.isEmpty();
Expand Down
82 changes: 41 additions & 41 deletions src/test/java/io/nats/client/support/ValidatorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,47 +560,47 @@ public void testConsumerFilterSubjectsAreEquivalent() {
List<String> l5 = null;
List<String> l6 = new ArrayList<>();

assertTrue(consumerFilterSubjectsAreEquivalent(l1, l1));
assertTrue(consumerFilterSubjectsAreEquivalent(l1, l2));
assertFalse(consumerFilterSubjectsAreEquivalent(l1, l3));
assertFalse(consumerFilterSubjectsAreEquivalent(l1, l4));
assertFalse(consumerFilterSubjectsAreEquivalent(l1, l5));
assertFalse(consumerFilterSubjectsAreEquivalent(l1, l6));

assertTrue(consumerFilterSubjectsAreEquivalent(l2, l1));
assertTrue(consumerFilterSubjectsAreEquivalent(l2, l2));
assertFalse(consumerFilterSubjectsAreEquivalent(l2, l3));
assertFalse(consumerFilterSubjectsAreEquivalent(l2, l4));
assertFalse(consumerFilterSubjectsAreEquivalent(l2, l5));
assertFalse(consumerFilterSubjectsAreEquivalent(l2, l6));

assertFalse(consumerFilterSubjectsAreEquivalent(l3, l1));
assertFalse(consumerFilterSubjectsAreEquivalent(l3, l2));
assertTrue(consumerFilterSubjectsAreEquivalent(l3, l3));
assertFalse(consumerFilterSubjectsAreEquivalent(l3, l4));
assertFalse(consumerFilterSubjectsAreEquivalent(l3, l5));
assertFalse(consumerFilterSubjectsAreEquivalent(l3, l6));

assertFalse(consumerFilterSubjectsAreEquivalent(l4, l1));
assertFalse(consumerFilterSubjectsAreEquivalent(l4, l2));
assertFalse(consumerFilterSubjectsAreEquivalent(l4, l3));
assertTrue(consumerFilterSubjectsAreEquivalent(l4, l4));
assertFalse(consumerFilterSubjectsAreEquivalent(l4, l5));
assertFalse(consumerFilterSubjectsAreEquivalent(l4, l6));

assertFalse(consumerFilterSubjectsAreEquivalent(l5, l1));
assertFalse(consumerFilterSubjectsAreEquivalent(l5, l2));
assertFalse(consumerFilterSubjectsAreEquivalent(l5, l3));
assertFalse(consumerFilterSubjectsAreEquivalent(l5, l4));
assertTrue(consumerFilterSubjectsAreEquivalent(l5, l5));
assertTrue(consumerFilterSubjectsAreEquivalent(l5, l6));

assertFalse(consumerFilterSubjectsAreEquivalent(l6, l1));
assertFalse(consumerFilterSubjectsAreEquivalent(l6, l2));
assertFalse(consumerFilterSubjectsAreEquivalent(l6, l3));
assertFalse(consumerFilterSubjectsAreEquivalent(l6, l4));
assertTrue(consumerFilterSubjectsAreEquivalent(l6, l5));
assertTrue(consumerFilterSubjectsAreEquivalent(l6, l6));
assertTrue(listsAreEquivalent(l1, l1));
assertTrue(listsAreEquivalent(l1, l2));
assertFalse(listsAreEquivalent(l1, l3));
assertFalse(listsAreEquivalent(l1, l4));
assertFalse(listsAreEquivalent(l1, l5));
assertFalse(listsAreEquivalent(l1, l6));

assertTrue(listsAreEquivalent(l2, l1));
assertTrue(listsAreEquivalent(l2, l2));
assertFalse(listsAreEquivalent(l2, l3));
assertFalse(listsAreEquivalent(l2, l4));
assertFalse(listsAreEquivalent(l2, l5));
assertFalse(listsAreEquivalent(l2, l6));

assertFalse(listsAreEquivalent(l3, l1));
assertFalse(listsAreEquivalent(l3, l2));
assertTrue(listsAreEquivalent(l3, l3));
assertFalse(listsAreEquivalent(l3, l4));
assertFalse(listsAreEquivalent(l3, l5));
assertFalse(listsAreEquivalent(l3, l6));

assertFalse(listsAreEquivalent(l4, l1));
assertFalse(listsAreEquivalent(l4, l2));
assertFalse(listsAreEquivalent(l4, l3));
assertTrue(listsAreEquivalent(l4, l4));
assertFalse(listsAreEquivalent(l4, l5));
assertFalse(listsAreEquivalent(l4, l6));

assertFalse(listsAreEquivalent(l5, l1));
assertFalse(listsAreEquivalent(l5, l2));
assertFalse(listsAreEquivalent(l5, l3));
assertFalse(listsAreEquivalent(l5, l4));
assertTrue(listsAreEquivalent(l5, l5));
assertTrue(listsAreEquivalent(l5, l6));

assertFalse(listsAreEquivalent(l6, l1));
assertFalse(listsAreEquivalent(l6, l2));
assertFalse(listsAreEquivalent(l6, l3));
assertFalse(listsAreEquivalent(l6, l4));
assertTrue(listsAreEquivalent(l6, l5));
assertTrue(listsAreEquivalent(l6, l6));
}

@Test
Expand Down
Loading