Skip to content

Commit 3c4f6f7

Browse files
committed
Do Serialisation in nats con to enable simplification
Signed-off-by: James Thompson <[email protected]>
1 parent 7d9cb11 commit 3c4f6f7

File tree

10 files changed

+119
-100
lines changed

10 files changed

+119
-100
lines changed

src/NATS.Client.Core/Commands/CommandWriter.cs

Lines changed: 8 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,13 @@ internal sealed class CommandWriter : IAsyncDisposable
3333
private readonly bool _trace;
3434
private readonly string _name;
3535
private readonly NatsConnection _connection;
36-
private readonly ObjectPool _pool;
37-
private readonly int _arrayPoolInitialSize;
3836
private readonly object _lock = new();
3937
private readonly CancellationTokenSource _cts;
4038
private readonly ConnectionStatsCounter _counter;
4139
private readonly Memory<byte> _consolidateMem = new byte[SendMemSize].AsMemory();
4240
private readonly TimeSpan _defaultCommandTimeout;
4341
private readonly Action<PingCommand> _enqueuePing;
4442
private readonly ProtocolWriter _protocolWriter;
45-
private readonly HeaderWriter _headerWriter;
4643
private readonly Channel<int> _channelLock;
4744
private readonly Channel<int> _channelSize;
4845
private readonly PipeReader _pipeReader;
@@ -55,25 +52,19 @@ internal sealed class CommandWriter : IAsyncDisposable
5552
private CancellationTokenSource? _ctsReader;
5653
private volatile bool _disposed;
5754

58-
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
55+
public CommandWriter(string name, NatsConnection connection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
5956
{
6057
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
6158
_trace = _logger.IsEnabled(LogLevel.Trace);
6259
_name = name;
6360
_connection = connection;
64-
_pool = pool;
65-
66-
// Derive ArrayPool rent size from buffer size to
67-
// avoid defining another option.
68-
_arrayPoolInitialSize = opts.WriterBufferSize / 256;
6961

7062
_counter = counter;
7163
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
7264
_enqueuePing = enqueuePing;
7365
_protocolWriter = new ProtocolWriter(opts.SubjectEncoding);
7466
_channelLock = Channel.CreateBounded<int>(1);
7567
_channelSize = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true, SingleReader = true });
76-
_headerWriter = new HeaderWriter(opts.HeaderEncoding);
7768
_cts = new CancellationTokenSource();
7869

7970
var pipe = new Pipe(new PipeOptions(
@@ -294,49 +285,23 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
294285
return default;
295286
}
296287

297-
public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
288+
public ValueTask PublishAsync(string subject, ReadOnlyMemory<byte>? headersBuffer, ReadOnlyMemory<byte> payloadBuffer, string? replyTo, CancellationToken cancellationToken)
298289
{
299290
if (_trace)
300291
{
301292
_logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", subject, replyTo);
302293
}
303294

304-
NatsPooledBufferWriter<byte>? headersBuffer = null;
305-
if (headers != null)
306-
{
307-
if (!_pool.TryRent(out headersBuffer))
308-
headersBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
309-
}
310-
311-
NatsPooledBufferWriter<byte> payloadBuffer;
312-
if (!_pool.TryRent(out payloadBuffer!))
313-
payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
314-
315295
try
316296
{
317-
if (headers != null)
318-
_headerWriter.Write(headersBuffer!, headers);
319-
320-
if (value != null)
321-
serializer.Serialize(payloadBuffer, value);
322-
323-
var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0);
297+
var size = payloadBuffer.Length + (headersBuffer?.Length ?? 0);
324298
if (_connection.ServerInfo is { } info && size > info.MaxPayload)
325299
{
326300
throw new NatsPayloadTooLargeException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}");
327301
}
328302
}
329303
catch
330304
{
331-
payloadBuffer.Reset();
332-
_pool.Return(payloadBuffer);
333-
334-
if (headersBuffer != null)
335-
{
336-
headersBuffer.Reset();
337-
_pool.Return(headersBuffer);
338-
}
339-
340305
throw;
341306
}
342307

@@ -361,21 +326,12 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
361326
throw new ObjectDisposedException(nameof(CommandWriter));
362327
}
363328

364-
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
329+
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer, payloadBuffer);
365330
EnqueueCommand();
366331
}
367332
finally
368333
{
369334
_semLock.Release();
370-
371-
payloadBuffer.Reset();
372-
_pool.Return(payloadBuffer);
373-
374-
if (headersBuffer != null)
375-
{
376-
headersBuffer.Reset();
377-
_pool.Return(headersBuffer);
378-
}
379335
}
380336

381337
return default;
@@ -815,7 +771,7 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c
815771
#if !NETSTANDARD
816772
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
817773
#endif
818-
private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, NatsPooledBufferWriter<byte>? headersBuffer, NatsPooledBufferWriter<byte> payloadBuffer, CancellationToken cancellationToken)
774+
private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, ReadOnlyMemory<byte>? headersBuffer, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
819775
{
820776
try
821777
{
@@ -839,7 +795,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
839795
await _flushTask!.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false);
840796
}
841797

842-
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
798+
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer, payloadBuffer);
843799
EnqueueCommand();
844800
}
845801
catch (TimeoutException)
@@ -853,16 +809,9 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
853809
_semLock.Release();
854810
}
855811
}
856-
finally
812+
catch
857813
{
858-
payloadBuffer.Reset();
859-
_pool.Return(payloadBuffer);
860-
861-
if (headersBuffer != null)
862-
{
863-
headersBuffer.Reset();
864-
_pool.Return(headersBuffer);
865-
}
814+
throw;
866815
}
867816
}
868817

src/NATS.Client.Core/Commands/PriorityCommandWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable
88

99
public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, SocketConnectionWrapper socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
1010
{
11-
CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing);
11+
CommandWriter = new CommandWriter("init", connection, opts, counter, enqueuePing);
1212
CommandWriter.Reset(socketConnection);
1313
}
1414

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Buffers;
2+
3+
namespace NATS.Client.Core.Internal;
4+
5+
public readonly record struct NatsRecievedEvent
6+
{
7+
public NatsRecievedEvent(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payload)
8+
{
9+
Subject = subject;
10+
ReplyTo = replyTo;
11+
HeadersBuffer = headersBuffer;
12+
Payload = payload;
13+
}
14+
15+
public string Subject { get; }
16+
17+
public string? ReplyTo { get; }
18+
19+
public ReadOnlySequence<byte>? HeadersBuffer { get; }
20+
21+
public ReadOnlySequence<byte> Payload { get; }
22+
}

src/NATS.Client.Core/Internal/ReplyTask.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,23 @@ internal sealed class ReplyTask<T> : ReplyTaskBase, IDisposable
1010
private readonly object _gate;
1111
private readonly ReplyTaskFactory _factory;
1212
private readonly long _id;
13-
private readonly NatsConnection _connection;
14-
private readonly INatsDeserialize<T> _deserializer;
1513
private readonly TimeSpan _requestTimeout;
1614
private readonly TaskCompletionSource _tcs;
17-
private NatsMsg<T> _msg;
15+
private NatsRecievedEvent _msg;
1816

19-
public ReplyTask(ReplyTaskFactory factory, long id, string subject, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
17+
public ReplyTask(ReplyTaskFactory factory, long id, string subject, TimeSpan requestTimeout)
2018
{
2119
_factory = factory;
2220
_id = id;
2321
Subject = subject;
24-
_connection = connection;
25-
_deserializer = deserializer;
2622
_requestTimeout = requestTimeout;
2723
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2824
_gate = new object();
2925
}
3026

3127
public string Subject { get; }
3228

33-
public async ValueTask<NatsMsg<T>> GetResultAsync(CancellationToken cancellationToken)
29+
public async ValueTask<NatsRecievedEvent> GetResultAsync(CancellationToken cancellationToken)
3430
{
3531
try
3632
{
@@ -53,7 +49,7 @@ public override void SetResult(string? replyTo, ReadOnlySequence<byte> payload,
5349
{
5450
lock (_gate)
5551
{
56-
_msg = NatsMsg<T>.Build(Subject, replyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
52+
_msg = new NatsRecievedEvent(Subject, replyTo, headersBuffer, payload);
5753
}
5854

5955
_tcs.TrySetResult();
@@ -73,7 +69,6 @@ internal sealed class ReplyTaskFactory
7369
private readonly string _inboxPrefixString;
7470
private readonly NatsConnection _connection;
7571
private readonly ConcurrentDictionary<long, ReplyTaskBase> _replies;
76-
private readonly INatsSerializerRegistry _serializerRegistry;
7772
private readonly TimeSpan _requestTimeout;
7873
private readonly int _subjectMaxLength;
7974
private readonly bool _allocSubject;
@@ -86,14 +81,12 @@ public ReplyTaskFactory(NatsConnection connection)
8681
_inboxPrefix = Encoding.UTF8.GetBytes(_inboxPrefixString);
8782
_subjectMaxLength = _inboxPrefix.Length + 20; // 20 digits for long
8883
_allocSubject = _subjectMaxLength < 128;
89-
_serializerRegistry = _connection.Opts.SerializerRegistry;
9084
_requestTimeout = _connection.Opts.RequestTimeout;
9185
_replies = new ConcurrentDictionary<long, ReplyTaskBase>();
9286
}
9387

94-
public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deserializer, TimeSpan? requestTimeout)
88+
public ReplyTask<TReply> CreateReplyTask<TReply>(TimeSpan? requestTimeout)
9589
{
96-
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
9790
var id = Interlocked.Increment(ref _nextId);
9891

9992
string subject;
@@ -117,7 +110,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
117110
subject = _inboxPrefixString + id;
118111
}
119112

120-
var rt = new ReplyTask<TReply>(this, id, subject, _connection, deserializer, requestTimeout ?? _requestTimeout);
113+
var rt = new ReplyTask<TReply>(this, id, subject, requestTimeout ?? _requestTimeout);
121114
_replies.TryAdd(id, rt);
122115
return rt;
123116
}

src/NATS.Client.Core/NatsConnection.Publish.cs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Diagnostics;
2+
using NATS.Client.Core.Commands;
13
using NATS.Client.Core.Internal;
24

35
namespace NATS.Client.Core;
@@ -14,9 +16,7 @@ public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, st
1416
try
1517
{
1618
headers?.SetReadOnly();
17-
return ConnectionState != NatsConnectionState.Open
18-
? ConnectAndPublishAsync(subject, default, headers, replyTo, NatsRawSerializer<byte[]>.Default, cancellationToken)
19-
: CommandWriter.PublishAsync(subject, default, headers, replyTo, NatsRawSerializer<byte[]>.Default, cancellationToken);
19+
return PerformPublishAsync(subject, headers, default, NatsRawSerializer<byte[]>.Default, replyTo, cancellationToken);
2020
}
2121
catch (Exception ex)
2222
{
@@ -26,9 +26,7 @@ public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, st
2626
}
2727

2828
headers?.SetReadOnly();
29-
return ConnectionState != NatsConnectionState.Open
30-
? ConnectAndPublishAsync(subject, default, headers, replyTo, NatsRawSerializer<byte[]>.Default, cancellationToken)
31-
: CommandWriter.PublishAsync(subject, default, headers, replyTo, NatsRawSerializer<byte[]>.Default, cancellationToken);
29+
return PerformPublishAsync(subject, headers, default, NatsRawSerializer<byte[]>.Default, replyTo, cancellationToken);
3230
}
3331

3432
/// <inheritdoc />
@@ -42,9 +40,7 @@ public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers =
4240
{
4341
serializer ??= Opts.SerializerRegistry.GetSerializer<T>();
4442
headers?.SetReadOnly();
45-
return ConnectionState != NatsConnectionState.Open
46-
? ConnectAndPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken)
47-
: CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken);
43+
return PerformPublishAsync(subject, headers, data, serializer, replyTo, cancellationToken);
4844
}
4945
catch (Exception ex)
5046
{
@@ -55,18 +51,71 @@ public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers =
5551

5652
serializer ??= Opts.SerializerRegistry.GetSerializer<T>();
5753
headers?.SetReadOnly();
58-
return ConnectionState != NatsConnectionState.Open
59-
? ConnectAndPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken)
60-
: CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken);
54+
return PerformPublishAsync<T>(subject, headers, data, serializer, replyTo, cancellationToken);
6155
}
6256

6357
/// <inheritdoc />
6458
public ValueTask PublishAsync<T>(in NatsMsg<T> msg, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) =>
6559
PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, serializer, opts, cancellationToken);
6660

67-
private async ValueTask ConnectAndPublishAsync<T>(string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
61+
private async ValueTask ConnectAndPublishAsync(string subject, ReadOnlyMemory<byte>? headers, ReadOnlyMemory<byte> data, string? replyTo, CancellationToken cancellationToken)
6862
{
6963
await ConnectAsync().AsTask().WaitAsync(cancellationToken).ConfigureAwait(false);
70-
await CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false);
64+
await CommandWriter.PublishAsync(subject, headers, data, replyTo, cancellationToken).ConfigureAwait(false);
65+
}
66+
67+
private ValueTask PerformPublishAsync<T>(string subject, NatsHeaders? headers, T? data, INatsSerialize<T> serializer, string? replyTo = default, CancellationToken cancellationToken = default)
68+
{
69+
NatsPooledBufferWriter<byte>? headersBuffer = null;
70+
NatsPooledBufferWriter<byte>? payloadBuffer = null;
71+
72+
ValueTask task;
73+
74+
try
75+
{
76+
if (headers != null)
77+
{
78+
headers?.SetReadOnly();
79+
if (!_pool.TryRent(out headersBuffer))
80+
headersBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
81+
}
82+
83+
if (!_pool.TryRent(out payloadBuffer!))
84+
payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
85+
86+
if (headers != null)
87+
_headerWriter.Write(headersBuffer!, headers);
88+
89+
if (data != null)
90+
serializer.Serialize(payloadBuffer, data);
91+
task = DoPublishAsync(subject, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory, replyTo, cancellationToken);
92+
}
93+
catch (Exception ex)
94+
{
95+
throw;
96+
}
97+
finally
98+
{
99+
if (payloadBuffer != null)
100+
{
101+
payloadBuffer.Reset();
102+
_pool.Return(payloadBuffer);
103+
}
104+
105+
if (headersBuffer != null)
106+
{
107+
headersBuffer.Reset();
108+
_pool.Return(headersBuffer);
109+
}
110+
}
111+
112+
return task;
113+
}
114+
115+
private ValueTask DoPublishAsync(string subject, ReadOnlyMemory<byte>? headers = default, ReadOnlyMemory<byte> data = default, string? replyTo = default, CancellationToken cancellationToken = default)
116+
{
117+
return ConnectionState != NatsConnectionState.Open
118+
? ConnectAndPublishAsync(subject, headers, data, replyTo, cancellationToken)
119+
: CommandWriter.PublishAsync(subject, headers, data, replyTo, cancellationToken);
71120
}
72121
}

0 commit comments

Comments
 (0)