Skip to content

Commit a568cfe

Browse files
committed
#316 Start work on metrics
Signed-off-by: James Thompson <[email protected]>
1 parent 56b4095 commit a568cfe

11 files changed

+192
-143
lines changed

src/NATS.Client.Core/Commands/CommandWriter.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ internal sealed class CommandWriter : IAsyncDisposable
3737
private readonly int _arrayPoolInitialSize;
3838
private readonly object _lock = new();
3939
private readonly CancellationTokenSource _cts;
40-
private readonly ConnectionStatsCounter _counter;
4140
private readonly Memory<byte> _consolidateMem = new byte[SendMemSize].AsMemory();
4241
private readonly TimeSpan _defaultCommandTimeout;
4342
private readonly Action<PingCommand> _enqueuePing;
@@ -55,7 +54,7 @@ internal sealed class CommandWriter : IAsyncDisposable
5554
private CancellationTokenSource? _ctsReader;
5655
private volatile bool _disposed;
5756

58-
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
57+
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
5958
{
6059
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
6160
_trace = _logger.IsEnabled(LogLevel.Trace);
@@ -67,7 +66,6 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na
6766
// avoid defining another option.
6867
_arrayPoolInitialSize = opts.WriterBufferSize / 256;
6968

70-
_counter = counter;
7169
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
7270
_enqueuePing = enqueuePing;
7371
_protocolWriter = new ProtocolWriter(opts.SubjectEncoding);
@@ -321,6 +319,8 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
321319
serializer.Serialize(payloadBuffer, value);
322320

323321
var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0);
322+
Activity.Current?.AddTag("messaging.message.body.size", payloadBuffer.WrittenMemory.Length);
323+
Activity.Current?.AddTag("messaging.message.envelope.size", size);
324324
if (_connection.ServerInfo is { } info && size > info.MaxPayload)
325325
{
326326
throw new NatsPayloadTooLargeException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}");
@@ -693,11 +693,19 @@ private void EnqueueCommand()
693693
return;
694694
}
695695

696-
Interlocked.Add(ref _counter.PendingMessages, 1);
696+
Telemetry.AddPendingMessages(1, Activity.Current.TagObjects.ToArray());
697697

698698
_channelSize.Writer.TryWrite(size);
699699
var flush = _pipeWriter.FlushAsync();
700-
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();
700+
if (flush.IsCompletedSuccessfully)
701+
{
702+
_flushTask = null;
703+
Telemetry.AddPendingMessages(-1, Activity.Current.TagObjects.ToArray());
704+
}
705+
else
706+
{
707+
_flushTask = flush.AsTask();
708+
}
701709
}
702710

703711
private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken)

src/NATS.Client.Core/Commands/PriorityCommandWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable
66
{
77
private int _disposed;
88

9-
public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, SocketConnectionWrapper socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
9+
public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, SocketConnectionWrapper socketConnection, NatsOpts opts, Action<PingCommand> enqueuePing)
1010
{
11-
CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing);
11+
CommandWriter = new CommandWriter("init", connection, pool, opts, enqueuePing);
1212
CommandWriter.Reset(socketConnection);
1313
}
1414

src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public NatsReadProtocolProcessor(SocketConnectionWrapper socketConnection, NatsC
3333
_waitForPongOrErrorSignal = waitForPongOrErrorSignal;
3434
_infoParsed = infoParsed;
3535
_pingCommands = new ConcurrentQueue<PingCommand>();
36-
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Counter, connection.Opts.LoggerFactory);
36+
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Opts.LoggerFactory);
3737
_readLoop = Task.Run(ReadLoopAsync);
3838
}
3939

@@ -156,7 +156,7 @@ private async Task ReadLoopAsync()
156156
code = GetCode(buffer);
157157
}
158158

159-
Interlocked.Increment(ref _connection.Counter.ReceivedMessages);
159+
Telemetry.AddReceivedMessages(1, Activity.Current.TagObjects.ToArray());
160160

161161
// Optimize for Msg parsing, Inline async code
162162
if (code == ServerOpCodes.Msg)
@@ -445,7 +445,7 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
445445
{
446446
// reaches invalid line, log warn and try to get newline and go to nextloop.
447447
_logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line");
448-
Interlocked.Decrement(ref _connection.Counter.ReceivedMessages);
448+
Telemetry.AddReceivedMessages(-1, Activity.Current.TagObjects.ToArray());
449449

450450
var position = buffer.PositionOf((byte)'\n');
451451
if (position == null)

src/NATS.Client.Core/Internal/SocketReader.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ namespace NATS.Client.Core.Internal;
99
internal sealed class SocketReader
1010
{
1111
private readonly int _minimumBufferSize;
12-
private readonly ConnectionStatsCounter _counter;
1312
private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder();
1413
private readonly Stopwatch _stopwatch = new Stopwatch();
1514
private readonly ILogger<SocketReader> _logger;
@@ -18,11 +17,10 @@ internal sealed class SocketReader
1817

1918
private Memory<byte> _availableMemory;
2019

21-
public SocketReader(SocketConnectionWrapper socketConnection, int minimumBufferSize, ConnectionStatsCounter counter, ILoggerFactory loggerFactory)
20+
public SocketReader(SocketConnectionWrapper socketConnection, int minimumBufferSize, ILoggerFactory loggerFactory)
2221
{
2322
_socketConnection = socketConnection;
2423
_minimumBufferSize = minimumBufferSize;
25-
_counter = counter;
2624
_logger = loggerFactory.CreateLogger<SocketReader>();
2725
_isTraceLogging = _logger.IsEnabled(LogLevel.Trace);
2826
}
@@ -66,7 +64,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize)
6664
}
6765

6866
totalRead += read;
69-
Interlocked.Add(ref _counter.ReceivedBytes, read);
67+
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
7068
_seqeunceBuilder.Append(_availableMemory.Slice(0, read));
7169
_availableMemory = _availableMemory.Slice(read);
7270
}
@@ -112,7 +110,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadUntilReceiveNewLineAsync()
112110
throw ex;
113111
}
114112

115-
Interlocked.Add(ref _counter.ReceivedBytes, read);
113+
Telemetry.RecordReceivedBytes(read, Activity.Current.TagObjects.ToArray());
116114
var appendMemory = _availableMemory.Slice(0, read);
117115
_seqeunceBuilder.Append(appendMemory);
118116
_availableMemory = _availableMemory.Slice(read);

src/NATS.Client.Core/Internal/SubscriptionManager.cs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Buffers;
22
using System.Collections.Concurrent;
33
using System.Runtime.CompilerServices;
4+
using System.Threading;
45
using Microsoft.Extensions.Logging;
56
using NATS.Client.Core.Commands;
67

@@ -51,41 +52,27 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
5152

5253
public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
5354
{
54-
if (Telemetry.HasListeners())
55+
ValueTask task;
56+
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
57+
if (activity != null)
5558
{
56-
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
5759
try
5860
{
59-
if (IsInboxSubject(sub.Subject))
60-
{
61-
if (sub.QueueGroup != null)
62-
{
63-
throw new NatsException("Inbox subscriptions don't support queue groups");
64-
}
65-
66-
return SubscribeInboxAsync(sub, cancellationToken);
67-
}
68-
69-
return SubscribeInternalAsync(sub.Subject, sub.QueueGroup, sub.Opts, sub, cancellationToken);
61+
task = SubscribeImpAsync(sub, cancellationToken);
62+
Telemetry.RecordOperationDuration(activity.Duration.TotalSeconds, activity.TagObjects.ToArray());
7063
}
7164
catch (Exception ex)
7265
{
7366
Telemetry.SetException(activity, ex);
7467
throw;
7568
}
7669
}
77-
78-
if (IsInboxSubject(sub.Subject))
70+
else
7971
{
80-
if (sub.QueueGroup != null)
81-
{
82-
throw new NatsException("Inbox subscriptions don't support queue groups");
83-
}
84-
85-
return SubscribeInboxAsync(sub, cancellationToken);
72+
task = SubscribeImpAsync(sub, cancellationToken);
8673
}
8774

88-
return SubscribeInternalAsync(sub.Subject, sub.QueueGroup, sub.Opts, sub, cancellationToken);
75+
return task;
8976
}
9077

9178
public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
@@ -262,6 +249,21 @@ await SubscribeInternalAsync(
262249
}
263250
}
264251

252+
private ValueTask SubscribeImpAsync(NatsSubBase sub, CancellationToken cancellationToken)
253+
{
254+
if (IsInboxSubject(sub.Subject))
255+
{
256+
if (sub.QueueGroup != null)
257+
{
258+
throw new NatsException("Inbox subscriptions don't support queue groups");
259+
}
260+
261+
return SubscribeInboxAsync(sub, cancellationToken);
262+
}
263+
264+
return SubscribeInternalAsync(sub.Subject, sub.QueueGroup, sub.Opts, sub, cancellationToken);
265+
}
266+
265267
private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
266268
{
267269
await InitializeInboxSubscriptionAsync(cancellationToken).ConfigureAwait(false);

src/NATS.Client.Core/Internal/Telemetry.cs

Lines changed: 100 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,58 @@
11
using System.Diagnostics;
2+
using System.Diagnostics.Metrics;
23

34
namespace NATS.Client.Core.Internal;
45

56
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging/
67
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
78
internal static class Telemetry
89
{
9-
internal static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);
10-
1110
private const string NatsActivitySource = "NATS.Net";
12-
private static readonly object BoxedTrue = true;
11+
private const string MeterName = "NATS.Net";
12+
private static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);
13+
private static readonly Meter NatsMeter = new Meter(MeterName);
14+
15+
private static readonly Counter<long> _subscriptionCounter = NatsMeter.CreateCounter<long>(
16+
Constants.SubscriptionInstrumentName,
17+
unit: "{subscriptions}",
18+
description: "Number of subscriptions");
19+
20+
private static readonly Counter<long> _pendingMessagesCounter = NatsMeter.CreateCounter<long>(
21+
Constants.PendingMessagesInstrumentName,
22+
unit: "{messages}",
23+
description: "Number of pending messages");
24+
25+
private static readonly Histogram<long> _sentBytesHistogram = NatsMeter.CreateHistogram<long>(
26+
Constants.SentBytesInstrumentName,
27+
unit: "{bytes}",
28+
description: "Number of bytes sent");
29+
30+
private static readonly Histogram<long> _receivedBytesHistogram = NatsMeter.CreateHistogram<long>(
31+
Constants.ReceivedBytesInstrumentName,
32+
unit: "{bytes}",
33+
description: "Number of bytes received");
34+
35+
private static readonly Counter<long> _sentMessagesCounter = NatsMeter.CreateCounter<long>(
36+
Constants.SentMessagesInstrumentName,
37+
unit: "{messages}",
38+
description: "Number of messages sent");
39+
40+
private static readonly Counter<long> _receivedMessagesCounter = NatsMeter.CreateCounter<long>(
41+
Constants.ReceivedMessagesInstrumentName,
42+
unit: "{messages}",
43+
description: "Number of messages received");
44+
45+
private static readonly Histogram<double> _durationOperationHistogram = NatsMeter.CreateHistogram<double>(
46+
Constants.DurationOperationInstrumentName,
47+
unit: "{s}",
48+
description: "Duration of messaging operation initiated by a producer or consumer client.");
49+
50+
private static readonly Histogram<double> _durationProcessHistogram = NatsMeter.CreateHistogram<double>(
51+
Constants.DurationProcessInstrumentName,
52+
unit: "{s}",
53+
description: "Duration of processing operation within client.");
1354

14-
internal static bool HasListeners() => NatsActivities.HasListeners();
55+
private static readonly object BoxedTrue = true;
1556

1657
internal static Activity? StartSendActivity(
1758
string name,
@@ -20,9 +61,6 @@ internal static class Telemetry
2061
string? replyTo,
2162
ActivityContext? parentContext = null)
2263
{
23-
if (!NatsActivities.HasListeners())
24-
return null;
25-
2664
KeyValuePair<string, object?>[] tags;
2765
if (connection is NatsConnection { ServerInfo: not null } conn)
2866
{
@@ -106,9 +144,6 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
106144
long size,
107145
NatsHeaders? headers)
108146
{
109-
if (!NatsActivities.HasListeners())
110-
return null;
111-
112147
KeyValuePair<string, object?>[] tags;
113148
if (connection is NatsConnection { ServerInfo: not null } conn)
114149
{
@@ -216,6 +251,51 @@ static string GetStackTrace(Exception? exception)
216251
}
217252
}
218253

254+
internal static void IncrementSubscriptionCount(KeyValuePair<string, object?>[] tags)
255+
{
256+
_subscriptionCounter.Add(1, tags);
257+
}
258+
259+
internal static void DecrementSubscriptionCount(KeyValuePair<string, object?>[] tags)
260+
{
261+
_subscriptionCounter.Add(-1, tags);
262+
}
263+
264+
internal static void AddPendingMessages(long messages, KeyValuePair<string, object?>[] tags)
265+
{
266+
_pendingMessagesCounter.Add(messages, tags);
267+
}
268+
269+
internal static void RecordSentBytes(long bytes, KeyValuePair<string, object?>[] tags)
270+
{
271+
_sentBytesHistogram.Record(bytes, tags);
272+
}
273+
274+
internal static void RecordReceivedBytes(long bytes, KeyValuePair<string, object?>[] tags)
275+
{
276+
_receivedBytesHistogram.Record(bytes, tags);
277+
}
278+
279+
internal static void AddSentMessages(long messages, KeyValuePair<string, object?>[] tags)
280+
{
281+
_sentMessagesCounter.Add(messages, tags);
282+
}
283+
284+
internal static void AddReceivedMessages(long messages, KeyValuePair<string, object?>[] tags)
285+
{
286+
_receivedMessagesCounter.Add(messages, tags);
287+
}
288+
289+
internal static void RecordOperationDuration(double duration, KeyValuePair<string, object?>[] tags)
290+
{
291+
_durationOperationHistogram.Record(duration, tags);
292+
}
293+
294+
internal static void RecordProcessDuration(long bytes, KeyValuePair<string, object?>[] tags)
295+
{
296+
_durationProcessHistogram.Record(bytes, tags);
297+
}
298+
219299
private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContext context)
220300
{
221301
DistributedContextPropagator.Current.ExtractTraceIdAndState(
@@ -278,7 +358,7 @@ internal class Constants
278358
public const string DestIsTemporary = "messaging.destination.temporary";
279359
public const string DestPubName = "messaging.destination_publish.name";
280360

281-
public const string QueueGroup = "messaging.nats.consumer.group";
361+
public const string QueueGroup = "messaging.consumer.group.name";
282362
public const string ReplyTo = "messaging.nats.message.reply_to";
283363
public const string Subject = "messaging.nats.message.subject";
284364

@@ -289,5 +369,14 @@ internal class Constants
289369
public const string NetworkPeerAddress = "network.peer.address";
290370
public const string NetworkPeerPort = "network.peer.port";
291371
public const string NetworkLocalAddress = "network.local.address";
372+
373+
public const string PendingMessagesInstrumentName = $"messaging.client.pending.messages";
374+
public const string SentBytesInstrumentName = $"messaging.client.sent.bytes";
375+
public const string ReceivedBytesInstrumentName = $"messaging.client.consumed.bytes";
376+
public const string SentMessagesInstrumentName = $"messaging.client.sent.messages";
377+
public const string ReceivedMessagesInstrumentName = $"messaging.client.consumed.messages";
378+
public const string SubscriptionInstrumentName = $"messaging.client.nats.subscription.count";
379+
public const string DurationOperationInstrumentName = $"messaging.client.operation.duration";
380+
public const string DurationProcessInstrumentName = $"messaging.process.duration";
292381
}
293382
}

0 commit comments

Comments
 (0)