Skip to content

Commit d1c1501

Browse files
committed
More optimisation based on review
1 parent 2afd4f0 commit d1c1501

22 files changed

+213
-241
lines changed

src/NATS.Client.Core/Internal/InboxSub.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ public InboxSub(
2323
}
2424

2525
// Avoid base class error handling since inboxed subscribers will be responsible for that.
26-
public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27-
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);
26+
public override ValueTask ReceiveAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
27+
_inbox.ReceivedAsync(props, headersBuffer, payloadBuffer, _connection);
2828

2929
// Not used. Dummy implementation to keep base happy.
30-
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
30+
protected override ValueTask ReceiveInternalAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
3131
=> default;
3232

3333
protected override void TryComplete()
@@ -119,11 +119,11 @@ public ValueTask RegisterAsync(NatsSubBase sub)
119119
return sub.ReadyAsync();
120120
}
121121

122-
public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
122+
public async ValueTask ReceivedAsync(NatsProcessProps props, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
123123
{
124-
if (!_bySubject.TryGetValue(subject, out var subTable))
124+
if (!_bySubject.TryGetValue(props.Subject, out var subTable))
125125
{
126-
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", subject);
126+
_logger.LogWarning(NatsLogEvents.InboxSubscription, "Unregistered message inbox received for {Subject}", props.Subject);
127127
return;
128128
}
129129

@@ -132,13 +132,13 @@ public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySe
132132
{
133133
if (weakReference.TryGetTarget(out var sub))
134134
{
135-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
135+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
136136
}
137137
}
138138
#else
139139
foreach (var (sub, _) in subTable)
140140
{
141-
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
141+
await sub.ReceiveAsync(props, headersBuffer, payloadBuffer).ConfigureAwait(false);
142142
}
143143
#endif
144144
}

src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,6 @@ private async Task ReadLoopAsync()
252252

253253
var props = ParseHMessageHeader(msgHeader);
254254

255-
props.InboxPrefix = _connection.InboxPrefix;
256-
257255
if (_trace)
258256
{
259257
_logger.LogTrace(NatsLogEvents.Protocol, "HMSG trace parsed: {Subject} {Sid} {ReplyTo} {HeadersLength} {TotalLength}", props.Subject, props.SubscriptionId, props.ReplyTo, props.HeaderLength, props.TotalMessageLength);
@@ -478,15 +476,15 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
478476
// MSG <subject> <sid> [reply-to] <#bytes>\r\n[payload]
479477
private NatsProcessProps ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
480478
{
481-
var metaLength = msgHeader.Length;
479+
var framingLength = msgHeader.Length;
482480
msgHeader = msgHeader.Slice(4);
483481
msgHeader.Split(out var subjectBytes, out msgHeader);
484482
msgHeader.Split(out var sidBytes, out msgHeader);
485483
msgHeader.Split(out var replyToOrSizeBytes, out msgHeader);
486484

487-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
485+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
488486
{
489-
MetaLength = metaLength,
487+
FramingLength = framingLength,
490488
HeaderLength = 0,
491489
};
492490

@@ -531,17 +529,17 @@ private NatsProcessProps ParseMessageHeader(in ReadOnlySequence<byte> msgHeader)
531529
private NatsProcessProps ParseHMessageHeader(ReadOnlySpan<byte> msgHeader)
532530
{
533531
// 'HMSG' literal
534-
var metaLength = msgHeader.Length;
532+
var framingLength = msgHeader.Length;
535533
msgHeader.Split(out _, out msgHeader);
536534

537535
msgHeader.Split(out var subjectBytes, out msgHeader);
538536
msgHeader.Split(out var sidBytes, out msgHeader);
539537
msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader);
540538
msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader);
541539

542-
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes))
540+
var props = new NatsProcessProps(Encoding.ASCII.GetString(subjectBytes), GetInt32(sidBytes), _connection.InboxPrefix)
543541
{
544-
MetaLength = metaLength,
542+
FramingLength = framingLength,
545543
};
546544

547545
// We don't have the optional reply-to field

src/NATS.Client.Core/Internal/ReplyTask.cs

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,30 @@
11
using System.Buffers;
2-
using System.Buffers.Text;
32
using System.Collections.Concurrent;
4-
using System.Text;
53

64
namespace NATS.Client.Core.Internal;
75

86
internal sealed class ReplyTask<T> : ReplyTaskBase, IDisposable
97
{
108
private readonly object _gate;
119
private readonly ReplyTaskFactory _factory;
12-
private readonly long _id;
10+
private readonly long _subjectNum;
1311
private readonly NatsConnection _connection;
1412
private readonly INatsDeserialize<T> _deserializer;
1513
private readonly TimeSpan _requestTimeout;
1614
private readonly TaskCompletionSource _tcs;
1715
private NatsMsg<T> _msg;
1816

19-
public ReplyTask(ReplyTaskFactory factory, long id, string subject, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
17+
public ReplyTask(ReplyTaskFactory factory, long subjectNum, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
2018
{
2119
_factory = factory;
22-
_id = id;
23-
Subject = subject;
20+
_subjectNum = subjectNum;
2421
_connection = connection;
2522
_deserializer = deserializer;
2623
_requestTimeout = requestTimeout;
2724
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2825
_gate = new object();
2926
}
3027

31-
public string Subject { get; }
32-
3328
public async ValueTask<NatsMsg<T>> GetResultAsync(CancellationToken cancellationToken)
3429
{
3530
try
@@ -53,13 +48,13 @@ public override void SetResult(NatsProcessProps props, ReadOnlySequence<byte> pa
5348
{
5449
lock (_gate)
5550
{
56-
_msg = NatsMsg<T>.Build(Subject, props.ReplyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
51+
_msg = NatsMsg<T>.Build(props.Subject, props.ReplyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
5752
}
5853

5954
_tcs.TrySetResult();
6055
}
6156

62-
public void Dispose() => _factory.Return(_id);
57+
public void Dispose() => _factory.Return(_subjectNum);
6358
}
6459

6560
internal abstract class ReplyTaskBase
@@ -69,23 +64,15 @@ internal abstract class ReplyTaskBase
6964

7065
internal sealed class ReplyTaskFactory
7166
{
72-
private readonly byte[] _inboxPrefix;
73-
private readonly string _inboxPrefixString;
7467
private readonly NatsConnection _connection;
7568
private readonly ConcurrentDictionary<long, ReplyTaskBase> _replies;
7669
private readonly INatsSerializerRegistry _serializerRegistry;
7770
private readonly TimeSpan _requestTimeout;
78-
private readonly int _subjectMaxLength;
79-
private readonly bool _allocSubject;
8071
private long _nextId;
8172

8273
public ReplyTaskFactory(NatsConnection connection)
8374
{
8475
_connection = connection;
85-
_inboxPrefixString = _connection.InboxPrefix + ".";
86-
_inboxPrefix = Encoding.UTF8.GetBytes(_inboxPrefixString);
87-
_subjectMaxLength = _inboxPrefix.Length + 20; // 20 digits for long
88-
_allocSubject = _subjectMaxLength < 128;
8976
_serializerRegistry = _connection.Opts.SerializerRegistry;
9077
_requestTimeout = _connection.Opts.RequestTimeout;
9178
_replies = new ConcurrentDictionary<long, ReplyTaskBase>();
@@ -95,29 +82,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
9582
{
9683
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
9784
var id = Interlocked.Increment(ref _nextId);
98-
99-
string subject;
100-
if (_allocSubject)
101-
{
102-
Span<byte> buffer = stackalloc byte[_subjectMaxLength];
103-
_inboxPrefix.CopyTo(buffer);
104-
var idSpan = buffer.Slice(_inboxPrefix.Length);
105-
if (Utf8Formatter.TryFormat(id, idSpan, out var written))
106-
{
107-
var subjectSpan = buffer.Slice(0, written + _inboxPrefix.Length);
108-
subject = Encoding.UTF8.GetString(subjectSpan);
109-
}
110-
else
111-
{
112-
subject = _inboxPrefixString + id;
113-
}
114-
}
115-
else
116-
{
117-
subject = _inboxPrefixString + id;
118-
}
119-
120-
var rt = new ReplyTask<TReply>(this, id, subject, _connection, deserializer, requestTimeout ?? _requestTimeout);
85+
var rt = new ReplyTask<TReply>(this, id, _connection, deserializer, requestTimeout ?? _requestTimeout);
12186
_replies.TryAdd(id, rt);
12287
return rt;
12388
}
@@ -126,7 +91,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
12691

12792
public bool TrySetResult(NatsProcessProps props, in ReadOnlySequence<byte> payloadBuffer, in ReadOnlySequence<byte>? headersBuffer)
12893
{
129-
if (_replies.TryGetValue(props.InboxId, out var rt))
94+
if (_replies.TryGetValue(props.SubjectNumber, out var rt))
13095
{
13196
rt.SetResult(props, payloadBuffer, headersBuffer);
13297
return true;

src/NATS.Client.Core/Internal/SocketReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize)
6464
}
6565

6666
totalRead += read;
67-
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
67+
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToList());
6868
_seqeunceBuilder.Append(_availableMemory.Slice(0, read));
6969
_availableMemory = _availableMemory.Slice(read);
7070
}
@@ -110,7 +110,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadUntilReceiveNewLineAsync()
110110
throw ex;
111111
}
112112

113-
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
113+
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToList());
114114
var appendMemory = _availableMemory.Slice(0, read);
115115
_seqeunceBuilder.Append(appendMemory);
116116
_availableMemory = _availableMemory.Slice(read);

src/NATS.Client.Core/Internal/SubscriptionManager.cs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System.Buffers;
22
using System.Collections.Concurrent;
33
using System.Runtime.CompilerServices;
4-
using System.Threading;
54
using Microsoft.Extensions.Logging;
65
using NATS.Client.Core.Commands;
76

@@ -42,7 +41,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
4241
_cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval;
4342
_timer = Task.Run(CleanupAsync);
4443
InboxSubBuilder = new InboxSubBuilder(connection.Opts.LoggerFactory.CreateLogger<InboxSubBuilder>());
45-
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscriptionProps(nameof(_inboxSubSentinel)), default, connection, this);
44+
_inboxSubSentinel = new InboxSub(InboxSubBuilder, new NatsSubscriptionProps(nameof(_inboxSubSentinel), _connection.InboxPrefix), default, connection, this);
4645
_inboxSub = _inboxSubSentinel;
4746
}
4847

@@ -62,7 +61,7 @@ public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnl
6261
_logger.LogTrace(NatsLogEvents.Subscription, "Received subscription data for {Subject}/{Sid}", props.Subject, props.SubscriptionId);
6362
}
6463

65-
int? orphanSid = null;
64+
var orphan = false;
6665
lock (_gate)
6766
{
6867
if (_bySid.TryGetValue(props.SubscriptionId, out var sidMetadata))
@@ -75,12 +74,12 @@ public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnl
7574
_logger.LogTrace(NatsLogEvents.Subscription, "Found subscription handler for {Subject}/{Sid}", props.Subject, props.SubscriptionId);
7675
}
7776

78-
return sub.ReceiveAsync(props.Subject, props.ReplyTo, headersBuffer, payloadBuffer);
77+
return sub.ReceiveAsync(props, headersBuffer, payloadBuffer);
7978
}
8079
else
8180
{
8281
_logger.LogWarning(NatsLogEvents.Subscription, "Subscription GCd but was never disposed {Subject}/{Sid}", props.Subject, props.SubscriptionId);
83-
orphanSid = props.SubscriptionId;
82+
orphan = true;
8483
}
8584
}
8685
else
@@ -89,11 +88,11 @@ public ValueTask PublishToClientHandlersAsync(NatsProcessProps props, in ReadOnl
8988
}
9089
}
9190

92-
if (orphanSid != null)
91+
if (orphan)
9392
{
9493
try
9594
{
96-
return _connection.UnsubscribeAsync(new NatsSubscriptionProps(orphanSid.Value));
95+
return _connection.UnsubscribeAsync(new NatsSubscriptionProps(props.SubscriptionId, props.InboxPrefix));
9796
}
9897
catch (Exception e)
9998
{
@@ -148,7 +147,7 @@ public ValueTask RemoveAsync(NatsSubBase sub)
148147
_logger.LogDebug(NatsLogEvents.Subscription, "Removing subscription {Subject}/{Sid}", sub.Subject, subMetadata.Sid);
149148
}
150149

151-
return _connection.UnsubscribeAsync(sub.Props);
150+
return _connection.UnsubscribeAsync(sub.SubscriptionProps(_connection.InboxPrefix));
152151
}
153152

154153
/// <summary>
@@ -183,7 +182,7 @@ internal async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter
183182

184183
foreach (var (sub, sid) in subs)
185184
{
186-
await sub.WriteReconnectCommandsAsync(commandWriter, new NatsSubscriptionProps(sid)).ConfigureAwait(false);
185+
await sub.WriteReconnectCommandsAsync(commandWriter, new NatsSubscriptionProps(sid, _connection.InboxPrefix)).ConfigureAwait(false);
187186

188187
if (_debug)
189188
{
@@ -208,18 +207,14 @@ internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellat
208207
{
209208
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
210209
{
211-
var inboxSubject = new NatsSubscriptionProps($"{_inboxPrefix}.*");
210+
var inboxSubject = new NatsSubscriptionProps($"{_inboxPrefix}.*", _connection.InboxPrefix);
212211

213212
// We need to subscribe to the real inbox subject before we can register the internal subject.
214213
// We use 'default' options here since options provided by the user are for the internal subscription.
215214
// For example if the user provides a timeout, we don't want to timeout the real inbox subscription
216215
// since it must live duration of the connection.
217216
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts: default, _connection, manager: this);
218-
var props = new NatsSubscriptionProps(_inboxSub.Subject)
219-
{
220-
InboxPrefix = _connection.InboxPrefix,
221-
QueueGroup = _inboxSub.QueueGroup,
222-
};
217+
var props = new NatsSubscriptionProps(_inboxSub.Subject, _connection.InboxPrefix, _inboxSub.QueueGroup);
223218
await SubscribeQueueAsync(props, _inboxSub, cancellationToken).ConfigureAwait(false);
224219
}
225220
}
@@ -233,11 +228,7 @@ internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellat
233228
private ValueTask SubscribeInternalAsync(NatsSubBase sub, CancellationToken cancellationToken)
234229
{
235230
var start = DateTimeOffset.UtcNow;
236-
var props = new NatsSubscriptionProps(sub.Subject)
237-
{
238-
InboxPrefix = _connection.InboxPrefix,
239-
QueueGroup = sub.QueueGroup,
240-
};
231+
var props = new NatsSubscriptionProps(sub.Subject, _connection.InboxPrefix, sub.QueueGroup);
241232
var tags = Telemetry.GetTags(_connection.ServerInfo, props);
242233
using var activity = Telemetry.StartActivity(start, props, _connection.ServerInfo, Telemetry.Constants.SubscribeActivityName, tags);
243234
ValueTask task;
@@ -364,7 +355,7 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
364355
try
365356
{
366357
_logger.LogWarning(NatsLogEvents.Subscription, "Unsubscribing orphan subscription {Sid}", sid);
367-
await _connection.UnsubscribeAsync(new NatsSubscriptionProps(sid)).ConfigureAwait(false);
358+
await _connection.UnsubscribeAsync(new NatsSubscriptionProps(sid, _connection.InboxPrefix)).ConfigureAwait(false);
368359
}
369360
catch (Exception e)
370361
{

src/NATS.Client.Core/NatsConnection.Publish.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, st
1414
public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
1515
{
1616
serializer ??= Opts.SerializerRegistry.GetSerializer<T>();
17-
var props = new NatsPublishProps(subject)
18-
{
19-
InboxPrefix = InboxPrefix,
20-
};
17+
var props = new NatsPublishProps(subject, InboxPrefix);
2118
props.SetReplyTo(replyTo);
2219
return PublishInternalAsync(props, serializer, data, headers, default, cancellationToken);
2320
}

src/NATS.Client.Core/NatsConnection.RequestReply.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ private async ValueTask<NatsMsg<TReply>> RequestInternalAsync<TRequest, TReply>(
7474
NatsSubOpts? replyOpts = default,
7575
CancellationToken cancellationToken = default)
7676
{
77-
props ??= new NatsPublishProps(subject);
78-
props.InboxPrefix ??= InboxPrefix;
77+
props ??= new NatsPublishProps(subject, InboxPrefix);
7978

8079
var start = DateTimeOffset.UtcNow;
8180
var tags = Telemetry.GetTags(ServerInfo, props);

0 commit comments

Comments
 (0)