@@ -79,8 +79,8 @@ class MessageQueue {
7979 this .queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue <>(maxMessagesInOutgoingQueue ) : new LinkedBlockingQueue <>();
8080 this .discardWhenFull = discardWhenFull ;
8181 this .running = new AtomicInteger (RUNNING );
82- this . sizeInBytes = new AtomicLong (0 );
83- this . length = new AtomicLong (0 );
82+ sizeInBytes = new AtomicLong (0 );
83+ length = new AtomicLong (0 );
8484 this .offerLockNanos = requestCleanupInterval .toNanos ();
8585 this .offerTimeoutNanos = Math .max (MIN_OFFER_TIMEOUT_NANOS , requestCleanupInterval .toMillis () * NANOS_PER_MILLI * 95 / 100 ) ;
8686
@@ -98,8 +98,8 @@ void drainTo(MessageQueue target) {
9898 editLock .lock ();
9999 try {
100100 queue .drainTo (target .queue );
101- target .length .set (length .get ( ));
102- target .sizeInBytes .set (sizeInBytes .get ( ));
101+ target .length .set (length .getAndSet ( 0 ));
102+ target .sizeInBytes .set (sizeInBytes .getAndSet ( 0 ));
103103 } finally {
104104 editLock .unlock ();
105105 }
@@ -132,8 +132,8 @@ void drain() {
132132 }
133133
134134 boolean isDrained () {
135- // poison pill is not included in the length count, or the size
136- return this .running .get () == DRAINING && this . length .get () == 0 ;
135+ // poison pill and any other "mark" messages are not included in the length count, or the size
136+ return this .running .get () == DRAINING && length .get () == 0 ;
137137 }
138138
139139 boolean push (NatsMessage msg ) {
@@ -165,17 +165,19 @@ boolean push(NatsMessage msg, boolean internal) {
165165 */
166166 if (editLock .tryLock (offerLockNanos , TimeUnit .NANOSECONDS )) {
167167 try {
168- if (!internal && this .discardWhenFull ) {
169- return this .queue .offer (msg );
168+ // offer with no timeout returns true if the queue was not full
169+ if (!internal && discardWhenFull ) {
170+ return queue .offer (msg );
170171 }
171172
172173 long timeoutNanosLeft = Math .max (MIN_OFFER_TIMEOUT_NANOS , offerTimeoutNanos - (NatsSystemClock .nanoTime () - startNanos ));
173174
174- if (!this .queue .offer (msg , timeoutNanosLeft , TimeUnit .NANOSECONDS )) {
175+ // offer with timeout
176+ if (!queue .offer (msg , timeoutNanosLeft , TimeUnit .NANOSECONDS )) {
175177 throw new IllegalStateException (OUTPUT_QUEUE_IS_FULL + queue .size ());
176178 }
177- this . sizeInBytes .getAndAdd (msg .getSizeInBytes ());
178- this . length .incrementAndGet ();
179+ sizeInBytes .getAndAdd (msg .getSizeInBytes ());
180+ length .incrementAndGet ();
179181 return true ;
180182
181183 }
@@ -196,13 +198,20 @@ boolean push(NatsMessage msg, boolean internal) {
196198 /**
197199 * poisoning the queue puts the known poison pill into the queue, forcing any waiting code to stop
198200 * waiting and return.
201+ * This is done outside of push so it is not counted in length or size
202+ * offer is used instead of add since we don't care if it fails because it was full
199203 */
200204 void poisonTheQueue () {
201- try {
202- this .queue .add (POISON_PILL );
203- } catch (IllegalStateException ie ) { // queue was full, so we don't really need poison pill
204- // ok to ignore this
205- }
205+ queue .offer (POISON_PILL );
206+ }
207+
208+ /**
209+ * Marking the queue, like POISON, is a message we don't want to count.
210+ * Intended to only be used with an unbounded queue. Use at your own risk.
211+ * @param msg the mark
212+ */
213+ void markTheQueue (NatsMessage msg ) {
214+ queue .offer (msg );
206215 }
207216
208217 NatsMessage poll (Duration timeout ) throws InterruptedException {
@@ -240,8 +249,8 @@ NatsMessage pop(Duration timeout) throws InterruptedException {
240249 return null ;
241250 }
242251
243- this . sizeInBytes .getAndAdd (-msg .getSizeInBytes ());
244- this . length .decrementAndGet ();
252+ sizeInBytes .getAndAdd (-msg .getSizeInBytes ());
253+ length .decrementAndGet ();
245254
246255 return msg ;
247256 }
@@ -276,12 +285,12 @@ NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate,
276285 long size = msg .getSizeInBytes ();
277286
278287 if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate ) {
279- this . sizeInBytes .addAndGet (-size );
280- this . length .decrementAndGet ();
288+ sizeInBytes .addAndGet (-size );
289+ length .decrementAndGet ();
281290 return msg ;
282291 }
283292
284- long count = 1 ;
293+ long accumulated = 1 ;
285294 NatsMessage cursor = msg ;
286295
287296 while (true ) {
@@ -290,15 +299,15 @@ NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate,
290299 long s = next .getSizeInBytes ();
291300 if (maxBytesToAccumulate < 0 || (size + s ) < maxBytesToAccumulate ) { // keep going
292301 size += s ;
293- count ++;
302+ accumulated ++;
294303
295304 this .queue .poll (); // we need to get the message out of the queue b/c we only peeked
296305 cursor .next = next ;
297306 if (next .flushImmediatelyAfterPublish ) {
298307 // if we are going to flush, then don't accumulate more
299308 break ;
300309 }
301- if (count == maxMessagesToAccumulate ) {
310+ if (accumulated == maxMessagesToAccumulate ) {
302311 break ;
303312 }
304313 cursor = cursor .next ;
@@ -310,8 +319,8 @@ NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate,
310319 }
311320 }
312321
313- this . sizeInBytes .addAndGet (-size );
314- this . length .addAndGet (-count );
322+ sizeInBytes .addAndGet (-size );
323+ length .addAndGet (-accumulated );
315324
316325 return msg ;
317326 }
@@ -322,11 +331,11 @@ NatsMessage popNow() throws InterruptedException {
322331 }
323332
324333 long length () {
325- return this . length .get ();
334+ return length .get ();
326335 }
327336
328337 long sizeInBytes () {
329- return this . sizeInBytes .get ();
338+ return sizeInBytes .get ();
330339 }
331340
332341 void filter (Predicate <NatsMessage > p ) {
@@ -341,8 +350,8 @@ void filter(Predicate<NatsMessage> p) {
341350 if (!p .test (cursor )) {
342351 newQueue .add (cursor );
343352 } else {
344- this . sizeInBytes .addAndGet (-cursor .getSizeInBytes ());
345- this . length .decrementAndGet ();
353+ sizeInBytes .addAndGet (-cursor .getSizeInBytes ());
354+ length .decrementAndGet ();
346355 }
347356 cursor = this .queue .poll ();
348357 }
@@ -356,8 +365,8 @@ void clear() {
356365 editLock .lock ();
357366 try {
358367 this .queue .clear ();
359- this . length .set (0 );
360- this . sizeInBytes .set (0 );
368+ length .set (0 );
369+ sizeInBytes .set (0 );
361370 } finally {
362371 editLock .unlock ();
363372 }
0 commit comments