Skip to content

Commit ce70a3c

Browse files
authored
Merge branch 'main' into consumer-group-overflow
2 parents 804e230 + d263667 commit ce70a3c

File tree

4 files changed

+28
-24
lines changed

4 files changed

+28
-24
lines changed

src/main/java/io/nats/client/impl/NatsFeatureBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ protected void visitSubject(List<String> subjects, DeliverPolicy deliverPolicy,
8888
.configuration(ccb.build())
8989
.build();
9090

91-
Duration timeout = js.jso.getRequestTimeout();
91+
Duration timeout = js.getTimeout();
9292
JetStreamSubscription sub = js.subscribe(null, pso);
9393
try {
9494
boolean lastWasNull = false;

src/main/java/io/nats/client/impl/NatsJetStream.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,7 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d
150150
return null;
151151
}
152152

153-
Duration timeout = options == null ? jso.getRequestTimeout() : options.getStreamTimeout();
154-
155-
Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
153+
Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo);
156154
return processPublishResponse(resp, options);
157155
}
158156

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public CachedStreamInfo(StreamInfo si) {
4545

4646
final NatsConnection conn;
4747
final JetStreamOptions jso;
48+
final Duration timeout;
4849
final boolean consumerCreate290Available;
4950
final boolean multipleSubjectFilter210Available;
5051
final boolean directBatchGet211Available;
@@ -59,8 +60,8 @@ public CachedStreamInfo(StreamInfo si) {
5960
// Clone the input jsOptions (JetStreamOptions.builder(...) handles null.
6061
// If jsOptions is not supplied or the jsOptions request timeout
6162
// was not set, use the connection options connect timeout.
62-
Duration rt = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
63-
jso = JetStreamOptions.builder(jsOptions).requestTimeout(rt).build();
63+
timeout = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
64+
jso = JetStreamOptions.builder(jsOptions).requestTimeout(timeout).build();
6465

6566
consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate();
6667
multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99");
@@ -70,17 +71,22 @@ public CachedStreamInfo(StreamInfo si) {
7071
NatsJetStreamImpl(NatsJetStreamImpl impl) {
7172
conn = impl.conn;
7273
jso = impl.jso;
74+
timeout = impl.timeout;
7375
consumerCreate290Available = impl.consumerCreate290Available;
7476
multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available;
7577
directBatchGet211Available = impl.directBatchGet211Available;
7678
}
7779

80+
Duration getTimeout() {
81+
return timeout;
82+
}
83+
7884
// ----------------------------------------------------------------------------------------------------
7985
// Management that is also needed by regular context
8086
// ----------------------------------------------------------------------------------------------------
8187
ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
8288
String subj = String.format(JSAPI_CONSUMER_INFO, streamName, consumerName);
83-
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
89+
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
8490
return new ConsumerInfo(resp).throwOnHasError();
8591
}
8692

@@ -122,7 +128,7 @@ else if (durable == null) {
122128
}
123129

124130
ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config, action);
125-
Message resp = makeRequestResponseRequired(subj, ccr.serialize(), jso.getRequestTimeout());
131+
Message resp = makeRequestResponseRequired(subj, ccr.serialize(), getTimeout());
126132
return new ConsumerInfo(resp).throwOnHasError();
127133
}
128134

@@ -147,7 +153,7 @@ StreamInfo _getStreamInfo(String streamName, StreamInfoOptions options) throws I
147153
String subj = String.format(JSAPI_STREAM_INFO, streamName);
148154
StreamInfoReader sir = new StreamInfoReader();
149155
while (sir.hasMore()) {
150-
Message resp = makeRequestResponseRequired(subj, sir.nextJson(options), jso.getRequestTimeout());
156+
Message resp = makeRequestResponseRequired(subj, sir.nextJson(options), getTimeout());
151157
sir.process(resp);
152158
}
153159
return cacheStreamInfo(streamName, sir.getStreamInfo());
@@ -170,7 +176,7 @@ List<StreamInfo> cacheStreamInfo(List<StreamInfo> list) {
170176
List<String> _getStreamNames(String subjectFilter) throws IOException, JetStreamApiException {
171177
StreamNamesReader snr = new StreamNamesReader();
172178
while (snr.hasMore()) {
173-
Message resp = makeRequestResponseRequired(JSAPI_STREAM_NAMES, snr.nextJson(subjectFilter), jso.getRequestTimeout());
179+
Message resp = makeRequestResponseRequired(JSAPI_STREAM_NAMES, snr.nextJson(subjectFilter), getTimeout());
174180
snr.process(resp);
175181
}
176182
return snr.getStrings();

src/main/java/io/nats/client/impl/NatsJetStreamManagement.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public NatsJetStreamManagement(NatsConnection connection, JetStreamOptions jsOpt
4141
*/
4242
@Override
4343
public AccountStatistics getAccountStatistics() throws IOException, JetStreamApiException {
44-
Message resp = makeRequestResponseRequired(JSAPI_ACCOUNT_INFO, null, jso.getRequestTimeout());
44+
Message resp = makeRequestResponseRequired(JSAPI_ACCOUNT_INFO, null, getTimeout());
4545
return new AccountStatistics(resp).throwOnHasError();
4646
}
4747

@@ -69,7 +69,7 @@ private StreamInfo addOrUpdateStream(StreamConfiguration config, String template
6969
}
7070

7171
String subj = String.format(template, streamName);
72-
Message resp = makeRequestResponseRequired(subj, config.toJson().getBytes(StandardCharsets.UTF_8), jso.getRequestTimeout());
72+
Message resp = makeRequestResponseRequired(subj, config.toJson().getBytes(StandardCharsets.UTF_8), getTimeout());
7373
return createAndCacheStreamInfoThrowOnError(streamName, resp);
7474
}
7575

@@ -80,7 +80,7 @@ private StreamInfo addOrUpdateStream(StreamConfiguration config, String template
8080
public boolean deleteStream(String streamName) throws IOException, JetStreamApiException {
8181
validateNotNull(streamName, "Stream Name");
8282
String subj = String.format(JSAPI_STREAM_DELETE, streamName);
83-
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
83+
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
8484
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
8585
}
8686

@@ -109,7 +109,7 @@ public StreamInfo getStreamInfo(String streamName, StreamInfoOptions options) th
109109
public PurgeResponse purgeStream(String streamName) throws IOException, JetStreamApiException {
110110
validateNotNull(streamName, "Stream Name");
111111
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
112-
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
112+
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
113113
return new PurgeResponse(resp).throwOnHasError();
114114
}
115115

@@ -122,7 +122,7 @@ public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws
122122
validateNotNull(options, "Purge Options");
123123
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
124124
byte[] body = options.toJson().getBytes(StandardCharsets.UTF_8);
125-
Message resp = makeRequestResponseRequired(subj, body, jso.getRequestTimeout());
125+
Message resp = makeRequestResponseRequired(subj, body, getTimeout());
126126
return new PurgeResponse(resp).throwOnHasError();
127127
}
128128

@@ -164,7 +164,7 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
164164
validateNotNull(streamName, "Stream Name");
165165
validateNotNull(consumerName, "Consumer Name");
166166
String subj = String.format(JSAPI_CONSUMER_DELETE, streamName, consumerName);
167-
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
167+
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
168168
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
169169
}
170170

@@ -177,7 +177,7 @@ public ConsumerPauseResponse pauseConsumer(String streamName, String consumerNam
177177
validateNotNull(consumerName, "Consumer Name");
178178
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
179179
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
180-
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
180+
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), getTimeout());
181181
return new ConsumerPauseResponse(resp).throwOnHasError();
182182
}
183183

@@ -189,7 +189,7 @@ public boolean resumeConsumer(String streamName, String consumerName) throws IOE
189189
validateNotNull(streamName, "Stream Name");
190190
validateNotNull(consumerName, "Consumer Name");
191191
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
192-
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
192+
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
193193
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
194194
return !response.isPaused();
195195
}
@@ -216,7 +216,7 @@ private List<String> getConsumerNames(String streamName, String filter) throws I
216216
String subj = String.format(JSAPI_CONSUMER_NAMES, streamName);
217217
ConsumerNamesReader cnr = new ConsumerNamesReader();
218218
while (cnr.hasMore()) {
219-
Message resp = makeRequestResponseRequired(subj, cnr.nextJson(filter), jso.getRequestTimeout());
219+
Message resp = makeRequestResponseRequired(subj, cnr.nextJson(filter), getTimeout());
220220
cnr.process(resp);
221221
}
222222
return cnr.getStrings();
@@ -230,7 +230,7 @@ public List<ConsumerInfo> getConsumers(String streamName) throws IOException, Je
230230
String subj = String.format(JSAPI_CONSUMER_LIST, streamName);
231231
ConsumerListReader clg = new ConsumerListReader();
232232
while (clg.hasMore()) {
233-
Message resp = makeRequestResponseRequired(subj, clg.nextJson(), jso.getRequestTimeout());
233+
Message resp = makeRequestResponseRequired(subj, clg.nextJson(), getTimeout());
234234
clg.process(resp);
235235
}
236236
return clg.getConsumers();
@@ -264,7 +264,7 @@ public List<StreamInfo> getStreams() throws IOException, JetStreamApiException {
264264
public List<StreamInfo> getStreams(String subjectFilter) throws IOException, JetStreamApiException {
265265
StreamListReader slr = new StreamListReader();
266266
while (slr.hasMore()) {
267-
Message resp = makeRequestResponseRequired(JSAPI_STREAM_LIST, slr.nextJson(subjectFilter), jso.getRequestTimeout());
267+
Message resp = makeRequestResponseRequired(JSAPI_STREAM_LIST, slr.nextJson(subjectFilter), getTimeout());
268268
slr.process(resp);
269269
}
270270
return cacheStreamInfo(slr.getStreams());
@@ -332,15 +332,15 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
332332
subject = String.format(JSAPI_DIRECT_GET, streamName);
333333
payload = messageGetRequest.serialize();
334334
}
335-
Message resp = makeRequestResponseRequired(subject, payload, jso.getRequestTimeout());
335+
Message resp = makeRequestResponseRequired(subject, payload, getTimeout());
336336
if (resp.isStatusMessage()) {
337337
throw new JetStreamApiException(Error.convert(resp.getStatus()));
338338
}
339339
return new MessageInfo(resp, streamName, true);
340340
}
341341
else {
342342
String getSubject = String.format(JSAPI_MSG_GET, streamName);
343-
Message resp = makeRequestResponseRequired(getSubject, messageGetRequest.serialize(), jso.getRequestTimeout());
343+
Message resp = makeRequestResponseRequired(getSubject, messageGetRequest.serialize(), getTimeout());
344344
return new MessageInfo(resp, streamName, false).throwOnHasError();
345345
}
346346
}
@@ -464,7 +464,7 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws
464464
validateNotNull(streamName, "Stream Name");
465465
String subj = String.format(JSAPI_MSG_DELETE, streamName);
466466
MessageDeleteRequest mdr = new MessageDeleteRequest(seq, erase);
467-
Message resp = makeRequestResponseRequired(subj, mdr.serialize(), jso.getRequestTimeout());
467+
Message resp = makeRequestResponseRequired(subj, mdr.serialize(), getTimeout());
468468
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
469469
}
470470

0 commit comments

Comments
 (0)