Skip to content

Commit 6ea40f6

Browse files
committed
More optimisation based on review
Signed-off-by: James Thompson <[email protected]>
1 parent b3e5f0a commit 6ea40f6

24 files changed

+279
-299
lines changed

src/NATS.Client.Core/Commands/CommandWriter.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,6 @@ public ValueTask PublishAsync(NatsPublishProps props, ReadOnlyMemory<byte> paylo
291291
}
292292

293293
var size = payloadBuffer.Length + (headersBuffer?.Length ?? 0);
294-
Activity.Current?.AddTag("messaging.message.body.size", payloadBuffer.Length);
295-
Activity.Current?.AddTag("messaging.message.envelope.size", size);
296294
if (_connection.ServerInfo is { } info && size > info.MaxPayload)
297295
{
298296
throw new NatsPayloadTooLargeException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}");

src/NATS.Client.Core/Internal/InboxSub.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ public InboxSub(
2323
}
2424

2525
// Avoid base class error handling since inboxed subscribers will be responsible for that.
26-
public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27-
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);
26+
public override ValueTask ReceiveAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27+
_inbox.ReceivedAsync(props, headersBuffer, payloadBuffer, _connection);
2828

2929
// Not used. Dummy implementation to keep base happy.
30-
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
30+
protected override ValueTask ReceiveInternalAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
3131
=> default;
3232

3333
protected override void TryComplete()
@@ -119,11 +119,11 @@ public ValueTask RegisterAsync(NatsSubBase sub)
119119
return sub.ReadyAsync();
120120
}
121121

122-
public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
122+
public async ValueTask ReceivedAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
123123
{
124-
if (!_bySubject.TryGetValue(subject, out var subTable))
124+
if (!_bySubject.TryGetValue(props.Subject, out var subTable))
125125
{
126-
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", subject);
126+
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", props.Subject);
127127
return;
128128
}
129129

@@ -132,13 +132,13 @@ public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySe
132132
{
133133
if (weakReference.TryGetTarget(out var sub))
134134
{
135-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
135+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
136136
}
137137
}
138138
#else
139139
foreach (var (sub, _) in subTable)
140140
{
141-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
141+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
142142
}
143143
#endif
144144
}

src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,10 @@ private async Task ReadLoopAsync()
220220
}
221221

222222
Telemetry.AddPendingMessages(1, tags);
223+
Telemetry.RecordReceivedBytes(props.TotalEnvelopeLength, tags);
223224
await _connection.PublishToClientHandlersAsync(props, null, payloadBuffer).ConfigureAwait(false);
224225
Telemetry.AddPendingMessages(-1, tags);
225226
Telemetry.AddReceivedMessages(1, tags);
226-
Telemetry.RecordReceivedBytes(props.TotalEnvelopeLength, tags);
227227
}
228228
else if (code == ServerOpCodes.HMsg)
229229
{
@@ -252,8 +252,6 @@ private async Task ReadLoopAsync()
252252

253253
var props = ParseHMessageHeader(msgHeader);
254254

255-
props.InboxPrefix = _connection.InboxPrefix;
256-
257255
if (_trace)
258256
{
259257
_logger.LogTrace(NatsLogEvents.Protocol, "HMSG trace parsed: {Subject} {Sid} {ReplyTo} {HeadersLength} {TotalLength}", props.Subject, props.SubscriptionId, props.ReplyTo, props.HeaderLength, props.TotalMessageLength);
@@ -289,11 +287,11 @@ private async Task ReadLoopAsync()
289287
var payloadSlice = totalSlice.Slice(props.HeaderLength, props.PayloadLength);
290288

291289
Telemetry.AddPendingMessages(1, tags);
290+
Telemetry.RecordReceivedBytes(props.TotalEnvelopeLength, tags);
292291
await _connection.PublishToClientHandlersAsync(props, headerSlice, payloadSlice)
293292
.ConfigureAwait(false);
294293
Telemetry.AddPendingMessages(-1, tags);
295294
Telemetry.AddReceivedMessages(1, tags);
296-
Telemetry.RecordReceivedBytes(props.TotalEnvelopeLength, tags);
297295
}
298296
else
299297
{
@@ -478,15 +476,15 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
478476
// MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]
479477
private NatsProcessProps ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
480478
{
481-
var metaLength = msgHeader.Length;
479+
var framingLength = msgHeader.Length;
482480
msgHeader = msgHeader.Slice(4);
483481
msgHeader.Split(out var subjectBytes, out msgHeader);
484482
msgHeader.Split(out var sidBytes, out msgHeader);
485483
msgHeader.Split(out var replyToOrSizeBytes, out msgHeader);
486484

487-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
485+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
488486
{
489-
MetaLength = metaLength,
487+
FramingLength = framingLength,
490488
HeaderLength = 0,
491489
};
492490

@@ -531,17 +529,17 @@ private NatsProcessProps ParseMessageHeader(in ReadOnlySequence<byte> msgHeader)
531529
private NatsProcessProps ParseHMessageHeader(ReadOnlySpan<byte> msgHeader)
532530
{
533531
// 'HMSG' literal
534-
var metaLength = msgHeader.Length;
532+
var framingLength = msgHeader.Length;
535533
msgHeader.Split(out _, out msgHeader);
536534

537535
msgHeader.Split(out var subjectBytes, out msgHeader);
538536
msgHeader.Split(out var sidBytes, out msgHeader);
539537
msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader);
540538
msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader);
541539

542-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
540+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
543541
{
544-
MetaLength = metaLength,
542+
FramingLength = framingLength,
545543
};
546544

547545
// We don't have the optional reply-to field
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using System.Buffers;
2+
3+
namespace NATS.Client.Core.Internal;
4+
5+
public readonly record struct NatsRecievedEvent
6+
{
7+
public NatsRecievedEvent(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payload)
8+
{
9+
Props = props;
10+
HeadersBuffer = headersBuffer;
11+
Payload = payload;
12+
}
13+
14+
public NatsProcessProps Props { get; }
15+
16+
public ReadOnlySequence<byte>? HeadersBuffer { get; }
17+
18+
public ReadOnlySequence<byte> Payload { get; }
19+
}

src/NATS.Client.Core/Internal/ReplyTask.cs

Lines changed: 12 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,27 @@
11
using System.Buffers;
2-
using System.Buffers.Text;
32
using System.Collections.Concurrent;
4-
using System.Text;
53

64
namespace NATS.Client.Core.Internal;
75

8-
internal sealed class ReplyTask<T> : ReplyTaskBase, IDisposable
6+
internal sealed class ReplyTask : ReplyTaskBase, IDisposable
97
{
108
private readonly object _gate;
119
private readonly ReplyTaskFactory _factory;
12-
private readonly long _id;
13-
private readonly NatsConnection _connection;
14-
private readonly INatsDeserialize<T> _deserializer;
10+
private readonly long _subjectNum;
1511
private readonly TimeSpan _requestTimeout;
1612
private readonly TaskCompletionSource _tcs;
17-
private NatsMsg<T> _msg;
13+
private NatsRecievedEvent _msg;
1814

19-
public ReplyTask(ReplyTaskFactory factory, long id, string subject, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
15+
public ReplyTask(ReplyTaskFactory factory, long subjectNum, TimeSpan requestTimeout)
2016
{
2117
_factory = factory;
22-
_id = id;
23-
Subject = subject;
24-
_connection = connection;
25-
_deserializer = deserializer;
18+
_subjectNum = subjectNum;
2619
_requestTimeout = requestTimeout;
2720
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2821
_gate = new object();
2922
}
3023

31-
public string Subject { get; }
32-
33-
public async ValueTask<NatsMsg<T>> GetResultAsync(CancellationToken cancellationToken)
24+
public async ValueTask<NatsRecievedEvent> GetResultAsync(CancellationToken cancellationToken)
3425
{
3526
try
3627
{
@@ -53,13 +44,13 @@ public override void SetResult(NatsProcessProps props, ReadOnlySequence<byte> pa
5344
{
5445
lock (_gate)
5546
{
56-
_msg = NatsMsg<T>.Build(Subject, props.ReplyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
47+
_msg = new NatsRecievedEvent(props, headersBuffer, payload);
5748
}
5849

5950
_tcs.TrySetResult();
6051
}
6152

62-
public void Dispose() => _factory.Return(_id);
53+
public void Dispose() => _factory.Return(_subjectNum);
6354
}
6455

6556
internal abstract class ReplyTaskBase
@@ -69,55 +60,18 @@ internal abstract class ReplyTaskBase
6960

7061
internal sealed class ReplyTaskFactory
7162
{
72-
private readonly byte[] _inboxPrefix;
73-
private readonly string _inboxPrefixString;
74-
private readonly NatsConnection _connection;
7563
private readonly ConcurrentDictionary<long, ReplyTaskBase> _replies;
76-
private readonly INatsSerializerRegistry _serializerRegistry;
77-
private readonly TimeSpan _requestTimeout;
78-
private readonly int _subjectMaxLength;
79-
private readonly bool _allocSubject;
8064
private long _nextId;
8165

82-
public ReplyTaskFactory(NatsConnection connection)
66+
public ReplyTaskFactory()
8367
{
84-
_connection = connection;
85-
_inboxPrefixString = _connection.InboxPrefix + ".";
86-
_inboxPrefix = Encoding.UTF8.GetBytes(_inboxPrefixString);
87-
_subjectMaxLength = _inboxPrefix.Length + 20; // 20 digits for long
88-
_allocSubject = _subjectMaxLength < 128;
89-
_serializerRegistry = _connection.Opts.SerializerRegistry;
90-
_requestTimeout = _connection.Opts.RequestTimeout;
9168
_replies = new ConcurrentDictionary<long, ReplyTaskBase>();
9269
}
9370

94-
public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deserializer, TimeSpan? requestTimeout)
71+
public ReplyTask CreateReplyTask(TimeSpan requestTimeout)
9572
{
96-
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
9773
var id = Interlocked.Increment(ref _nextId);
98-
99-
string subject;
100-
if (_allocSubject)
101-
{
102-
Span<byte> buffer = stackalloc byte[_subjectMaxLength];
103-
_inboxPrefix.CopyTo(buffer);
104-
var idSpan = buffer.Slice(_inboxPrefix.Length);
105-
if (Utf8Formatter.TryFormat(id, idSpan, out var written))
106-
{
107-
var subjectSpan = buffer.Slice(0, written + _inboxPrefix.Length);
108-
subject = Encoding.UTF8.GetString(subjectSpan);
109-
}
110-
else
111-
{
112-
subject = _inboxPrefixString + id;
113-
}
114-
}
115-
else
116-
{
117-
subject = _inboxPrefixString + id;
118-
}
119-
120-
var rt = new ReplyTask<TReply>(this, id, subject, _connection, deserializer, requestTimeout ?? _requestTimeout);
74+
var rt = new ReplyTask(this, id, requestTimeout);
12175
_replies.TryAdd(id, rt);
12276
return rt;
12377
}
@@ -126,7 +80,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
12680

12781
public bool TrySetResult(NatsProcessProps props, in ReadOnlySequence<byte> payloadBuffer, in ReadOnlySequence<byte>? headersBuffer)
12882
{
129-
if (_replies.TryGetValue(props.InboxId, out var rt))
83+
if (_replies.TryGetValue(props.SubjectNumber, out var rt))
13084
{
13185
rt.SetResult(props, payloadBuffer, headersBuffer);
13286
return true;

src/NATS.Client.Core/Internal/SocketReader.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize)
6464
}
6565

6666
totalRead += read;
67-
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
6867
_seqeunceBuilder.Append(_availableMemory.Slice(0, read));
6968
_availableMemory = _availableMemory.Slice(read);
7069
}
@@ -110,7 +109,6 @@ public async ValueTask<ReadOnlySequence<byte>> ReadUntilReceiveNewLineAsync()
110109
throw ex;
111110
}
112111

113-
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
114112
var appendMemory = _availableMemory.Slice(0, read);
115113
_seqeunceBuilder.Append(appendMemory);
116114
_availableMemory = _availableMemory.Slice(read);

0 commit comments

Comments
 (0)