@@ -6,14 +6,13 @@ namespace NATS.Client.Core.Internal;
6
6
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
7
7
internal static class Telemetry
8
8
{
9
- internal static readonly ActivitySource NatsActivities = new ( name : NatsActivitySource ) ;
10
-
11
- private const string NatsActivitySource = "NATS.Net" ;
9
+ public const string NatsActivitySource = "NATS.Net" ;
10
+ public static readonly ActivitySource NatsActivities = new ( name : NatsActivitySource ) ;
12
11
private static readonly object BoxedTrue = true ;
13
12
14
- internal static bool HasListeners ( ) => NatsActivities . HasListeners ( ) ;
13
+ public static bool HasListeners ( ) => NatsActivities . HasListeners ( ) ;
15
14
16
- internal static Activity ? StartSendActivity (
15
+ public static Activity ? StartSendActivity (
17
16
string name ,
18
17
INatsConnection ? connection ,
19
18
string subject ,
@@ -23,6 +22,19 @@ internal static class Telemetry
23
22
if ( ! NatsActivities . HasListeners ( ) )
24
23
return null ;
25
24
25
+ var instrumentationContext = new NatsInstrumentationContext (
26
+ Subject : subject ,
27
+ Headers : null ,
28
+ ReplyTo : replyTo ,
29
+ QueueGroup : null ,
30
+ BodySize : null ,
31
+ Size : null ,
32
+ Connection : connection ,
33
+ ParentContext : parentContext ) ;
34
+
35
+ if ( NatsInstrumentationOptions . Default . Filter is { } filter && ! filter ( instrumentationContext ) )
36
+ return null ;
37
+
26
38
KeyValuePair < string , object ? > [ ] tags ;
27
39
if ( connection is NatsConnection { ServerInfo : not null } conn )
28
40
{
@@ -63,14 +75,19 @@ internal static class Telemetry
63
75
tags [ 3 ] = new KeyValuePair < string , object ? > ( Constants . ReplyTo , replyTo ) ;
64
76
}
65
77
66
- return NatsActivities . StartActivity (
78
+ var activity = NatsActivities . StartActivity (
67
79
name ,
68
80
kind : ActivityKind . Producer ,
69
81
parentContext : parentContext ?? default ,
70
82
tags : tags ) ;
83
+
84
+ if ( activity is not null )
85
+ NatsInstrumentationOptions . Default . Enrich ? . Invoke ( activity , instrumentationContext ) ;
86
+
87
+ return activity ;
71
88
}
72
89
73
- internal static void AddTraceContextHeaders ( Activity ? activity , ref NatsHeaders ? headers )
90
+ public static void AddTraceContextHeaders ( Activity ? activity , ref NatsHeaders ? headers )
74
91
{
75
92
if ( activity is null )
76
93
return ;
@@ -87,15 +104,15 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
87
104
return ;
88
105
}
89
106
90
- // There are cases where headers reused internally (e.g. JetStream publish retry)
107
+ // There are cases where headers reused publicly (e.g. JetStream publish retry)
91
108
// there may also be cases where application can reuse the same header
92
109
// in which case we should still be able to overwrite headers with telemetry fields
93
110
// even though headers would be set to readonly before being passed down in publish methods.
94
111
headers . SetOverrideReadOnly ( fieldName , fieldValue ) ;
95
112
} ) ;
96
113
}
97
114
98
- internal static Activity ? StartReceiveActivity (
115
+ public static Activity ? StartReceiveActivity (
99
116
INatsConnection ? connection ,
100
117
string name ,
101
118
string subscriptionSubject ,
@@ -109,6 +126,22 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
109
126
if ( ! NatsActivities . HasListeners ( ) )
110
127
return null ;
111
128
129
+ if ( headers is null || ! TryParseTraceContext ( headers , out var context ) )
130
+ context = default ;
131
+
132
+ var instrumentationContext = new NatsInstrumentationContext (
133
+ Subject : subject ,
134
+ Headers : headers ,
135
+ ReplyTo : replyTo ,
136
+ QueueGroup : queueGroup ,
137
+ BodySize : bodySize ,
138
+ Size : size ,
139
+ Connection : connection ,
140
+ ParentContext : context ) ;
141
+
142
+ if ( NatsInstrumentationOptions . Default . Filter is { } filter && ! filter ( instrumentationContext ) )
143
+ return null ;
144
+
112
145
KeyValuePair < string , object ? > [ ] tags ;
113
146
if ( connection is NatsConnection { ServerInfo : not null } conn )
114
147
{
@@ -162,17 +195,19 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
162
195
tags [ 9 ] = new KeyValuePair < string , object ? > ( Constants . ReplyTo , replyTo ) ;
163
196
}
164
197
165
- if ( headers is null || ! TryParseTraceContext ( headers , out var context ) )
166
- context = default ;
167
-
168
- return NatsActivities . StartActivity (
198
+ var activity = NatsActivities . StartActivity (
169
199
name ,
170
200
kind : ActivityKind . Consumer ,
171
201
parentContext : context ,
172
202
tags : tags ) ;
203
+
204
+ if ( activity is not null )
205
+ NatsInstrumentationOptions . Default . Enrich ? . Invoke ( activity , instrumentationContext ) ;
206
+
207
+ return activity ;
173
208
}
174
209
175
- internal static void SetException ( Activity ? activity , Exception exception )
210
+ public static void SetException ( Activity ? activity , Exception exception )
176
211
{
177
212
if ( activity is null )
178
213
return ;
@@ -251,11 +286,14 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex
251
286
} ,
252
287
out var traceParent ,
253
288
out var traceState ) ;
254
-
289
+ #if NETSTANDARD2_0_OR_GREATER || NET7_0_OR_GREATER
290
+ return ActivityContext . TryParse ( traceParent , traceState , isRemote : true , out context ) ;
291
+ #else
255
292
return ActivityContext . TryParse ( traceParent , traceState , out context ) ;
293
+ #endif
256
294
}
257
295
258
- internal class Constants
296
+ public class Constants
259
297
{
260
298
public const string True = "true" ;
261
299
public const string False = "false" ;
0 commit comments