Skip to content

Commit 487dffb

Browse files
committed
review & fixes
1 parent 2f78f45 commit 487dffb

File tree

45 files changed

+1886
-2609
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1886
-2609
lines changed

docs/supported-libraries.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ These are the supported libraries and frameworks:
105105
| [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none |
106106
| [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics] [6] |
107107
| [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none |
108+
| [NATS](https://github.com/nats-io/nats.java) | 3.8+ | [nats-2.21](../instrumentation/nats/nats-2.21/library) | [Messaging Spans] |
108109
| [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
109110
| [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics] [6] |
110111
| [OkHttp](https://github.com/square/okhttp/) | 2.2+ | [opentelemetry-okhttp-3.0](../instrumentation/okhttp/okhttp-3.0/library) | [HTTP Client Spans], [HTTP Client Metrics] |
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Auth-instrumentation for NATS version 2.21
2+
3+
Provides OpenTelemetry auto-instrumentation for [NATS 2.21](https://github.com/nats-io/nats.java).
4+
5+
### Trace propagation
6+
7+
It's recommended to provide `Message` with a writable `Header` structure
8+
to allow propagation between publishers and subscribers. Without headers,
9+
the tracing context will not be propagated in the headers.
10+
11+
```java
12+
import io.nats.client.impl.Headers;
13+
import io.nats.client.impl.NatsMessage;
14+
15+
// don't
16+
Message msg = NatsMessage.builder().subject("sub").build();
17+
18+
// do
19+
Message msg = NatsMessage.builder().subject("sub").headers(new Headers()).build();
20+
```

instrumentation/nats/nats-2.21/javaagent/build.gradle.kts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,33 @@ muzzle {
77
group.set("io.nats")
88
module.set("jnats")
99
versions.set("[2.21.0,)")
10+
assertInverse.set(true)
1011
}
1112
}
1213

1314
dependencies {
1415
library("io.nats:jnats:2.21.0")
1516

1617
implementation(project(":instrumentation:nats:nats-2.21:library"))
18+
testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
1719
}
1820

1921
tasks {
22+
val testMessagingReceive by registering(Test::class) {
23+
filter {
24+
includeTestsMatching("NatsInstrumentationMessagingReceiveTest")
25+
}
26+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
27+
}
28+
2029
test {
2130
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
31+
filter {
32+
excludeTestsMatching("NatsInstrumentationMessagingReceiveTest")
33+
}
34+
}
35+
36+
check {
37+
dependsOn(testMessagingReceive)
2238
}
2339
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionRequestInstrumentation.java

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9-
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CLIENT_INSTRUMENTER;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
1010
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1111
import static net.bytebuddy.matcher.ElementMatchers.named;
1212
import static net.bytebuddy.matcher.ElementMatchers.returns;
@@ -21,7 +21,6 @@
2121
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
2222
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2323
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
24-
import io.opentelemetry.javaagent.instrumentation.nats.v2_21.internal.MessageConsumer;
2524
import java.time.Duration;
2625
import java.util.concurrent.CompletableFuture;
2726
import java.util.concurrent.TimeoutException;
@@ -133,11 +132,11 @@ public static void onEnter(
133132
natsRequest = NatsRequest.create(connection, null, subject, null, body);
134133
Context parentContext = Context.current();
135134

136-
if (!CLIENT_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
135+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
137136
return;
138137
}
139138

140-
otelContext = CLIENT_INSTRUMENTER.start(parentContext, natsRequest);
139+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
141140
otelScope = otelContext.makeCurrent();
142141
}
143142

@@ -155,14 +154,14 @@ public static void onExit(
155154

156155
Throwable error = throwable;
157156
NatsRequest natsResponse = null;
158-
if (message == null) {
157+
if (error == null && message == null) {
159158
error = new TimeoutException("Timed out waiting for message");
160159
} else {
161160
natsResponse = NatsRequest.create(connection, message);
162161
}
163162

164163
otelScope.close();
165-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
164+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
166165
}
167166
}
168167

@@ -181,11 +180,11 @@ public static void onEnter(
181180
natsRequest = NatsRequest.create(connection, null, subject, headers, body);
182181
Context parentContext = Context.current();
183182

184-
if (!CLIENT_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
183+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
185184
return;
186185
}
187186

188-
otelContext = CLIENT_INSTRUMENTER.start(parentContext, natsRequest);
187+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
189188
otelScope = otelContext.makeCurrent();
190189
}
191190

@@ -203,14 +202,14 @@ public static void onExit(
203202

204203
Throwable error = throwable;
205204
NatsRequest natsResponse = null;
206-
if (message == null) {
205+
if (error == null && message == null) {
207206
error = new TimeoutException("Timed out waiting for message");
208207
} else {
209208
natsResponse = NatsRequest.create(connection, message);
210209
}
211210

212211
otelScope.close();
213-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
212+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
214213
}
215214
}
216215

@@ -227,11 +226,11 @@ public static void onEnter(
227226
natsRequest = NatsRequest.create(connection, message);
228227
Context parentContext = Context.current();
229228

230-
if (!CLIENT_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
229+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
231230
return;
232231
}
233232

234-
otelContext = CLIENT_INSTRUMENTER.start(parentContext, natsRequest);
233+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
235234
otelScope = otelContext.makeCurrent();
236235
}
237236

@@ -249,14 +248,14 @@ public static void onExit(
249248

250249
Throwable error = throwable;
251250
NatsRequest natsResponse = null;
252-
if (message == null) {
251+
if (error == null && message == null) {
253252
error = new TimeoutException("Timed out waiting for message");
254253
} else {
255254
natsResponse = NatsRequest.create(connection, message);
256255
}
257256

258257
otelScope.close();
259-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
258+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, error);
260259
}
261260
}
262261

@@ -275,11 +274,11 @@ public static void onEnter(
275274
natsRequest = NatsRequest.create(connection, null, subject, null, body);
276275
otelParentContext = Context.current();
277276

278-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
277+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
279278
return;
280279
}
281280

282-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
281+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
283282
otelScope = otelContext.makeCurrent();
284283
}
285284

@@ -298,10 +297,11 @@ public static void onExit(
298297

299298
otelScope.close();
300299
if (throwable != null) {
301-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
300+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
302301
} else {
303-
messageFuture.whenComplete(
304-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
302+
messageFuture =
303+
messageFuture.whenComplete(
304+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
305305
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
306306
}
307307
}
@@ -323,11 +323,11 @@ public static void onEnter(
323323
natsRequest = NatsRequest.create(connection, null, subject, headers, body);
324324
otelParentContext = Context.current();
325325

326-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
326+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
327327
return;
328328
}
329329

330-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
330+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
331331
otelScope = otelContext.makeCurrent();
332332
}
333333

@@ -346,10 +346,11 @@ public static void onExit(
346346

347347
otelScope.close();
348348
if (throwable != null) {
349-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
349+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
350350
} else {
351-
messageFuture.whenComplete(
352-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
351+
messageFuture =
352+
messageFuture.whenComplete(
353+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
353354
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
354355
}
355356
}
@@ -369,11 +370,11 @@ public static void onEnter(
369370
natsRequest = NatsRequest.create(connection, message);
370371
otelParentContext = Context.current();
371372

372-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
373+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
373374
return;
374375
}
375376

376-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
377+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
377378
otelScope = otelContext.makeCurrent();
378379
}
379380

@@ -392,10 +393,11 @@ public static void onExit(
392393

393394
otelScope.close();
394395
if (throwable != null) {
395-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
396+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
396397
} else {
397-
messageFuture.whenComplete(
398-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
398+
messageFuture =
399+
messageFuture.whenComplete(
400+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
399401
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
400402
}
401403
}
@@ -416,11 +418,11 @@ public static void onEnter(
416418
natsRequest = NatsRequest.create(connection, null, subject, null, body);
417419
otelParentContext = Context.current();
418420

419-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
421+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
420422
return;
421423
}
422424

423-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
425+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
424426
otelScope = otelContext.makeCurrent();
425427
}
426428

@@ -439,10 +441,11 @@ public static void onExit(
439441

440442
otelScope.close();
441443
if (throwable != null) {
442-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
444+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
443445
} else {
444-
messageFuture.whenComplete(
445-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
446+
messageFuture =
447+
messageFuture.whenComplete(
448+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
446449
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
447450
}
448451
}
@@ -464,11 +467,11 @@ public static void onEnter(
464467
natsRequest = NatsRequest.create(connection, null, subject, headers, body);
465468
otelParentContext = Context.current();
466469

467-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
470+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
468471
return;
469472
}
470473

471-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
474+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
472475
otelScope = otelContext.makeCurrent();
473476
}
474477

@@ -487,10 +490,11 @@ public static void onExit(
487490

488491
otelScope.close();
489492
if (throwable != null) {
490-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
493+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
491494
} else {
492-
messageFuture.whenComplete(
493-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
495+
messageFuture =
496+
messageFuture.whenComplete(
497+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
494498
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
495499
}
496500
}
@@ -510,11 +514,11 @@ public static void onEnter(
510514
natsRequest = NatsRequest.create(connection, message);
511515
otelParentContext = Context.current();
512516

513-
if (!CLIENT_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
517+
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
514518
return;
515519
}
516520

517-
otelContext = CLIENT_INSTRUMENTER.start(otelParentContext, natsRequest);
521+
otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest);
518522
otelScope = otelContext.makeCurrent();
519523
}
520524

@@ -533,10 +537,11 @@ public static void onExit(
533537

534538
otelScope.close();
535539
if (throwable != null) {
536-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
540+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
537541
} else {
538-
messageFuture.whenComplete(
539-
new MessageConsumer(CLIENT_INSTRUMENTER, otelContext, connection, natsRequest));
542+
messageFuture =
543+
messageFuture.whenComplete(
544+
new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest));
540545
messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext);
541546
}
542547
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionSubscribeInstrumentation.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

0 commit comments

Comments
 (0)