Skip to content

Commit 0f5f936

Browse files
committed
Merge branch 'main' into wbl3
2 parents 01f3ab1 + 8b4bce7 commit 0f5f936

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ void drainTo(MessageQueue target) {
9898
editLock.lock();
9999
try {
100100
queue.drainTo(target.queue);
101-
target.length.set(queue.size());
101+
target.length.set(length.get());
102+
target.sizeInBytes.set(sizeInBytes.get());
102103
} finally {
103104
editLock.unlock();
104105
}

src/test/java/io/nats/client/impl/MessageQueueTests.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,13 +443,17 @@ public void testSizeInBytes() throws InterruptedException {
443443
NatsMessage msg3 = new ProtocolMessage(THREE);
444444
long expected = 0;
445445

446-
q.push(msg1); expected += msg1.getSizeInBytes();
446+
q.push(msg1);
447+
expected += msg1.getSizeInBytes();
447448
assertEquals(expected, q.sizeInBytes());
448-
q.push(msg2); expected += msg2.getSizeInBytes();
449+
q.push(msg2);
450+
expected += msg2.getSizeInBytes();
449451
assertEquals(expected, q.sizeInBytes());
450-
q.push(msg3); expected += msg3.getSizeInBytes();
452+
q.push(msg3);
453+
expected += msg3.getSizeInBytes();
451454
assertEquals(expected, q.sizeInBytes());
452-
q.popNow(); expected -= msg1.getSizeInBytes();
455+
q.popNow();
456+
expected -= msg1.getSizeInBytes();
453457
assertEquals(expected, q.sizeInBytes());
454458
q.accumulate(100,100, null); expected = 0;
455459
assertEquals(expected, q.sizeInBytes());
@@ -471,18 +475,57 @@ public void testSizeInBytesWithData() throws InterruptedException {
471475
assertEquals(72, msg2.getSizeInBytes());
472476
assertEquals(78, msg3.getSizeInBytes());
473477

474-
q.push(msg1); expected += msg1.getSizeInBytes();
478+
q.push(msg1);
479+
expected += msg1.getSizeInBytes();
475480
assertEquals(expected, q.sizeInBytes());
476-
q.push(msg2); expected += msg2.getSizeInBytes();
481+
q.push(msg2);
482+
expected += msg2.getSizeInBytes();
477483
assertEquals(expected, q.sizeInBytes());
478-
q.push(msg3); expected += msg3.getSizeInBytes();
484+
q.push(msg3);
485+
expected += msg3.getSizeInBytes();
479486
assertEquals(expected, q.sizeInBytes());
480-
q.popNow(); expected -= msg1.getSizeInBytes();
487+
q.popNow();
488+
expected -= msg1.getSizeInBytes();
481489
assertEquals(expected, q.sizeInBytes());
482490
q.accumulate(1000,100, null); expected = 0;
483491
assertEquals(expected, q.sizeInBytes());
484492
}
485493

494+
@Test
495+
public void testDrainTo() throws InterruptedException {
496+
MessageQueue q1 = new MessageQueue(true, REQUEST_CLEANUP_INTERVAL);
497+
498+
String subject = "subj";
499+
String replyTo = "reply";
500+
Headers h = new Headers().add("Content-Type", "text/plain");
501+
NatsMessage msg1 = new NatsMessage(subject, null, h, new byte[8]);
502+
NatsMessage msg2 = new NatsMessage(subject, null, h, new byte[16]);
503+
NatsMessage msg3 = new NatsMessage(subject, replyTo, h, new byte[16]);
504+
long expected = 0;
505+
506+
assertEquals(64, msg1.getSizeInBytes());
507+
assertEquals(72, msg2.getSizeInBytes());
508+
assertEquals(78, msg3.getSizeInBytes());
509+
510+
q1.push(msg1);
511+
expected += msg1.getSizeInBytes();
512+
assertEquals(1, q1.length());
513+
assertEquals(expected, q1.sizeInBytes());
514+
q1.push(msg2);
515+
expected += msg2.getSizeInBytes();
516+
assertEquals(2, q1.length());
517+
assertEquals(expected, q1.sizeInBytes());
518+
q1.push(msg3);
519+
expected += msg3.getSizeInBytes();
520+
assertEquals(3, q1.length());
521+
assertEquals(expected, q1.sizeInBytes());
522+
523+
MessageQueue q2 = new MessageQueue(true, REQUEST_CLEANUP_INTERVAL);
524+
q1.drainTo(q2);
525+
assertEquals(3, q2.length());
526+
assertEquals(expected, q2.sizeInBytes());
527+
}
528+
486529
@Test
487530
public void testFilterTail() throws InterruptedException, UnsupportedEncodingException {
488531
MessageQueue q = new MessageQueue(true, REQUEST_CLEANUP_INTERVAL);

0 commit comments

Comments
 (0)