Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
abc6bbe
wip: instrumenting kafka connect.
vasantteja Aug 8, 2025
e16a085
wip: modifying the tests and instrumentation
vasantteja Aug 15, 2025
9bc7847
wip: adding a static SinkRecordHeadersGetter in KafkaConnectSingletons.
vasantteja Aug 17, 2025
0efd1ba
polish: removing the unwanted test files.
vasantteja Aug 17, 2025
f8855c0
polish: downgrading to 2.6 kafkaconnect version.
vasantteja Aug 20, 2025
54fabff
wip: spotless apply.
vasantteja Aug 20, 2025
7cd1eef
wip: checkstyle violations fix.
vasantteja Aug 20, 2025
89363d2
wip: fixing checkstyle violations in testing folder files.
vasantteja Aug 20, 2025
27d957d
wip: correcting the build failures and fossa checks.
vasantteja Aug 21, 2025
8f77bc3
polish: removing unwanted dependencies from gradle settings file.
vasantteja Aug 23, 2025
8b89eb2
polish: removing unwanted logging statements, readmes and unwanted co…
vasantteja Aug 26, 2025
d82ed1d
wip: removing unwanted attributes from gradle files.
vasantteja Aug 28, 2025
5addaa1
Merge branch 'main' into kafka-connect
vasantteja Aug 28, 2025
fb014e4
wip: adding a deleted property back.
vasantteja Aug 28, 2025
946fbb9
wip: removing timeout for tests.
vasantteja Aug 28, 2025
9bdb037
wip: refining the span to be consumer instead of internal. Refining t…
vasantteja Sep 3, 2025
6f3ed60
./gradlew spotlessApply
otelbot[bot] Sep 3, 2025
5175997
Merge branch 'main' into kafka-connect
vasantteja Sep 4, 2025
af3d85f
wip: removed the unwanted dependencies and assertions from tests.
vasantteja Sep 5, 2025
1a4ff92
wip: applying the spotless check.
vasantteja Sep 5, 2025
ee88ff0
wip: addressing the feedback and removing unwanted dependencies.
vasantteja Sep 10, 2025
571dba4
./gradlew spotlessApply
otelbot[bot] Sep 10, 2025
77109ac
wip: adding a shadowjar based test similar to smoke tests.
vasantteja Sep 13, 2025
27e1e8a
./gradlew spotlessApply
otelbot[bot] Sep 13, 2025
7d8585f
polish: refining the tests to follow smoke test pattern and improving…
vasantteja Sep 16, 2025
13f8360
polish: adding the removed javaagent.
vasantteja Sep 16, 2025
9a583ea
Merge branch 'main' into kafka-connect
vasantteja Sep 17, 2025
879c1fc
Merge branch 'main' into kafka-connect
vasantteja Sep 20, 2025
34a06ed
Merge branch 'main' into kafka-connect
vasantteja Sep 25, 2025
ddf2694
Merge branch 'main' into kafka-connect
vasantteja Sep 25, 2025
5f53ed3
wip: removing unwanted properties and resolving first batch of pr com…
vasantteja Sep 25, 2025
4a38feb
wip: removing unused properties and resolving pr comments.
vasantteja Sep 25, 2025
df9b9a1
polish: resolving merge conflicts.
vasantteja Sep 25, 2025
3bbaa07
polish: resolving the unwanted messages in merge conflicts.
vasantteja Sep 25, 2025
f9466ed
./gradlew spotlessApply
otelbot[bot] Sep 25, 2025
e4dc787
wip: removing the unwanted variables and functions from test codes.
vasantteja Sep 27, 2025
dedd93d
./gradlew spotlessApply
otelbot[bot] Sep 27, 2025
15518bb
wip: removing the parent child relationship between kafka producer an…
vasantteja Sep 28, 2025
55400e7
Merge remote-tracking branch 'origin/kafka-connect' into kafka-connect
vasantteja Sep 28, 2025
cc9d32e
./gradlew spotlessApply
otelbot[bot] Sep 28, 2025
11b5fa5
Merge branch 'main' into kafka-connect
vasantteja Sep 28, 2025
ffacaf3
wip: converting the tests to use somketestinstrumentation.
vasantteja Sep 28, 2025
adc4bf9
./gradlew spotlessApply
otelbot[bot] Sep 28, 2025
780ba48
wip: test classes refactor to remove redundancy among test classes.
vasantteja Sep 29, 2025
339d99f
./gradlew spotlessApply
otelbot[bot] Sep 29, 2025
20c54bf
wip: reactor to remove comments and unwanted dependencies from gradle…
vasantteja Oct 1, 2025
9c2603c
Merge branch 'kafka-connect' of https://github.com/vasantteja/opentel…
vasantteja Oct 1, 2025
bf61bdf
./gradlew spotlessApply
otelbot[bot] Oct 1, 2025
bb08ab3
Merge branch 'main' into kafka-connect
vasantteja Oct 1, 2025
8c7443f
polish: changing the base class name.
vasantteja Oct 1, 2025
bc1d457
wip: base class.
vasantteja Oct 1, 2025
851ade2
Merge branch 'kafka-connect' of https://github.com/vasantteja/opentel…
vasantteja Oct 1, 2025
8dc904b
Merge branch 'main' into kafka-connect
vasantteja Oct 1, 2025
7f9cc82
Merge branch 'main' into kafka-connect
vasantteja Oct 1, 2025
82b9307
wip: restricting the execution of tests on jdk 11 or higher.
vasantteja Oct 1, 2025
73fc079
Merge branch 'main' into kafka-connect
vasantteja Oct 2, 2025
2f95738
polish: refactor around the span naming for multi topic consumers and…
vasantteja Oct 5, 2025
6ecaf06
./gradlew spotlessApply
otelbot[bot] Oct 5, 2025
69e1ceb
Merge branch 'main' into kafka-connect
vasantteja Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,9 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:jsf:jsf-myfaces-3.0:javaagent'
- type: gradle
path: ./
target: ':instrumentation:kafka:kafka-connect-2.6:javaagent'
- type: gradle
path: ./
target: ':instrumentation:kafka:kafka-streams-0.11:javaagent'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.kafka")
module.set("connect-api")
versions.set("[2.6.0,)")
// we use reflection to access the "pause" and "resume" methods, so we can't reference them
// directly, and so we can't assert that they exist at muzzle-time
skip("org.apache.kafka.connect.sink.SinkTaskContext")
}
}

dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))

library("org.apache.kafka:connect-api:2.6.0")

testImplementation("org.apache.kafka:connect-runtime:2.6.0")
}

tasks {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)

// Enable experimental span attributes and receive telemetry for comprehensive testing
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
// Set timeout for integration tests with containers
systemProperty("junit.jupiter.execution.timeout.default", "5m")
}

withType<JavaCompile>().configureEach {
options.compilerArgs.add("-Xlint:-deprecation")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

final class KafkaConnectBatchProcessSpanLinksExtractor
implements SpanLinksExtractor<KafkaConnectTask> {

private final SpanLinksExtractor<SinkRecord> singleRecordLinkExtractor;

KafkaConnectBatchProcessSpanLinksExtractor(TextMapPropagator propagator) {
this.singleRecordLinkExtractor =
new PropagatorBasedSpanLinksExtractor<>(propagator, SinkRecordHeadersGetter.INSTANCE);
}

@Override
public void extract(SpanLinksBuilder spanLinks, Context parentContext, KafkaConnectTask task) {

for (SinkRecord record : task.getRecords()) {
singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaConnectInstrumentationModule extends InstrumentationModule {

public KafkaConnectInstrumentationModule() {
super("kafka-connect", "kafka-connect-2.6");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SinkTaskInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.kafka.connect.sink.SinkRecord;

public final class KafkaConnectSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";
private static final TextMapPropagator PROPAGATOR =
GlobalOpenTelemetry.get().getPropagators().getTextMapPropagator();

private static final TextMapGetter<SinkRecord> SINK_RECORD_HEADER_GETTER =
SinkRecordHeadersGetter.INSTANCE;

private static final Instrumenter<KafkaConnectTask, Void> INSTRUMENTER =
Instrumenter.<KafkaConnectTask, Void>builder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, KafkaConnectTask::getSpanName)
.addSpanLinksExtractor(new KafkaConnectBatchProcessSpanLinksExtractor(PROPAGATOR))
.buildInstrumenter();

public static Instrumenter<KafkaConnectTask, Void> instrumenter() {
return INSTRUMENTER;
}

public static TextMapPropagator propagator() {
return PROPAGATOR;
}

public static TextMapGetter<SinkRecord> sinkRecordHeaderGetter() {
return SINK_RECORD_HEADER_GETTER;
}

private KafkaConnectSingletons() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

public final class KafkaConnectTask {

private final Collection<SinkRecord> records;

public KafkaConnectTask(Collection<SinkRecord> records) {
this.records = records;
}

public Collection<SinkRecord> getRecords() {
return records;
}

public static String getSpanName(KafkaConnectTask task) {
return "KafkaConnect.put";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;

enum SinkRecordHeadersGetter implements TextMapGetter<SinkRecord> {
INSTANCE;

@Override
public Iterable<String> keys(SinkRecord record) {
Headers headers = record.headers();
List<String> keys = new ArrayList<>();
for (Header header : headers) {
keys.add(header.key());
}
return keys;
}

@Nullable
@Override
public String get(@Nullable SinkRecord record, String key) {
if (record == null) {
return null;
}
Headers headers = record.headers();
Header header = headers.lastWithName(key);
if (header == null) {
return null;
}
Object value = header.value();
if (value instanceof byte[]) {
return new String((byte[]) value, StandardCharsets.UTF_8);
}
return value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6;

import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Collection;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkTaskInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return hasSuperType(named("org.apache.kafka.connect.sink.SinkTask"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("put").and(takesArgument(0, Collection.class)).and(isPublic()),
SinkTaskInstrumentation.class.getName() + "$SinkTaskPutAdvice");
}

@SuppressWarnings("unused")
public static class SinkTaskPutAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Collection<SinkRecord> records,
@Advice.Local("otelTask") KafkaConnectTask task,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Context parentContext = Java8BytecodeBridge.currentContext();

// Extract context from first record if available
if (!records.isEmpty()) {
SinkRecord firstRecord = records.iterator().next();
Context extractedContext =
KafkaConnectSingletons.propagator()
.extract(
parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
parentContext = extractedContext;
}

task = new KafkaConnectTask(records);
if (!KafkaConnectSingletons.instrumenter().shouldStart(parentContext, task)) {
return;
}

context = KafkaConnectSingletons.instrumenter().start(parentContext, task);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelTask") KafkaConnectTask task,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

if (scope == null) {
return;
}
scope.close();
KafkaConnectSingletons.instrumenter().end(context, task, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
plugins {
id("otel.java-conventions")
id("otel.javaagent-testing")
}

dependencies {
api(project(":testing-common"))

implementation("org.apache.kafka:kafka-clients:0.11.0.0")

implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common-0.11:library"))
implementation(project(":instrumentation:kafka:kafka-connect-2.6:javaagent"))
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))

testImplementation("org.apache.kafka:connect-runtime:3.6.1")
testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:testing"))

implementation("org.testcontainers:postgresql:1.21.3") // For PostgreSQLContainer
testImplementation("org.postgresql:postgresql:42.7.2") // PostgreSQL JDBC driver
implementation("org.testcontainers:mongodb:1.21.3") // For MongoDBContainer
testImplementation("org.mongodb:mongodb-driver-sync:4.11.0") // MongoDB Java driver
testImplementation("org.apache.httpcomponents:httpclient") // For HttpStatus (not httpcore)

testImplementation("org.mockito:mockito-junit-jupiter:4.11.0")

// Testcontainers dependencies for integration testing
testImplementation("io.strimzi:strimzi-test-container:0.111.0")
implementation("org.testcontainers:junit-jupiter")
testImplementation("org.awaitility:awaitility")
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")
testImplementation("org.testcontainers:testcontainers:1.19.7")
testImplementation("org.testcontainers:kafka:1.19.7")
implementation("io.rest-assured:rest-assured:5.5.5")
testImplementation("org.junit.jupiter:junit-jupiter:5.10.2")
testImplementation("org.testcontainers:junit-jupiter")
implementation("com.fasterxml.jackson.core:jackson-databind")
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}
Loading