Skip to content

Commit b7a706f

Browse files
AlixBaotelbot[bot]lauritjaydeluca
authored andcommitted
Add NATS instrumentation (open-telemetry#13999)
Co-authored-by: otelbot <[email protected]> Co-authored-by: Lauri Tulmin <[email protected]> Co-authored-by: Jay DeLuca <[email protected]>
1 parent 1be4d49 commit b7a706f

File tree

46 files changed

+2950
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2950
-0
lines changed

.fossa.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,12 @@ targets:
709709
- type: gradle
710710
path: ./
711711
target: ':instrumentation:mongo:mongo-async-3.3:javaagent'
712+
- type: gradle
713+
path: ./
714+
target: ':instrumentation:nats:nats-2.17:javaagent'
715+
- type: gradle
716+
path: ./
717+
target: ':instrumentation:nats:nats-2.17:library'
712718
- type: gradle
713719
path: ./
714720
target: ':instrumentation:netty:netty-3.8:javaagent'

.github/scripts/check-package-names.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ for dir in $(find instrumentation -name "*.java" | grep library/src/main/java |
2929
if [[ "$dir" == "instrumentation/lettuce/lettuce-5.1/library/src/main/java/io/lettuce/core/protocol" ]]; then
3030
continue
3131
fi
32+
if [[ "$dir" == "instrumentation/nats/nats-2.17/library/src/main/java/io/nats/client/impl" ]]; then
33+
continue
34+
fi
3235

3336
# some common modules don't have any base version
3437
# - lettuce-common

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ out/
4545
######################
4646
.vscode
4747
**/bin/
48+
.metals
4849

4950
# Others #
5051
##########

docs/instrumentation-list.yaml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5274,6 +5274,59 @@ libraries:
52745274
target_versions:
52755275
javaagent:
52765276
- org.mybatis:mybatis:[3.2.0,)
5277+
nats:
5278+
- name: nats-2.17
5279+
description: This instrumentation provides messaging spans for NATS
5280+
disabled_by_default: false
5281+
source_path: instrumentation/nats/nats-2.17
5282+
scope:
5283+
name: io.opentelemetry.nats-2.17
5284+
target_versions:
5285+
javaagent:
5286+
- io.nats:jnats:[2.17.7,)
5287+
library:
5288+
- io.nats:jnats:2.17.7
5289+
configurations:
5290+
- name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled
5291+
description: |
5292+
Enables experimental receive telemetry, which will cause consumers to start a new trace, with only a span link connecting it to the producer trace.
5293+
type: boolean
5294+
default: false
5295+
- name: otel.instrumentation.messaging.experimental.capture-headers
5296+
description: Allows configuring headers to capture as span attributes.
5297+
type: list
5298+
default: ''
5299+
telemetry:
5300+
- when: default
5301+
spans:
5302+
- span_kind: CONSUMER
5303+
attributes:
5304+
- name: messaging.client_id
5305+
type: STRING
5306+
- name: messaging.destination.name
5307+
type: STRING
5308+
- name: messaging.header.captured_header
5309+
type: STRING_ARRAY
5310+
- name: messaging.message.body.size
5311+
type: LONG
5312+
- name: messaging.operation
5313+
type: STRING
5314+
- name: messaging.system
5315+
type: STRING
5316+
- span_kind: PRODUCER
5317+
attributes:
5318+
- name: messaging.client_id
5319+
type: STRING
5320+
- name: messaging.destination.name
5321+
type: STRING
5322+
- name: messaging.header.captured_header
5323+
type: STRING_ARRAY
5324+
- name: messaging.message.body.size
5325+
type: LONG
5326+
- name: messaging.operation
5327+
type: STRING
5328+
- name: messaging.system
5329+
type: STRING
52775330
netty:
52785331
- name: netty-3.8
52795332
source_path: instrumentation/netty/netty-3.8

docs/supported-libraries.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ These are the supported libraries and frameworks:
107107
| [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none |
108108
| [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics]&nbsp;[6] |
109109
| [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none |
110+
| [NATS Client](https://github.com/nats-io/nats.java) | 2.17.2+ | [nats-2.17](../instrumentation/nats/nats-2.17/library) | [Messaging Spans] |
110111
| [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
111112
| [OpenAI Java SDK](https://github.com/openai/openai-java) | 1.1+ | [openai-java-1.1](../instrumentation/openai/openai-java-1.1/library) | [GenAI Client Spans], [GenAI Client Metrics] |
112113
| [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics]&nbsp;[6] |

instrumentation-docs/instrumentations.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ readonly INSTRUMENTATIONS=(
121121
"jetty-httpclient:jetty-httpclient-12.0:javaagent:test"
122122
"jetty-httpclient:jetty-httpclient-9.2:javaagent:test"
123123
"jodd-http-4.2:javaagent:test"
124+
"nats:nats-2.17:javaagent:test"
125+
"nats:nats-2.17:javaagent:testExperimental"
124126
"netty:netty-3.8:javaagent:test"
125127
"netty:netty-4.0:javaagent:test"
126128
"netty:netty-4.1:javaagent:test"
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("io.nats")
8+
module.set("jnats")
9+
versions.set("[2.17.2,)")
10+
11+
// Could not find io.nats:nats-parent:1.0-SNAPSHOT
12+
skip("0.5.0", "0.5.1")
13+
14+
assertInverse.set(true)
15+
}
16+
}
17+
18+
dependencies {
19+
library("io.nats:jnats:2.17.2")
20+
21+
implementation(project(":instrumentation:nats:nats-2.17:library"))
22+
testImplementation(project(":instrumentation:nats:nats-2.17:testing"))
23+
}
24+
25+
tasks {
26+
withType<Test>().configureEach {
27+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
28+
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
29+
}
30+
31+
val testExperimental by registering(Test::class) {
32+
testClassesDirs = sourceSets.test.get().output.classesDirs
33+
classpath = sourceSets.test.get().runtimeClasspath
34+
filter {
35+
includeTestsMatching("NatsExperimentalTest")
36+
}
37+
jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=captured-header")
38+
}
39+
40+
test {
41+
filter {
42+
excludeTestsMatching("NatsExperimentalTest")
43+
}
44+
}
45+
46+
check {
47+
dependsOn(testExperimental)
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_17;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
public final class CompletableFutureWrapper {
13+
14+
private CompletableFutureWrapper() {}
15+
16+
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
17+
CompletableFuture<T> result = new CompletableFuture<>();
18+
future.whenComplete(
19+
(T value, Throwable throwable) -> {
20+
try (Scope ignored = context.makeCurrent()) {
21+
if (throwable != null) {
22+
result.completeExceptionally(throwable);
23+
} else {
24+
result.complete(value);
25+
}
26+
}
27+
});
28+
29+
return result;
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_17;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_17.NatsSingletons.PRODUCER_INSTRUMENTER;
10+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.nats.client.impl.Headers;
18+
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.context.Scope;
20+
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsMessageWritableHeaders;
21+
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest;
22+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
23+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
24+
import net.bytebuddy.asm.Advice;
25+
import net.bytebuddy.description.type.TypeDescription;
26+
import net.bytebuddy.matcher.ElementMatcher;
27+
28+
public class ConnectionPublishInstrumentation implements TypeInstrumentation {
29+
30+
@Override
31+
public ElementMatcher<TypeDescription> typeMatcher() {
32+
return implementsInterface(named("io.nats.client.Connection"));
33+
}
34+
35+
@Override
36+
public void transform(TypeTransformer transformer) {
37+
transformer.applyAdviceToMethod(
38+
isPublic()
39+
.and(named("publish"))
40+
.and(takesArguments(2))
41+
.and(takesArgument(0, String.class))
42+
.and(takesArgument(1, byte[].class)),
43+
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyAdvice");
44+
transformer.applyAdviceToMethod(
45+
isPublic()
46+
.and(named("publish"))
47+
.and(takesArguments(3))
48+
.and(takesArgument(0, String.class))
49+
.and(takesArgument(1, named("io.nats.client.impl.Headers")))
50+
.and(takesArgument(2, byte[].class)),
51+
ConnectionPublishInstrumentation.class.getName() + "$PublishHeadersBodyAdvice");
52+
transformer.applyAdviceToMethod(
53+
isPublic()
54+
.and(named("publish"))
55+
.and(takesArguments(3))
56+
.and(takesArgument(0, String.class))
57+
.and(takesArgument(1, String.class))
58+
.and(takesArgument(2, byte[].class)),
59+
ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToBodyAdvice");
60+
transformer.applyAdviceToMethod(
61+
isPublic()
62+
.and(named("publish"))
63+
.and(takesArguments(4))
64+
.and(takesArgument(0, String.class))
65+
.and(takesArgument(1, String.class))
66+
.and(takesArgument(2, named("io.nats.client.impl.Headers")))
67+
.and(takesArgument(3, byte[].class)),
68+
ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToHeadersBodyAdvice");
69+
transformer.applyAdviceToMethod(
70+
isPublic()
71+
.and(named("publish"))
72+
.and(takesArguments(1))
73+
.and(takesArgument(0, named("io.nats.client.Message"))),
74+
ConnectionPublishInstrumentation.class.getName() + "$PublishMessageAdvice");
75+
}
76+
77+
@SuppressWarnings("unused")
78+
public static class PublishBodyAdvice {
79+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
80+
public static boolean onEnter(
81+
@Advice.This Connection connection,
82+
@Advice.Argument(0) String subject,
83+
@Advice.Argument(1) byte[] body) {
84+
// call the instrumented publish method
85+
connection.publish(subject, null, null, body);
86+
return true;
87+
}
88+
}
89+
90+
@SuppressWarnings("unused")
91+
public static class PublishHeadersBodyAdvice {
92+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
93+
public static boolean onEnter(
94+
@Advice.This Connection connection,
95+
@Advice.Argument(0) String subject,
96+
@Advice.Argument(1) Headers headers,
97+
@Advice.Argument(2) byte[] body) {
98+
// call the instrumented publish method
99+
connection.publish(subject, null, headers, body);
100+
return true;
101+
}
102+
}
103+
104+
@SuppressWarnings("unused")
105+
public static class PublishReplyToBodyAdvice {
106+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
107+
public static boolean onEnter(
108+
@Advice.This Connection connection,
109+
@Advice.Argument(0) String subject,
110+
@Advice.Argument(1) String replyTo,
111+
@Advice.Argument(2) byte[] body) {
112+
// call the instrumented publish method
113+
connection.publish(subject, replyTo, null, body);
114+
return true;
115+
}
116+
}
117+
118+
@SuppressWarnings("unused")
119+
public static class PublishReplyToHeadersBodyAdvice {
120+
121+
@Advice.OnMethodEnter(suppress = Throwable.class)
122+
public static void onEnter(
123+
@Advice.This Connection connection,
124+
@Advice.Argument(0) String subject,
125+
@Advice.Argument(1) String replyTo,
126+
@Advice.Argument(value = 2, readOnly = false) Headers headers,
127+
@Advice.Argument(3) byte[] body,
128+
@Advice.Local("otelContext") Context otelContext,
129+
@Advice.Local("otelScope") Scope otelScope,
130+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
131+
headers = NatsMessageWritableHeaders.create(headers);
132+
133+
Context parentContext = Context.current();
134+
natsRequest = NatsRequest.create(connection, subject, replyTo, headers, body);
135+
136+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
137+
return;
138+
}
139+
140+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
141+
otelScope = otelContext.makeCurrent();
142+
}
143+
144+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
145+
public static void onExit(
146+
@Advice.Thrown Throwable throwable,
147+
@Advice.Local("otelContext") Context otelContext,
148+
@Advice.Local("otelScope") Scope otelScope,
149+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
150+
if (otelScope == null) {
151+
return;
152+
}
153+
154+
otelScope.close();
155+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
156+
}
157+
}
158+
159+
@SuppressWarnings("unused")
160+
public static class PublishMessageAdvice {
161+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
162+
public static boolean onEnter(
163+
@Advice.This Connection connection, @Advice.Argument(0) Message message) {
164+
if (message == null) {
165+
return false;
166+
}
167+
168+
// call the instrumented publish method
169+
connection.publish(
170+
message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData());
171+
return true;
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)