Skip to content

Commit 9dce150

Browse files
vasanttejaotelbot[bot]laurit
authored
Add initial instrumentation of kafka connect SinkTask (#14478)
Co-authored-by: otelbot <[email protected]> Co-authored-by: Lauri Tulmin <[email protected]>
1 parent 1110550 commit 9dce150

File tree

18 files changed

+1690
-1
lines changed

18 files changed

+1690
-1
lines changed

.fossa.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,9 @@ targets:
619619
- type: gradle
620620
path: ./
621621
target: ':instrumentation:jsf:jsf-myfaces-3.0:javaagent'
622+
- type: gradle
623+
path: ./
624+
target: ':instrumentation:kafka:kafka-connect-2.6:javaagent'
622625
- type: gradle
623626
path: ./
624627
target: ':instrumentation:kafka:kafka-streams-0.11:javaagent'

docs/supported-libraries.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ These are the supported libraries and frameworks:
3232
| [Apache HttpAsyncClient](https://hc.apache.org/index.html) | 4.1+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
3333
| [Apache HttpClient](https://hc.apache.org/index.html) | 2.0+ | [opentelemetry-apache-httpclient-4.3](../instrumentation/apache-httpclient/apache-httpclient-4.3/library),<br>[opentelemetry-apache-httpclient-5.2](../instrumentation/apache-httpclient/apache-httpclient-5.2/library) | [HTTP Client Spans], [HTTP Client Metrics] |
3434
| [Apache ShenYu](https://shenyu.apache.org/) | 2.4+ | N/A | Provides `http.route` [2] |
35+
| [Apache Kafka Connect API](https://kafka.apache.org/documentation/#connect) | 2.6+ | N/A | [Messaging Spans] |
3536
| [Apache Kafka Producer/Consumer API](https://kafka.apache.org/documentation/#producerapi) | 0.11+ | [opentelemetry-kafka-clients-2.6](../instrumentation/kafka/kafka-clients/kafka-clients-2.6/library) | [Messaging Spans] |
3637
| [Apache Kafka Streams API](https://kafka.apache.org/documentation/streams/) | 0.11+ | N/A | [Messaging Spans] |
3738
| [Apache MyFaces](https://myfaces.apache.org/) | 1.2+ (not including 4.0+ yet) | N/A | Provides `http.route` [2], Controller Spans [3] |
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("org.apache.kafka")
8+
module.set("connect-api")
9+
versions.set("[2.6.0,)")
10+
assertInverse.set(true)
11+
}
12+
}
13+
14+
dependencies {
15+
bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
16+
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))
17+
library("org.apache.kafka:connect-api:2.6.0")
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA;
9+
10+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.List;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.StreamSupport;
15+
import javax.annotation.Nullable;
16+
import org.apache.kafka.connect.header.Header;
17+
18+
enum KafkaConnectAttributesGetter implements MessagingAttributesGetter<KafkaConnectTask, Void> {
19+
INSTANCE;
20+
21+
@Override
22+
public String getSystem(KafkaConnectTask request) {
23+
return KAFKA;
24+
}
25+
26+
@Override
27+
@Nullable
28+
public String getDestination(KafkaConnectTask request) {
29+
return request.getDestinationName();
30+
}
31+
32+
@Nullable
33+
@Override
34+
public String getDestinationTemplate(KafkaConnectTask request) {
35+
return null;
36+
}
37+
38+
@Override
39+
public boolean isTemporaryDestination(KafkaConnectTask request) {
40+
return false;
41+
}
42+
43+
@Override
44+
public boolean isAnonymousDestination(KafkaConnectTask request) {
45+
return false;
46+
}
47+
48+
@Override
49+
@Nullable
50+
public String getConversationId(KafkaConnectTask request) {
51+
return null;
52+
}
53+
54+
@Nullable
55+
@Override
56+
public Long getMessageBodySize(KafkaConnectTask request) {
57+
return null;
58+
}
59+
60+
@Nullable
61+
@Override
62+
public Long getMessageEnvelopeSize(KafkaConnectTask request) {
63+
return null;
64+
}
65+
66+
@Override
67+
@Nullable
68+
public String getMessageId(KafkaConnectTask request, @Nullable Void unused) {
69+
return null;
70+
}
71+
72+
@Nullable
73+
@Override
74+
public String getClientId(KafkaConnectTask request) {
75+
return null;
76+
}
77+
78+
@Override
79+
public Long getBatchMessageCount(KafkaConnectTask request, @Nullable Void unused) {
80+
return (long) request.getRecords().size();
81+
}
82+
83+
@Override
84+
public List<String> getMessageHeader(KafkaConnectTask request, String name) {
85+
return request.getRecords().stream()
86+
.filter(record -> record.headers() != null)
87+
.flatMap(record -> StreamSupport.stream(record.headers().spliterator(), false))
88+
.filter(header -> name.equals(header.key()) && header.value() != null)
89+
.map(header -> convertHeaderValue(header))
90+
.collect(Collectors.toList());
91+
}
92+
93+
private static String convertHeaderValue(Header header) {
94+
Object value = header.value();
95+
if (value instanceof byte[]) {
96+
return new String((byte[]) value, StandardCharsets.UTF_8);
97+
}
98+
return value.toString();
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.propagation.TextMapPropagator;
10+
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
11+
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
12+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
13+
import org.apache.kafka.connect.sink.SinkRecord;
14+
15+
final class KafkaConnectBatchProcessSpanLinksExtractor
16+
implements SpanLinksExtractor<KafkaConnectTask> {
17+
18+
private final SpanLinksExtractor<SinkRecord> singleRecordLinkExtractor;
19+
20+
KafkaConnectBatchProcessSpanLinksExtractor(TextMapPropagator propagator) {
21+
this.singleRecordLinkExtractor =
22+
new PropagatorBasedSpanLinksExtractor<>(propagator, SinkRecordHeadersGetter.INSTANCE);
23+
}
24+
25+
@Override
26+
public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask request) {
27+
for (SinkRecord record : request.getRecords()) {
28+
singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
29+
}
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9+
import static java.util.Arrays.asList;
10+
11+
import com.google.auto.service.AutoService;
12+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import java.util.List;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
17+
@AutoService(InstrumentationModule.class)
18+
public class KafkaConnectInstrumentationModule extends InstrumentationModule {
19+
20+
public KafkaConnectInstrumentationModule() {
21+
super("kafka-connect", "kafka-connect-2.6");
22+
}
23+
24+
@Override
25+
public List<TypeInstrumentation> typeInstrumentations() {
26+
return asList(new SinkTaskInstrumentation(), new WorkerSinkTaskInstrumentation());
27+
}
28+
29+
@Override
30+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
31+
// class added in 2.6.0
32+
return hasClassesNamed("org.apache.kafka.connect.sink.SinkConnectorContext");
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.context.propagation.TextMapPropagator;
10+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
12+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
13+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
15+
16+
public final class KafkaConnectSingletons {
17+
18+
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
19+
private static final TextMapPropagator PROPAGATOR =
20+
GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();
21+
22+
private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER;
23+
24+
static {
25+
KafkaConnectBatchProcessSpanLinksExtractor spanLinksExtractor =
26+
new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR);
27+
28+
INSTRUMENTER =
29+
Instrumenter.<KafkaConnectTask, Void>builder(
30+
GlobalOpenTelemetry.get(),
31+
INSTRUMENTATION_NAME,
32+
MessagingSpanNameExtractor.create(
33+
KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS))
34+
.addAttributesExtractor(
35+
MessagingAttributesExtractor.builder(
36+
KafkaConnectAttributesGetter.INSTANCE, MessageOperation.PROCESS)
37+
.build())
38+
.addSpanLinksExtractor(spanLinksExtractor)
39+
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
40+
}
41+
42+
public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
43+
return INSTRUMENTER;
44+
}
45+
46+
private KafkaConnectSingletons() {}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import java.util.Collection;
9+
import java.util.LinkedHashSet;
10+
import java.util.Set;
11+
import java.util.stream.Collectors;
12+
import org.apache.kafka.connect.sink.SinkRecord;
13+
14+
public final class KafkaConnectTask {
15+
16+
private final Collection<SinkRecord> records;
17+
18+
public KafkaConnectTask(Collection<SinkRecord> records) {
19+
this.records = records;
20+
}
21+
22+
public Collection<SinkRecord> getRecords() {
23+
return records;
24+
}
25+
26+
private Set<String> getTopics() {
27+
return records.stream()
28+
.map(SinkRecord::topic)
29+
.collect(Collectors.toCollection(LinkedHashSet::new));
30+
}
31+
32+
public String getDestinationName() {
33+
Set<String> topics = getTopics();
34+
if (topics.isEmpty()) {
35+
return null;
36+
}
37+
// Return the topic name only if all records are from the same topic.
38+
// When records are from multiple topics, return null as there is no standard way
39+
// to represent multiple destination names in messaging.destination.name attribute.
40+
if (topics.size() == 1) {
41+
return topics.iterator().next();
42+
}
43+
return null;
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;
7+
8+
import static java.util.Collections.emptyList;
9+
import static java.util.stream.Collectors.toList;
10+
11+
import io.opentelemetry.context.propagation.TextMapGetter;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.stream.StreamSupport;
14+
import javax.annotation.Nullable;
15+
import org.apache.kafka.connect.header.Header;
16+
import org.apache.kafka.connect.sink.SinkRecord;
17+
18+
enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
19+
INSTANCE;
20+
21+
@Override
22+
public Iterable<String> keys(SinkRecord record) {
23+
if (record.headers() == null) {
24+
return emptyList();
25+
}
26+
27+
return StreamSupport.stream(record.headers().spliterator(), false)
28+
.map(Header::key)
29+
.collect(toList());
30+
}
31+
32+
@Override
33+
@Nullable
34+
public String get(@Nullable SinkRecord record, String key) {
35+
if (record == null || record.headers() == null) {
36+
return null;
37+
}
38+
39+
Header header = record.headers().lastWithName(key);
40+
if (header == null || header.value() == null) {
41+
return null;
42+
}
43+
44+
Object value = header.value();
45+
if (value instanceof byte[]) {
46+
return new String((byte[]) value, StandardCharsets.UTF_8);
47+
}
48+
return value.toString();
49+
}
50+
}

0 commit comments

Comments
 (0)