Do Serialisation in main to enable simplification #877
base: release/2.7
Conversation
Force-pushed from 8529017 to 8ccffbe
Force-pushed from 258a24c to 3c4f6f7
@mtmk any suggestions/thoughts on what could be causing the issue? Interesting to note the exception is being thrown in recently added code.
Force-pushed from 3c4f6f7 to ac829d0
@thompson-tomo will have a look now. (btw, don't worry about rebasing anymore if that's easier for you; we will squash-merge anyway.)
Signed-off-by: James Thompson <[email protected]>
Force-pushed from ac829d0 to 96f3e2f
Thanks, normally I would not have rebased, but the branches needed it to fix the pipeline issue. It looks to me like the deserialisation of the response is failing when using a JetStream subject and serialising to a string.
Early buffer pool returns seem to be the main issue.
@@ -247,12 +247,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
    MinAckPending = _priorityGroup?.MinAckPending ?? 0,
};

await commandWriter.PublishAsync(
The reason for using the command writer here is that this particular one is the priorityCommandWriter, which is processed right after reconnect, before anything else.
Interesting, as the method signature shows it is a standard CommandWriter. The reason for moving it to the connection is so that it can be traced.
If you follow its usages you'll see there is a priorityCommandWriter variable in the NatsConnection class. The type is the same; it's just a temporary one processed just after the connection is established.
I am not following. There is only one reference to the PriorityCommandWriter class, which is only a local variable within the SetupReaderWriterAsync method, hence it would not be used, especially as the priority command writer is not being passed to this method.
It's called from here:
nats.net/src/NATS.Client.Core/NatsConnection.cs
Lines 540 to 541 in 55fb6db
// Reestablish subscriptions and consumers
reconnectTask = _subscriptionManager.WriteReconnectCommandsAsync(priorityCommandWriter.CommandWriter).AsTask();
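To make the pattern under discussion concrete, here is a minimal, self-contained sketch; every type here is an illustrative stand-in rather than the actual NATS.Client.Core internals (only CommandWriter, priorityCommandWriter.CommandWriter, and WriteReconnectCommandsAsync appear in the snippet above):

```csharp
// Illustrative stand-ins only, not the actual NATS.Client.Core internals.
using System;
using System.Threading.Tasks;

class CommandWriter
{
    public ValueTask PublishAsync(string subject, ReadOnlyMemory<byte> payload)
    {
        Console.WriteLine($"PUB {subject} ({payload.Length} bytes)");
        return default;
    }
}

// Same CommandWriter type inside, but created right after the connection is
// (re-)established and drained before the main writer resumes, so its
// commands go out first.
sealed class PriorityCommandWriter : IAsyncDisposable
{
    public CommandWriter CommandWriter { get; } = new();

    public ValueTask DisposeAsync() => default; // the real one flushes and hands over
}

class Program
{
    static async Task Main()
    {
        await using var priority = new PriorityCommandWriter();

        // Mirrors the call site shown above: reconnect commands are written
        // through priorityCommandWriter.CommandWriter before normal traffic.
        await priority.CommandWriter.PublishAsync("consumer.reconnect", ReadOnlyMemory<byte>.Empty);
    }
}
```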
Ah, it is not actually using a PriorityCommandWriter but rather a short-lived CommandWriter. I will implement the changes tomorrow.
Changes have been made
@@ -192,13 +192,11 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
    MinPending = _priorityGroup?.MinPending ?? 0,
    MinAckPending = _priorityGroup?.MinAckPending ?? 0,
};

await commandWriter.PublishAsync(
Same as above.
Changes have been made.
if (payloadBuffer != null)
{
    payloadBuffer.Reset();
    _pool.Return(payloadBuffer);
The problem here is that buffers are returned to the pool before making sure they are processed.
Edit: add this to see more tests failing:
diff --git a/src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs
@@ -123,7 +123,7 @@
public void Reset()
{
if (_array != null)
- _pool.Return(_array);
+ _pool.Return(_array, clearArray: true);
_array = _pool.Rent(_size);
_index = 0;
}
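To see why clearArray: true surfaces the failures, consider this standalone repro of the use-after-return hazard, using ArrayPool<byte> directly rather than the NatsPooledBufferWriter internals:

```csharp
using System;
using System.Buffers;
using System.Text;

class Program
{
    static void Main()
    {
        var pool = ArrayPool<byte>.Shared;

        var buffer = pool.Rent(16);
        Encoding.UTF8.GetBytes("hello").CopyTo(buffer, 0);
        ReadOnlyMemory<byte> view = buffer.AsMemory(0, 5);

        // BUG: returning before 'view' is consumed. With clearArray: true
        // the stale read below turns to zeros immediately, which is why the
        // diff above makes more tests fail: it exposes early returns.
        pool.Return(buffer, clearArray: true);

        // A subsequent renter may receive the same array and overwrite it.
        var reused = pool.Rent(16);
        Encoding.UTF8.GetBytes("world").CopyTo(reused, 0);

        // Contents are now unpredictable: zeros, "world", or "hello".
        Console.WriteLine(Encoding.UTF8.GetString(view.Span));

        pool.Return(reused);
    }
}
```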
Will need to check whether line 91 is behaving as desired, but based on the tests sending is working; only the reply is failing when coming from JetStream.
As suspected, an await was missing on line 91; adding it has improved the reliability of the tests.
This may be related to the removed serializer arguments.
But it works for non-JetStream subjects. Note the test is directly publishing to the JetStream queue and not using the method.
@@ -53,7 +49,7 @@ public override void SetResult(string? replyTo, ReadOnlySequence<byte> payload,
{
    lock (_gate)
    {
-       _msg = NatsMsg<T>.Build(Subject, replyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
+       _msg = new NatsRecievedEvent(Subject, replyTo, headersBuffer, payload);
Buffers need to be processed here. If that is deferred until later, the buffers will be in an unknown state because they are reused for socket reads.
The buffers have actually already been processed; this is just the ReadOnlyMemory that was extracted. The management of the buffer has not changed and is still handled in: https://github.com/nats-io/nats.net/blob/release/2.7/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
It's a partial view on the internal buffer, not a copy. If it has to be passed, potentially to another thread in async calls, it must be copied to a separate byte array, which is what this design is trying to avoid.
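The distinction being made here can be sketched with framework types alone: a ReadOnlySequence<byte> over a reused buffer must be materialised before it crosses an async or thread boundary.

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var backing = new byte[] { 1, 2, 3, 4 }; // imagine a socket read buffer
        var view = new ReadOnlySequence<byte>(backing);

        // 'view' is only a window over 'backing'; if the buffer is reused
        // for the next socket read, the view silently changes. Copying
        // produces bytes that are safe to hand to another thread.
        byte[] owned = view.ToArray();

        await Task.Run(() =>
        {
            // Stable even if 'backing' has been overwritten by now.
            Console.WriteLine(BitConverter.ToString(owned));
        });
    }
}
```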
So the copy solution has in fact resulted in improved throughput with only a small increase in memory allocation, hence the change provides a net throughput benefit.
Perhaps we can create a switch for OTel-enabled connections so that buffers are copied as raw bytes and passed efficiently as e.g.
Not sure about that, given the failing test is not sending any payload, so I doubt the serialisation buffers would be having an impact.
This is not just about OTel; it also helps the retry logic, which is included by default with JetStream.
Force-pushed from 127959f to 8677ffa
Force-pushed from 9de111f to 84e9d5e
Force-pushed from 84e9d5e to 5ad3768
The buffer issue is fixed by allocating and copying new buffers, which defeats the point of pooling buffers, hence the increased allocations.
release/2.7 branch:

RequestReplyBench

| Method | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|
| RequestReplyDirectAsync | 145.0 us | 8.69 us | 0.48 us | 2.61 KB |

PublishSerialBench (increased iter)

| Method | Iter | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|---|
| PublishAsync | 1000000 | 356.1 ms | 46.90 ms | 2.57 ms | 2.03 KB |

this branch:

RequestReplyBench

| Method | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|
| RequestReplyDirectAsync | 166.9 us | 73.34 us | 4.02 us | 3.08 KB |

PublishSerialBench (increased iter)

| Method | Iter | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|---|
| PublishAsync | 1000000 | 387.6 ms | 15.99 ms | 0.88 ms | 3.34 KB |
A solution I can think of is to have a different path for OTel (or similar applications requiring this type of behavior) where we can copy the buffers efficiently from the pool (e.g. using NatsMemoryOwner) with minimal allocation impact. Or leave where serialization happens as it is, and pass the struct around where we can attach more parameters optionally.
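A sketch of that first option, using the framework's MemoryPool<byte> as a stand-in for NatsMemoryOwner (whose actual API may differ): the copy lands in pooled memory with an explicit lifetime, so it is rented and returned rather than freshly allocated each time.

```csharp
using System;
using System.Buffers;

class Program
{
    static void Main()
    {
        var payload = new ReadOnlySequence<byte>(new byte[] { 10, 20, 30 });

        // Disposing the owner returns the memory to the pool, so the copy
        // avoids the steady-state GC pressure of new byte[...].
        using IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent((int)payload.Length);
        payload.CopyTo(owner.Memory.Span);
        Memory<byte> stable = owner.Memory.Slice(0, (int)payload.Length);

        // 'stable' can be handed to tracing or retry code for as long as
        // the owner lives, independent of the original socket buffer.
        Console.WriteLine(stable.Length);
    }
}
```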
Haven't checked the other changes.
ReadOnlySequence<byte>? headerValue = null;
if (headersBuffer != null)
{
    var headerData = new byte[headersBuffer.Value.Length];
Allocating here defeats the point of managing pooled buffers and increases GC pressure.
}

if (!_pool.TryRent(out payloadBuffer!))
    payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
Again, if you don't return these to the pool we'd have increased GC pressure and not much point in using the pool.
Not sure what you mean by not returning to the pool, especially given that within the finally block I am in fact returning them to the pool in the exact way it was previously done.
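For reference, the rent-in-try/return-in-finally shape being described, reduced to a plain ArrayPool<byte> example; the invariant is that the buffer is returned exactly once, and only after all use of it has completed:

```csharp
using System;
using System.Buffers;
using System.Text;

class Program
{
    static void Main()
    {
        var pool = ArrayPool<byte>.Shared;
        byte[] buffer = pool.Rent(256);
        try
        {
            // Serialise into 'buffer' and finish the send here; nothing may
            // keep a reference to 'buffer' past this block.
            int written = Encoding.UTF8.GetBytes("msg".AsSpan(), buffer.AsSpan());
            Console.WriteLine(written);
        }
        finally
        {
            pool.Return(buffer);
        }
    }
}
```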
Ah, I see, good, I missed that. The increase in allocations is curious though; maybe the object pool size is too low for that bench. Need to check that.
That was only the case for direct replies, not inbox replies, and certainly not for publish; other cases have been fixed without copying. In relation to the benchmarks, if the concern is about GC, how come we are not looking at how many GC operations are in fact occurring? Also, if performance is that much of a concern, it would be useful to see comparisons with varying quantities so that the growth can be examined, to see if the initial is higher, which skews the data. For the publish
I would much rather have a common solution which overall has a net positive impact, considering both performance and the use cases offered.
Force-pushed from 8f07d4b to 009d403
Looking into that. There is an increase in allocations regardless; we might have to think differently about this. I noticed you had a comment about throughput somewhere up above as well; do you have any benchmarks or a demo proving that? That would be very handy. There have been discussions about replacing the semaphore in the command writer with a channel as well; maybe that's the way to go. wdyt?
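For what the channel idea could look like, a minimal sketch of the direction only (not the actual CommandWriter): producers enqueue pre-framed commands into a bounded channel and a single reader loop owns the socket writes, removing the need for a semaphore.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Bounded so a full channel applies backpressure to publishers.
        var channel = Channel.CreateBounded<byte[]>(new BoundedChannelOptions(1024)
        {
            SingleReader = true, // exactly one loop touches the socket
        });

        // The single consumer is the only place bytes leave the process,
        // which keeps buffer lifetimes easy to reason about.
        var writerLoop = Task.Run(async () =>
        {
            await foreach (var frame in channel.Reader.ReadAllAsync())
                Console.WriteLine($"write {frame.Length} bytes to socket");
        });

        // Producers just enqueue; WriteAsync waits when the channel is full.
        await channel.Writer.WriteAsync(new byte[] { 1, 2, 3 });
        channel.Writer.Complete();
        await writerLoop;
    }
}
```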
Yes, performance is always a concern; feel free to add more benchmarks.
Apps that don't use OTel should not be impacted, even if it means code duplication.
This does some basic refactoring to prepare for #871, with the focus being moving serialisation into NatsConnection. This is to enable splitting sending from serialisation, which is needed for OTel.
As part of this change the following has been introduced:
In the future, further optimisations could be done to improve performance for retried messages by avoiding re-serialising the content.
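As a rough illustration of that split (hypothetical method shapes, not the actual NatsConnection API): serialisation happens once in the connection layer, producing bytes that the command writer merely sends, so tracing can wrap the serialise step and a retry can resend the same bytes without re-serialising.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

class Program
{
    // Hypothetical: serialise first, in the connection layer...
    static byte[] Serialize<T>(T value) => JsonSerializer.SerializeToUtf8Bytes(value);

    // ...so sending only ever deals with opaque bytes.
    static ValueTask SendAsync(string subject, ReadOnlyMemory<byte> payload)
    {
        Console.WriteLine($"PUB {subject} {payload.Length}");
        return default;
    }

    static async Task Main()
    {
        var bytes = Serialize(new { temp = 21.5 }); // traceable, done once

        // A retry (e.g. a JetStream publish retry) reuses 'bytes' as-is,
        // avoiding the re-serialisation mentioned above.
        await SendAsync("events.sensor", bytes);
        await SendAsync("events.sensor", bytes);
    }
}
```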