Skip to content

Commit 06699ee

Browse files
authored
Merge pull request #1472 from nats-io/pinned-consumer-support
Pinned Consumer Support
2 parents feaf53c + 2a677ee commit 06699ee

19 files changed

+282
-309
lines changed

src/main/java/io/nats/client/BaseConsumerContext.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,13 @@ public interface BaseConsumerContext {
183183
@NonNull
184184
MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable Dispatcher dispatcher, @NonNull MessageHandler handler) throws IOException, JetStreamApiException;
185185

186-
// TODO - PINNED CONSUMER SUPPORT
187-
// /**
188-
// * Unpins this consumer
189-
// * @param group the group name of the consumer's group
190-
// * @throws IOException covers various communication issues with the NATS
191-
// * server such as timeout or interruption
192-
// * @throws JetStreamApiException the request had an error related to the data
193-
// * @return true if the delete succeeded
194-
// */
195-
// boolean unpin(String group) throws IOException, JetStreamApiException;
186+
/**
187+
* Unpins this consumer
188+
* @param group the group name of the consumer's group
189+
* @throws IOException covers various communication issues with the NATS
190+
* server such as timeout or interruption
191+
* @throws JetStreamApiException the request had an error related to the data
192+
* @return true if the delete succeeded
193+
*/
194+
boolean unpin(String group) throws IOException, JetStreamApiException;
196195
}

src/main/java/io/nats/client/JetStreamManagement.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -357,18 +357,17 @@ public interface JetStreamManagement {
357357
*/
358358
boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException;
359359

360-
// TODO - PINNED CONSUMER SUPPORT
361-
// /**
362-
// * Unpins a consumer
363-
// * @param streamName name of the stream
364-
// * @param consumerName name of consumer
365-
// * @param consumerGroup name of the consumer's group
366-
// * @throws IOException covers various communication issues with the NATS
367-
// * server such as timeout or interruption
368-
// * @throws JetStreamApiException the request had an error related to the data
369-
// * @return true if the delete succeeded
370-
// */
371-
// boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
360+
/**
361+
* Unpins a consumer
362+
* @param streamName name of the stream
363+
* @param consumerName name of consumer
364+
* @param consumerGroup name of the consumer's group
365+
* @throws IOException covers various communication issues with the NATS
366+
* server such as timeout or interruption
367+
* @throws JetStreamApiException the request had an error related to the data
368+
* @return true if the delete succeeded
369+
*/
370+
boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
372371

373372
/**
374373
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams

src/main/java/io/nats/client/PullRequestOptions.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,15 @@ public String toJson() {
6060
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
6161
JsonUtils.addField(sb, GROUP, group);
6262
JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority);
63-
// TODO - PINNED CONSUMER SUPPORT
64-
// JsonUtils.addField(sb, ID, getPinId());
63+
JsonUtils.addField(sb, ID, getPinId());
6564
JsonUtils.addField(sb, MIN_PENDING, minPending);
6665
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
6766
return JsonUtils.endJson(sb).toString();
6867
}
6968

70-
// TODO - PINNED CONSUMER SUPPORT
71-
// protected String getPinId() {
72-
// return null;
73-
// }
69+
protected String getPinId() {
70+
return null;
71+
}
7472

7573
/**
7674
* Get the batch size option value

src/main/java/io/nats/client/api/PriorityGroupState.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,30 @@
1616
import io.nats.client.support.JsonValue;
1717
import io.nats.client.support.JsonValueUtils;
1818
import org.jspecify.annotations.NonNull;
19+
import org.jspecify.annotations.Nullable;
1920

21+
import java.time.ZonedDateTime;
2022
import java.util.List;
2123

22-
import static io.nats.client.support.ApiConstants.GROUP;
24+
import static io.nats.client.support.ApiConstants.*;
25+
import static io.nats.client.support.JsonValueUtils.readDate;
2326

2427
/**
2528
* Status of a specific consumer priority group
2629
*/
2730
public class PriorityGroupState {
2831
private final String group;
29-
30-
// TODO - PINNED CONSUMER SUPPORT
31-
// private final String pinnedClientId;
32-
// private final ZonedDateTime pinnedTime;
32+
private final String pinnedClientId;
33+
private final ZonedDateTime pinnedTime;
3334

3435
static List<PriorityGroupState> optionalListOf(JsonValue vpgStates) {
3536
return JsonValueUtils.optionalListOf(vpgStates, PriorityGroupState::new);
3637
}
3738

3839
PriorityGroupState(JsonValue vpgState) {
3940
group = JsonValueUtils.readString(vpgState, GROUP);
40-
// TODO - PINNED CONSUMER SUPPORT
41-
// pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
42-
// pinnedTime = readDate(vpgState, PINNED_TS);
41+
pinnedClientId = JsonValueUtils.readString(vpgState, PINNED_CLIENT_ID);
42+
pinnedTime = readDate(vpgState, PINNED_TS);
4343
}
4444

4545
/**
@@ -51,33 +51,30 @@ public String getGroup() {
5151
return group;
5252
}
5353

54-
// TODO - PINNED CONSUMER SUPPORT
55-
// /**
56-
// * The generated ID of the pinned client
57-
// * @return the id
58-
// */
59-
// @Nullable
60-
// public String getPinnedClientId() {
61-
// return pinnedClientId;
62-
// }
54+
/**
55+
* The generated ID of the pinned client
56+
* @return the id
57+
*/
58+
@Nullable
59+
public String getPinnedClientId() {
60+
return pinnedClientId;
61+
}
6362

64-
// TODO - PINNED CONSUMER SUPPORT
65-
// /**
66-
// * The timestamp when the client was pinned
67-
// * @return the timestamp
68-
// */
69-
// @Nullable
70-
// public ZonedDateTime getPinnedTime() {
71-
// return pinnedTime;
72-
// }
63+
/**
64+
* The timestamp when the client was pinned
65+
* @return the timestamp
66+
*/
67+
@Nullable
68+
public ZonedDateTime getPinnedTime() {
69+
return pinnedTime;
70+
}
7371

7472
@Override
7573
public String toString() {
7674
return "PriorityGroupState{" +
7775
"group='" + group + '\'' +
78-
// TODO - PINNED CONSUMER SUPPORT
79-
// ", pinnedClientId='" + pinnedClientId + '\'' +
80-
// ", pinnedTime=" + pinnedTime +
76+
", pinnedClientId='" + pinnedClientId + '\'' +
77+
", pinnedTime=" + pinnedTime +
8178
'}';
8279
}
8380
}

src/main/java/io/nats/client/api/PriorityPolicy.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@
2424
public enum PriorityPolicy {
2525
None("none"),
2626
Overflow("overflow"),
27-
Prioritized("prioritized");
28-
29-
// TODO - PINNED CONSUMER SUPPORT
30-
// PinnedClient("pinned_client")
27+
Prioritized("prioritized"),
28+
PinnedClient("pinned_client");
3129

3230
private final String policy;
3331

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,14 @@ protected void trackJsMessage(Message msg) {
9797
NatsJetStreamMetaData meta = msg.metaData();
9898
lastStreamSeq = meta.streamSequence();
9999
lastConsumerSeq++;
100-
// TODO - PINNED CONSUMER SUPPORT
101-
// subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
100+
subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
102101
}
103102
finally {
104103
stateChangeLock.unlock();
105104
}
106105
}
107106

108-
// TODO - PINNED CONSUMER SUPPORT
109-
// protected void subTrackJsMessage(Message msg) {}
107+
protected void subTrackJsMessage(Message msg) {}
110108

111109
protected void handleHeartbeatError() {
112110
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.nats.client.api.ConsumerConfiguration;
1818
import io.nats.client.api.ConsumerInfo;
1919
import io.nats.client.api.OrderedConsumerConfiguration;
20+
import io.nats.client.api.PriorityPolicy;
2021
import io.nats.client.support.Validator;
2122
import org.jspecify.annotations.NonNull;
2223
import org.jspecify.annotations.Nullable;
@@ -144,15 +145,14 @@ private void checkState() throws IOException {
144145
}
145146
}
146147

147-
// TODO - PINNED CONSUMER SUPPORT
148-
// private void checkNotPinned(String label) throws IOException {
149-
// ConsumerInfo ci = cachedConsumerInfo.get();
150-
// if (ci != null) {
151-
// if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
152-
// throw new IOException("Pinned not allowed with " + label);
153-
// }
154-
// }
155-
// }
148+
private void checkNotPinned(String label) throws IOException {
149+
ConsumerInfo ci = cachedConsumerInfo.get();
150+
if (ci != null) {
151+
if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
152+
throw new IOException("Pinned not allowed with " + label);
153+
}
154+
}
155+
}
156156

157157
private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
158158
lastConsumer.set(con);
@@ -222,8 +222,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
222222
try {
223223
stateLock.lock();
224224
checkState();
225-
// TODO - PINNED CONSUMER SUPPORT
226-
// checkNotPinned("Next");
225+
checkNotPinned("Next");
227226

228227
try {
229228
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
@@ -292,8 +291,7 @@ public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) thr
292291
try {
293292
stateLock.lock();
294293
checkState();
295-
// TODO - PINNED CONSUMER SUPPORT
296-
// checkNotPinned("Fetch");
294+
checkNotPinned("Fetch");
297295
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
298296
}
299297
finally {
@@ -378,17 +376,16 @@ public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions,
378376
}
379377
}
380378

381-
// TODO - PINNED CONSUMER SUPPORT
382-
// @Override
383-
// public boolean unpin(String group) throws IOException, JetStreamApiException {
384-
// String name = consumerName.get();
385-
// if (name == null) {
386-
// ConsumerInfo ci = cachedConsumerInfo.get();
387-
// if (ci == null) {
388-
// ci = getConsumerInfo();
389-
// }
390-
// name = ci.getName();
391-
// }
392-
// return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
393-
// }
379+
@Override
380+
public boolean unpin(String group) throws IOException, JetStreamApiException {
381+
String name = consumerName.get();
382+
if (name == null) {
383+
ConsumerInfo ci = cachedConsumerInfo.get();
384+
if (ci == null) {
385+
ci = getConsumerInfo();
386+
}
387+
name = ci.getName();
388+
}
389+
return streamCtx.jsm.unpinConsumer(streamCtx.streamName, name, group);
390+
}
394391
}

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,16 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
4848
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
4949
}
5050

51-
// TODO - PINNED CONSUMER SUPPORT
52-
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
53-
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
54-
.maxBytes(fetchConsumeOptions.getMaxBytes())
55-
.expiresIn(expiresInMillis)
56-
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
57-
.noWait(isNoWait)
58-
.group(fetchConsumeOptions.getGroup())
59-
.priority(fetchConsumeOptions.getPriority())
60-
.minPending(fetchConsumeOptions.getMinPending())
61-
.minAckPending(fetchConsumeOptions.getMinAckPending())
62-
.build();
51+
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm == null ? null : pmm.currentPinId,
52+
PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
53+
.maxBytes(fetchConsumeOptions.getMaxBytes())
54+
.expiresIn(expiresInMillis)
55+
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
56+
.noWait(isNoWait)
57+
.group(fetchConsumeOptions.getGroup())
58+
.priority(fetchConsumeOptions.getPriority())
59+
.minPending(fetchConsumeOptions.getMinPending())
60+
.minAckPending(fetchConsumeOptions.getMinAckPending()));
6361
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold), false);
6462
pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this);
6563
startNanos = -1;

src/main/java/io/nats/client/impl/NatsJetStreamManagement.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -362,20 +362,19 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws
362362
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
363363
}
364364

365-
// TODO - PINNED CONSUMER SUPPORT
366-
// /**
367-
// * {@inheritDoc}
368-
// */
369-
// @Override
370-
// public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
371-
// validateNotNull(streamName, "Stream Name");
372-
// validateNotNull(consumerName, "Consumer Name");
373-
// validateNotNull(consumerGroup, "Consumer Group");
374-
// String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
375-
// byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
376-
// Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
377-
// return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
378-
// }
365+
/**
366+
* {@inheritDoc}
367+
*/
368+
@Override
369+
public boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException {
370+
validateNotNull(streamName, "Stream Name");
371+
validateNotNull(consumerName, "Consumer Name");
372+
validateNotNull(consumerGroup, "Consumer Group");
373+
String subj = String.format(JSAPI_CONSUMER_UNPIN, streamName, consumerName);
374+
byte[] payload = String.format("{\"group\": \"%s\"}", consumerGroup).getBytes();
375+
Message resp = makeRequestResponseRequired(subj, payload, getTimeout());
376+
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
377+
}
379378

380379
/**
381380
* {@inheritDoc}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,17 +104,15 @@ else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.p
104104
private void repull() {
105105
int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
106106
long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
107-
// TODO - PINNED CONSUMER SUPPORT
108-
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
109-
PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
110-
.maxBytes(rePullBytes)
111-
.expiresIn(consumeOpts.getExpiresInMillis())
112-
.idleHeartbeat(consumeOpts.getIdleHeartbeat())
113-
.group(consumeOpts.getGroup())
114-
.priority(consumeOpts.getPriority())
115-
.minPending(consumeOpts.getMinPending())
116-
.minAckPending(consumeOpts.getMinAckPending())
117-
.build();
107+
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
108+
PullRequestOptions.builder(rePullMessages)
109+
.maxBytes(rePullBytes)
110+
.expiresIn(consumeOpts.getExpiresInMillis())
111+
.idleHeartbeat(consumeOpts.getIdleHeartbeat())
112+
.group(consumeOpts.getGroup())
113+
.priority(consumeOpts.getPriority())
114+
.minPending(consumeOpts.getMinPending())
115+
.minAckPending(consumeOpts.getMinAckPending()));
118116
sub._pull(pro, consumeOpts.raiseStatusWarnings(), this);
119117
}
120118
}

0 commit comments

Comments
 (0)