Conversation

Contributor

@thompson-tomo thompson-tomo commented Jun 3, 2025

This does some basic refactoring to prepare for #871, with the focus on moving serialisation into the NATS connection. This enables splitting sending from serialisation, which is needed for OTEL.

As part of this change, the following has been introduced:

  • Serialisation now occurs within the methods of the NATS connection, so that publishing can focus on dealing with the bytes.

In the future, further optimisations could be made to improve performance for retried messages by avoiding re-serialising the content.
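For illustration, here is a minimal sketch of the split being described (the IPayloadSerializer interface, PublishSketch class and sendAsync delegate are hypothetical stand-ins, not the actual API of this PR): the value is serialised once at the connection level, and the send path, including any retry, only deals with the resulting bytes.

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;

// Hypothetical sketch only: serialise once at the connection level, then the send
// path (including retries) deals purely with bytes.
public interface IPayloadSerializer<T>
{
    void Serialize(IBufferWriter<byte> writer, T value);
}

public static class PublishSketch
{
    public static async Task PublishWithRetryAsync<T>(
        Func<ReadOnlyMemory<byte>, Task> sendAsync,
        IPayloadSerializer<T> serializer,
        T value,
        int maxAttempts = 3)
    {
        var buffer = new ArrayBufferWriter<byte>();
        serializer.Serialize(buffer, value);              // serialise exactly once
        ReadOnlyMemory<byte> payload = buffer.WrittenMemory;

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await sendAsync(payload);                 // sending deals with bytes only
                return;
            }
            catch when (attempt < maxAttempts)
            {
                // retried messages reuse the already-serialised payload
            }
        }
    }
}
```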

@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch 2 times, most recently from 8529017 to 8ccffbe Compare June 3, 2025 07:33
@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch 7 times, most recently from 258a24c to 3c4f6f7 Compare June 10, 2025 09:16
@thompson-tomo
Contributor Author

@mtmk any suggestions/thoughts on what could be causing the issue? Interestingly, the exception is being thrown in recently added code.

@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch from 3c4f6f7 to ac829d0 Compare June 10, 2025 12:11
@mtmk
Member

mtmk commented Jun 10, 2025

@thompson-tomo will have a look now.

(btw, don't worry about rebasing anymore if that's easier for you; we will squash merge anyway.)

@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch from ac829d0 to 96f3e2f Compare June 10, 2025 12:36
@thompson-tomo
Contributor Author

thompson-tomo commented Jun 10, 2025

Thanks, normally I would not have rebased, but the branch needed it to fix the pipeline issue.

It looks to me like the deserialisation of the response is failing when using a JetStream subject and serialising to a string.

Member

@mtmk mtmk left a comment


early buffer pool returns seem to be the main issue

@@ -247,12 +247,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
            MinAckPending = _priorityGroup?.MinAckPending ?? 0,
        };

        await commandWriter.PublishAsync(
Member


The reason for using the command writer here is that this particular one is the priorityCommandWriter, which is processed right after reconnect, before anything else is processed.

Contributor Author


Interesting, as the method signature shows it is a standard CommandWriter. The reason for moving this to the connection is so that it can be traced.

Member


if you follow its usages you'll see that in the NatsConnection class there is a priorityCommandWriter variable. The type is the same; it's just a temporary one processed just after the connection is established.

Contributor Author


I am not following. There is only one reference to the PriorityCommandWriter class, which is a local variable within the SetupReaderWriterAsync method, so it would not be used here, especially as the priority command writer is not being passed to this method.

Member


it's called from here

// Reestablish subscriptions and consumers
reconnectTask = _subscriptionManager.WriteReconnectCommandsAsync(priorityCommandWriter.CommandWriter).AsTask();

Contributor Author


Ah, it is not actually using a PriorityCommandWriter but rather a short-lived CommandWriter; I will implement the changes tomorrow.

Contributor Author


Changes have been made

@@ -192,13 +192,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
            MinPending = _priorityGroup?.MinPending ?? 0,
            MinAckPending = _priorityGroup?.MinAckPending ?? 0,
        };

        await commandWriter.PublishAsync(
Member


same

Contributor Author


changes have been made

if (payloadBuffer != null)
{
    payloadBuffer.Reset();
    _pool.Return(payloadBuffer);
Member

@mtmk mtmk Jun 10, 2025


the problem here is that buffers are returned to the pool before making sure they are processed.

edit: add this to see more tests failing:

diff --git a/src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs
@@ -123,7 +123,7 @@
     public void Reset()
     {
         if (_array != null)
-            _pool.Return(_array);
+            _pool.Return(_array, clearArray: true);
         _array = _pool.Rent(_size);
         _index = 0;
     }
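For context, here is a self-contained illustration (using the BCL ArrayPool directly, not the PR's code) of why returning with clearArray: true makes early returns visible:

```csharp
using System;
using System.Buffers;

// General illustration of why clearing on return exposes buffers handed back too early.
var pool = ArrayPool<byte>.Shared;

var array = pool.Rent(16);
array[0] = 0xAB;
ReadOnlyMemory<byte> view = array.AsMemory(0, 1);  // a view over the pooled array

pool.Return(array, clearArray: true);              // returned (and cleared) too early

// With clearArray: true the stale view reads 0x00 instead of 0xAB, so tests that would
// otherwise pass by accident now fail and reveal the use-after-return.
Console.WriteLine(view.Span[0]);
```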

Contributor Author


Will need to check whether line 91 is behaving as desired, but based on the tests, sending is working; only the reply is failing when it comes from JetStream.

Contributor Author


As suspected, an await was missing on line 91; adding it has improved the reliability of the tests.
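As a general illustration of that class of bug (the SendAsync and writeAsync names below are hypothetical, not the PR's code), the fix is to await the write before the buffer is cleared or reused:

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;

public static class AwaitSketch
{
    // Hypothetical illustration: without the await, the buffer can be cleared/reused
    // while the write is still in flight.
    public static async ValueTask SendAsync(
        ArrayBufferWriter<byte> payloadBuffer,
        Func<ReadOnlyMemory<byte>, ValueTask> writeAsync)
    {
        // Bug:  _ = writeAsync(payloadBuffer.WrittenMemory);  // not awaited; races with Clear() below
        await writeAsync(payloadBuffer.WrittenMemory);          // fix: await before touching the buffer again

        payloadBuffer.Clear();                                  // safe only once the write has completed
    }
}
```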

@mtmk
Member

mtmk commented Jun 10, 2025

It looks to me like the deserialisation of the response is failing when using a JetStream subject and serialising to a string.

this may be related to the removed serializer arguments

@thompson-tomo
Contributor Author

But it works for non-JetStream subjects. Note the test is publishing directly to the JetStream queue rather than using the method.

@@ -53,7 +49,7 @@ public override void SetResult(string? replyTo, ReadOnlySequence<byte> payload,
 {
     lock (_gate)
     {
-        _msg = NatsMsg<T>.Build(Subject, replyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
+        _msg = new NatsRecievedEvent(Subject, replyTo, headersBuffer, payload);
Member


buffers need to be processed here. If processing is deferred until later, the buffers will be in an unknown state because they are reused for socket reads.

Contributor Author


The buffers have actually already been processed; this is just the ReadOnlyMemory that was extracted. The management of the buffer has not changed and is still handled in: https://github.com/nats-io/nats.net/blob/release/2.7/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs

Member


it's a partial view of the internal buffer, not a copy. If it potentially has to be passed to another thread in async calls, it must be copied to a separate byte array, which is what this design is trying to avoid.
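For illustration (a general sketch, not the PR's code): if a ReadOnlySequence<byte> over the read buffers has to outlive the read step, the bytes must be copied out first, which is exactly the allocation this design tries to avoid.

```csharp
using System;
using System.Buffers;

public static class SequenceCopySketch
{
    // A ReadOnlySequence<byte> over pooled read buffers is only valid until those
    // buffers are reused. To defer work (e.g. deserialise later, possibly on another
    // thread), the bytes have to be copied out first.
    public static byte[] CopyForDeferredProcessing(in ReadOnlySequence<byte> payload)
    {
        // ToArray() copies the sequence into a new managed array: safe to pass around,
        // but exactly the allocation the pooled design is trying to avoid.
        return payload.ToArray();
    }
}
```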

Contributor Author


So the copy solution has in fact resulted in improved throughput with only a small increase in memory allocation, hence the change provides a net throughput benefit.

@mtmk
Member

mtmk commented Jun 10, 2025

Perhaps we can create a switch for OTEL-enabled connections so that buffers are copied as raw bytes and passed efficiently as e.g. NatsMemoryOwner to minimize GC; then the serialization can happen anywhere along the stack. Keeping serialization close to the wire was done to minimize buffer copies, but in this case it seems to be getting in the way.
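A rough sketch of that idea, using the BCL's MemoryPool<byte> as a stand-in for NatsMemoryOwner (the library type would play the same role; this is not the actual implementation):

```csharp
using System;
using System.Buffers;

public static class OwnedCopySketch
{
    // Copy the wire bytes once into pooled memory the caller owns, so (de)serialization
    // can happen anywhere along the stack without racing the socket read buffers.
    public static IMemoryOwner<byte> CopyToOwnedMemory(in ReadOnlySequence<byte> payload, out int length)
    {
        length = checked((int)payload.Length);
        var owner = MemoryPool<byte>.Shared.Rent(length);  // pooled, minimal GC impact
        payload.CopyTo(owner.Memory.Span);                 // one copy; the source buffers can then be reused
        return owner;                                      // disposing returns the memory to the pool
    }
}
```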

@thompson-tomo
Contributor Author

early buffer pool returns seem to be the main issue

Not sure about that, given the failing test is not sending any payload, so I doubt the serialisation buffers would be having an impact.

@thompson-tomo
Contributor Author

Perhaps we can create a switch for OTEL-enabled connections so that buffers are copied as raw bytes and passed efficiently as e.g. NatsMemoryOwner to minimize GC; then the serialization can happen anywhere along the stack. Keeping serialization close to the wire was done to minimize buffer copies, but in this case it seems to be getting in the way.

This is not just about OTEL; it also helps the retry logic, which is included by default with JetStream.

@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch 5 times, most recently from 127959f to 8677ffa Compare June 12, 2025 04:54
@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch 22 times, most recently from 9de111f to 84e9d5e Compare June 13, 2025 10:19
@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch from 84e9d5e to 5ad3768 Compare June 13, 2025 10:51
@thompson-tomo thompson-tomo requested a review from mtmk June 13, 2025 11:05
Member

@mtmk mtmk left a comment


The buffer issue is fixed by allocating and copying new buffers, which defeats the point of pooling buffers and hence increases allocations.

release/2.7 branch:

RequestReplyBench

| Method                  | Mean     | Error   | StdDev  | Allocated |
|-------------------------|----------|---------|---------|-----------|
| RequestReplyDirectAsync | 145.0 us | 8.69 us | 0.48 us | 2.61 KB   |

PublishSerialBench (increased iter)

| Method       | Iter    | Mean     | Error    | StdDev  | Allocated |
|--------------|---------|----------|----------|---------|-----------|
| PublishAsync | 1000000 | 356.1 ms | 46.90 ms | 2.57 ms | 2.03 KB   |

this branch:

RequestReplyBench

| Method                  | Mean     | Error    | StdDev  | Allocated |
|-------------------------|----------|----------|---------|-----------|
| RequestReplyDirectAsync | 166.9 us | 73.34 us | 4.02 us | 3.08 KB   |

PublishSerialBench (increased iter)

| Method       | Iter    | Mean     | Error    | StdDev  | Allocated |
|--------------|---------|----------|----------|---------|-----------|
| PublishAsync | 1000000 | 387.6 ms | 15.99 ms | 0.88 ms | 3.34 KB   |

A solution I can think of is to have a different path for OTEL (or similar applications requiring this type of behavior) where we can copy the buffers efficiently from the pool (e.g. using NatsMemoryOwner) with minimal allocation impact. Or leave where serialization happens as it is and pass the struct around, where we can attach more parameters optionally.

haven't checked the other changes.

ReadOnlySequence<byte>? headerValue = null;
if (headersBuffer != null)
{
    var headerData = new byte[headersBuffer.Value.Length];
Member


allocating here defeats the point of managing pooled buffers and increases GC pressure.
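A hedged sketch of the alternative being implied here (not the PR's code): rent the temporary copy from ArrayPool<byte> and return it when done, so the copy adds no GC pressure.

```csharp
using System;
using System.Buffers;

public static class HeaderCopySketch
{
    // Instead of `new byte[headersBuffer.Value.Length]`, rent the temporary copy from
    // the array pool and return it afterwards (general sketch).
    public static void ProcessHeaders(in ReadOnlySequence<byte> headersBuffer, Action<ReadOnlyMemory<byte>> process)
    {
        var length = checked((int)headersBuffer.Length);
        var rented = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            headersBuffer.CopyTo(rented);                  // copy into the pooled array
            process(rented.AsMemory(0, length));           // hand out only the written slice
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(rented);         // always give the array back
        }
    }
}
```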

}

if (!_pool.TryRent(out payloadBuffer!))
    payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
Member


again, if you don't return these to the pool we'd have increased GC and there's not much point in using the pool

Contributor Author


Not sure what you mean by not returning to the pool, especially given that within the finally block I am in fact returning them to the pool in exactly the way it was previously done.
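For clarity, a self-contained sketch of the rent/use/return-in-finally shape being described (a trivial ConcurrentQueue stands in for the object pool and ArrayBufferWriter for NatsPooledBufferWriter; this only mirrors the snippets above, it is not the PR code):

```csharp
using System;
using System.Buffers;
using System.Collections.Concurrent;

public static class RentReturnSketch
{
    private static readonly ConcurrentQueue<ArrayBufferWriter<byte>> Pool = new();

    public static void Publish(ReadOnlySpan<byte> payload, Action<ReadOnlyMemory<byte>> send)
    {
        if (!Pool.TryDequeue(out var payloadBuffer))
            payloadBuffer = new ArrayBufferWriter<byte>(256);

        try
        {
            payloadBuffer.Write(payload);            // "serialise" into the pooled writer
            send(payloadBuffer.WrittenMemory);       // write the bytes out
        }
        finally
        {
            payloadBuffer.Clear();                   // equivalent of Reset()
            Pool.Enqueue(payloadBuffer);             // return to the pool, as in the finally block discussed
        }
    }
}
```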

Member


ah I see, good. I missed that. The increase in allocations is curious though; maybe the object pool size is too low for that bench. Need to check that.

@thompson-tomo
Contributor Author

The buffer issue is fixed by allocating and copying new buffers, which defeats the point of pooling buffers and hence increases allocations.

That was only the case for direct replies, not for inbox replies, and certainly not for publish. The other cases have been fixed without copying.

In relation to the benchmarks, if the concern is about GC, how come we are not looking at how many GC operations are in fact occurring? Also, if performance is that much of a concern, it would be useful to see comparisons with varying quantities so that the growth can be compared and we can see whether the initial cost is higher, which skews the data. For the publish
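As a sketch of what that could look like with BenchmarkDotNet (a hypothetical benchmark class, not the repo's existing bench): [MemoryDiagnoser] reports Gen0/Gen1/Gen2 collection counts alongside the allocation column, and [Params] varies the iteration count so allocation growth can be compared across sizes.

```csharp
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

// Hypothetical benchmark shape: memory diagnoser plus parameterised iteration counts.
[MemoryDiagnoser]
public class PublishGcBench
{
    [Params(100_000, 1_000_000, 10_000_000)]
    public int Iter { get; set; }

    [Benchmark]
    public async Task PublishAsync()
    {
        for (var i = 0; i < Iter; i++)
        {
            // await _nats.PublishAsync("bench.subject", _payload);  // placeholder for the real publish
            await Task.Yield();
        }
    }
}
```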

A solution I can think of is to have a different path for OTEL (or similar applications requiring this type of behavior) where we can copy the buffers efficiently from the pool (e.g. using NatsMemoryOwner) with minimal allocation impact

I would much rather have a common solution that has an overall net positive impact, considering both performance and the use cases offered.

@thompson-tomo thompson-tomo force-pushed the Task/DoSerialisartionInConnection branch from 8f07d4b to 009d403 Compare June 14, 2025 04:27
@mtmk
Member

mtmk commented Jun 16, 2025

The buffer issue is fixed by allocating and copying new buffers, which defeats the point of pooling buffers and hence increases allocations.

That was only the case for direct replies, not for inbox replies, and certainly not for publish. The other cases have been fixed without copying.

Looking into that. There is an increase in allocations regardless; we might have to think differently about this. I noticed you had a comment about throughput somewhere above as well. Do you have any benchmarks or a demo proving that? It would be very handy. There have been discussions about replacing the semaphore in the command writer with a channel as well; maybe that's the way to go. wdyt?
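A rough sketch of the channel idea (hypothetical, not the library's design): producers enqueue pre-serialised commands and a single background loop drains the channel and owns the socket, removing the need for a semaphore around writes.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical channel-based command writer: one consumer loop owns the socket.
public sealed class ChannelCommandWriterSketch
{
    private readonly Channel<ReadOnlyMemory<byte>> _commands =
        Channel.CreateBounded<ReadOnlyMemory<byte>>(new BoundedChannelOptions(1024)
        {
            SingleReader = true, // only the writer loop below touches the socket
        });

    public ValueTask EnqueueAsync(ReadOnlyMemory<byte> command)
        => _commands.Writer.WriteAsync(command);

    public async Task RunWriterLoopAsync(Func<ReadOnlyMemory<byte>, ValueTask> writeToSocketAsync)
    {
        await foreach (var command in _commands.Reader.ReadAllAsync())
        {
            await writeToSocketAsync(command);
        }
    }
}
```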

In relation to the benchmarks, if the concern is about GC, how come we are not looking at how many GC operations are in fact occurring? Also, if performance is that much of a concern, it would be useful to see comparisons with varying quantities so that the growth can be compared and we can see whether the initial cost is higher, which skews the data. For the publish

Yes, performance is always a concern. Feel free to add more benchmarks.

A solution I can think of is to have a different path for OTEL (or similar applications requiring this type of behavior) where we can copy the buffers efficiently from the pool (e.g. using NatsMemoryOwner) with minimal allocation impact

I would much rather have a common solution that has an overall net positive impact, considering both performance and the use cases offered.

Apps that don't use OTEL should not be impacted, even if it means code duplication.
