
Conversation

vasantteja

📋 PR Description

Overview

This PR adds OpenTelemetry instrumentation for Apache Kafka Connect SinkTask operations, providing observability into data pipeline processing from Kafka topics to external systems.

What This Adds

  • Instrumentation for SinkTask.put() method: Creates spans for batch processing operations
  • Trace context propagation: Links producer spans to sink processing spans using Kafka message headers
  • Comprehensive integration tests: Docker-based tests with MongoDB and PostgreSQL sinks
  • Support for Kafka Connect 2.6+: Compatible with modern Kafka Connect deployments

Key Features

🔍 Span Creation

  • Span name: KafkaConnect.put
  • Span kind: INTERNAL
  • Attributes: Thread information (thread.name, thread.id)
  • Links: Creates span links to producer spans extracted from Kafka message headers

🔗 Trace Context Propagation

  • Extracts trace context from Kafka message headers using TextMapPropagator
  • Links sink processing spans to original producer spans for end-to-end tracing
  • Handles batch processing scenarios with multiple records
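
Conceptually, the propagation step reads the W3C traceparent header carried in a record and turns it into a span link. The sketch below is a standalone illustration of that parsing only; `LinkTarget` and `linkFromHeaders` are hypothetical stand-ins, not the real `TextMapPropagator` or span-link APIs used by the instrumentation:

```java
import java.util.Map;

class TraceparentLinkSketch {
  // Hypothetical stand-in for an OpenTelemetry span link target.
  static final class LinkTarget {
    final String traceId;
    final String spanId;
    LinkTarget(String traceId, String spanId) {
      this.traceId = traceId;
      this.spanId = spanId;
    }
  }

  // Parse a W3C traceparent header ("00-<32 hex trace id>-<16 hex span id>-<2 hex flags>")
  // taken from a record's headers; returns null when absent or malformed.
  static LinkTarget linkFromHeaders(Map<String, String> headers) {
    String traceparent = headers.get("traceparent");
    if (traceparent == null) {
      return null;
    }
    String[] parts = traceparent.split("-");
    if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
      return null;
    }
    return new LinkTarget(parts[1], parts[2]);
  }
}
```

In the actual instrumentation this work is delegated to `PropagatorBasedSpanLinksExtractor` and the configured propagator; the sketch only shows the shape of the data flowing through.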

🧪 Testing Approach

Uses the service-side instrumentation testing pattern (similar to the JMX metrics instrumentation):

  • Docker containers for Kafka Connect, Kafka, and sink databases
  • OTLP backend to collect spans from containerized services
  • Integration tests with real MongoDB and PostgreSQL connectors
  • Robust container lifecycle management with proper cleanup

Technical Implementation

Instrumentation Details

// Instruments all SinkTask implementations
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
  return hasSuperType(named("org.apache.kafka.connect.sink.SinkTask"));
}

// Instruments the put() method
@Override
public void transform(TypeTransformer transformer) {
  transformer.applyAdviceToMethod(
    isPublic().and(named("put")).and(takesArgument(0, Collection.class)),
    SinkTaskInstrumentation.class.getName() + "$SinkTaskPutAdvice");
}

Context Extraction

  • Uses PropagatorBasedSpanLinksExtractor for creating span links
  • Implements SinkRecordHeadersGetter for extracting headers from Kafka records
  • Handles classloader isolation challenges in Kafka Connect plugin architecture
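
A TextMapGetter-style adapter over record headers has two responsibilities: enumerate the header keys and fetch a single value by key. A standalone sketch, with a hypothetical record type in place of SinkRecord (Kafka header values are raw bytes, so the getter also decodes them):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HeadersGetterSketch {
  // Hypothetical stand-in for SinkRecord and its header list.
  static final class FakeRecord {
    final Map<String, byte[]> headers = new LinkedHashMap<>();
    void addHeader(String key, String value) {
      headers.put(key, value.getBytes(StandardCharsets.UTF_8));
    }
  }

  // Mirror of TextMapGetter.keys(carrier).
  static List<String> keys(FakeRecord carrier) {
    return new ArrayList<>(carrier.headers.keySet());
  }

  // Mirror of TextMapGetter.get(carrier, key): decode header bytes to a string,
  // or return null when the header is absent.
  static String get(FakeRecord carrier, String key) {
    byte[] raw = carrier.headers.get(key);
    return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
  }
}
```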

Classloader Considerations

Kafka Connect's PluginClassLoader isolation required the Singletons Static Reference pattern:

// Access TextMapGetter through singletons to avoid classloader issues
Context extractedContext = KafkaConnectSingletons.propagator()
    .extract(parentContext, firstRecord, KafkaConnectSingletons.sinkRecordHeaderGetter());
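
The static-holder pattern behind this can be sketched in isolation: the collaborators are constructed once in a class loaded by the agent classloader and handed out through static accessors, so advice inlined into plugin-loaded classes never has to resolve them through the PluginClassLoader. The names below are stand-ins, not the actual KafkaConnectSingletons:

```java
class SingletonsSketch {
  // Hypothetical collaborator standing in for the propagator and header getter.
  static final class Propagator {}

  // Constructed exactly once, when the holder class is initialized.
  private static final Propagator PROPAGATOR = new Propagator();

  private SingletonsSketch() {}

  // Advice code calls only this static accessor.
  static Propagator propagator() {
    return PROPAGATOR;
  }
}
```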

Testing Strategy

Why Not InstrumentationExtension?

Unlike client-side instrumentations, Kafka Connect runs as a separate service in its own JVM. The standard InstrumentationExtension cannot collect spans from containerized services, requiring an OTLP-based approach.

Test Architecture

┌─────────────────┐    ┌───────────────┐    ┌─────────────────┐
│   Test JVM      │    │ Kafka Connect │    │  OTLP Backend   │
│                 │    │  Container    │    │   Container     │
│ • Produces msgs │───▶│ • Processes   │───▶│ • Collects      │
│ • Verifies spans│    │ • Instruments │    │   spans         │
└─────────────────┘    └───────────────┘    └─────────────────┘

Test Coverage

  • MongoDB Sink: Tests with MongoDB connector and document insertion
  • PostgreSQL Sink: Tests with JDBC connector and table insertion
  • Span verification: Validates span names, attributes, and trace linking
  • Container lifecycle: Proper setup, execution, and cleanup
  • ARM64 compatibility: Handles Docker architecture emulation with increased timeouts

Performance Considerations

  • Minimal overhead: Only instruments the put() method during batch processing
  • Efficient context extraction: Extracts context only from the first record in batch
  • Proper resource cleanup: All containers and connections are properly closed

Future Enhancements

  • Support for source tasks (SourceTask instrumentation)

Testing Instructions

# Run all Kafka Connect tests
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test

# Run specific tests
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test --tests="*MongoKafkaConnectSinkTaskTest*"
./gradlew :instrumentation:kafka:kafka-connect-2.6:testing:test --tests="*PostgresKafkaConnectSinkTaskTest*"

# Build the instrumentation
./gradlew :instrumentation:kafka:kafka-connect-2.6:javaagent:build

Related Issues

  1. Multiple Trace-ids are generated for Kafka sink connector  #12322
  2. Traces are not propagated from kafka connect to mongoDB  #12261

Thanks in advance and please let me know your thoughts.

@vasantteja vasantteja requested a review from a team as a code owner August 20, 2025 21:49
Contributor

@laurit laurit left a comment


Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allow the trace to propagate to where the message is read from wherever the sink sent it? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

@vasantteja
Author

vasantteja commented Aug 28, 2025

Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allow the trace to propagate to where the message is read from wherever the sink sent it? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

MongoDB Kafka Connector: Uses collection.bulkWrite(). We don't have instrumentation for the bulkWrite() function (MongoDB instrumentation only covers wire protocol commands like insert, update, delete), hence we will not see parent-child relationships between the Kafka Connect span and the resulting MongoDB spans.

Cosmos DB Connector: Uses Cosmos DB SDK calls. Since Cosmos DB doesn't have OpenTelemetry instrumentation, the trace stops at the Kafka Connect span, but span links to producers are preserved.

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

Let me know if this answers your question!!

@laurit
Contributor

laurit commented Sep 2, 2025

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace id for a slow query, so you could track down what executed that query.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

I'm wondering why you chose to create an internal span; wouldn't a consumer span be more appropriate?

@vasantteja
Author

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace id for a slow query, so you could track down what executed that query.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

I'm wondering why you chose to create an internal span; wouldn't a consumer span be more appropriate?

@laurit Thanks for the reply!! Appreciate the help and thanks for correcting me.

There are three things I want to break down here:

  1. Thanks for the context on the database part. It makes sense now. So I believe that if we link the producer trace with the Kafka Connect span, we are good.
  2. I am sorry, I was mistaken about the JDBC instrumentation. I wrote a test to confirm whether it is instrumented, and it is. Here are the logs from the container showing the complete trace flow:
[otel.javaagent 2025-09-02 22:14:56:041 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 36869b3ce35d42f3 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:041 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=36869b3ce35d42f3, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296025217709, endEpochNanos=1756851296040659750, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? 
WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,041] INFO Using PostgreSql dialect TABLE "person" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[otel.javaagent 2025-09-02 22:14:56:049 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 6d767da1a2b935f7 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME, result.COLUMN_NAME, result.KEY_SEQ, result.PK_NAME FROM (SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME, (information_schema._pg_expandarray(i.indkey)).n AS KEY_SEQ, ci.relname AS PK_NAME, information_schema._pg_expandarray(i.indkey) AS KEYS, a.attnum AS A_ATTNUM FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) JOIN pg_catalog.pg_index i ON ( a.attrelid = i.indrelid) JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true AND ct.relname = ? AND i.indisprimary ) result where result.A_ATTNUM = (result.KEYS).x ORDER BY result.table_name, result.pk_name, result.key_seq}, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:050 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=6d767da1a2b935f7, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296042146375, endEpochNanos=1756851296049224084, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME, result.COLUMN_NAME, result.KEY_SEQ, result.PK_NAME FROM (SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME, (information_schema._pg_expandarray(i.indkey)).n AS KEY_SEQ, ci.relname AS PK_NAME, information_schema._pg_expandarray(i.indkey) AS KEYS, a.attnum AS A_ATTNUM FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) JOIN pg_catalog.pg_index i ON ( a.attrelid = 
i.indrelid) JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true AND ct.relname = ? AND i.indisprimary ) result where result.A_ATTNUM = (result.KEYS).x ORDER BY result.table_name, result.pk_name, result.key_seq}, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[otel.javaagent 2025-09-02 22:14:56:074 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce bc140d85a1f10b67 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT * FROM (SELECT n.nspname,c.relname,a.attname,a.atttypid,a.attnotnull OR (t.typtype = ? AND t.typnotnull) AS attnotnull,a.atttypmod,a.attlen,t.typtypmod,row_number() OVER (PARTITION BY a.attrelid ORDER BY a.attnum) AS attnum, nullif(a.attidentity, ?) as attidentity,null as attgenerated,pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS adsrc,dsc.description,t.typbasetype,t.typtype FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname=?) LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname=?) WHERE c.relkind in (?) and a.attnum > ? AND NOT a.attisdropped AND c.relname LIKE ?) c WHERE true ORDER BY nspname,c.relname,attnum }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:074 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=bc140d85a1f10b67, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296053238250, endEpochNanos=1756851296073618584, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT * FROM (SELECT n.nspname,c.relname,a.attname,a.atttypid,a.attnotnull OR (t.typtype = ? AND t.typnotnull) AS attnotnull,a.atttypmod,a.attlen,t.typtypmod,row_number() OVER (PARTITION BY a.attrelid ORDER BY a.attnum) AS attnum, nullif(a.attidentity, ?) 
as attidentity,null as attgenerated,pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS adsrc,dsc.description,t.typbasetype,t.typtype FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname=?) LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname=?) WHERE c.relkind in (?) and a.attnum > ? AND NOT a.attisdropped AND c.relname LIKE ?) c WHERE true ORDER BY nspname,c.relname,attnum }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,082] INFO Checking PostgreSql dialect for type of TABLE "person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[otel.javaagent 2025-09-02 22:14:56:086 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 501aa90eb31583b9 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:087 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=501aa90eb31583b9, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296082378959, endEpochNanos=1756851296086323542, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? 
WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,090] INFO Setting metadata for table "person" to Table{name='"person"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=int4}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=varchar}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[otel.javaagent 2025-09-02 22:14:56:163 +0000] [task-thread-test-connector-0] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.postgresql.jdbc.PgPreparedStatement -- PluginClassLoader{pluginLocation=file:/usr/share/java/confluentinc-kafka-connect-jdbc/}
[otel.javaagent 2025-09-02 22:14:56:196 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'INSERT test.person' : faa1a2f506b412fc46b6daf1c8bb9bce 999244f75d4a7d29 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=INSERT, db.sql.table=person, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=INSERT INTO "person" ("id","name") VALUES (?,?)}, capacity=128, totalAddedValues=11}
[otel.javaagent 2025-09-02 22:14:56:196 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=999244f75d4a7d29, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=INSERT test.person, kind=CLIENT, startEpochNanos=1756851296189256750, endEpochNanos=1756851296196073625, attributes=AttributesMap{data={thread.id=129, db.operation=INSERT, db.sql.table=person, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=INSERT INTO "person" ("id","name") VALUES (?,?)}, capacity=128, totalAddedValues=11}, totalAttributeCount=11, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[otel.javaagent 2025-09-02 22:14:56:201 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'KafkaConnect.put' : faa1a2f506b412fc46b6daf1c8bb9bce ebe3a0102b0469cf INTERNAL [tracer: io.opentelemetry.kafka-connect-2.6:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, thread.name=task-thread-test-connector-0}, capacity=128, totalAddedValues=2}
[otel.javaagent 2025-09-02 22:14:56:202 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=e37a2c4da90a8684, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=true, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.kafka-connect-2.6, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=KafkaConnect.put, kind=INTERNAL, startEpochNanos=1756851295420017000, endEpochNanos=1756851296201312542, attributes=AttributesMap{data={thread.id=129, thread.name=task-thread-test-connector-0}, capacity=128, totalAddedValues=2}, totalAttributeCount=2, events=[], totalRecordedEvents=0, links=[ImmutableLinkData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=e37a2c4da90a8684, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=true, valid=true}, attributes={}, totalAttributeCount=0}], totalRecordedLinks=1, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}

Looking at this output, trace propagation works end to end:
Producer span: e37a2c4da90a8684 (remote=true, from the original message)
Kafka Connect span: ebe3a0102b0469cf (parent: e37a2c4da90a8684, plus a span link)
JDBC spans: all have parent ebe3a0102b0469cf, including the crucial INSERT test.person operation
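The agent performs this extraction with its configured TextMapPropagator. As a rough standalone illustration of what is being recovered from the message headers (assuming the W3C traceparent header format; the class below is hypothetical and not part of the PR), the trace and span IDs come out of a header value like so:

```java
// Hypothetical, simplified sketch: real code should use OpenTelemetry's
// W3CTraceContextPropagator rather than parsing header values by hand.
public class TraceparentSketch {

  /** Parses "version-traceId-spanId-flags" into {traceId, spanId}, or null if malformed. */
  public static String[] parse(String traceparent) {
    String[] parts = traceparent.split("-");
    if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
      return null; // not a valid traceparent value
    }
    return new String[] {parts[1], parts[2]};
  }
}
```

With the IDs from the log above, parsing `00-faa1a2f506b412fc46b6daf1c8bb9bce-e37a2c4da90a8684-01` yields exactly the producer trace/span IDs that the span link points at.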

  1. Regarding the span kind: I felt Kafka Connect acts more like an internal processing/transformation layer than a direct consumer client. It receives messages, processes them (potentially transforming data), and forwards them to sinks, which seems more aligned with internal processing than direct message consumption. To be honest, I started developing this as CONSUMER and shifted it to INTERNAL midway.

Should I change this to CONSUMER? I'm conflicted and open to feedback on this decision.

@laurit
Copy link
Contributor

laurit commented Sep 3, 2025

3. Regarding the span kind: I felt Kafka Connect acts more like an internal processing/transformation layer than a direct consumer client. It receives messages, processes them (potentially transforming data), and forwards them to sinks, which seems more aligned with internal processing than direct message consumption. To be honest, I started developing this as CONSUMER and shifted it to INTERNAL midway.

Should I change this to CONSUMER? I'm conflicted and open to feedback on this decision.

Doesn't it read from the topic like a regular consumer? If you believe that an INTERNAL span is the correct choice, keep in mind that the convention in this project is not to emit telemetry that has no semantic conventions in the default configuration. This means that instrumentations that only emit internal spans usually need to be enabled with a flag.

@vasantteja
Copy link
Author

vasantteja commented Sep 3, 2025

Doesn't it read from the topic like a regular consumer? If you believe that an INTERNAL span is the correct choice, keep in mind that the convention in this project is not to emit telemetry that has no semantic conventions in the default configuration. This means that instrumentations that only emit internal spans usually need to be enabled with a flag.

@laurit Yep agree with you on this. Kafka Connect does read from topics like a regular consumer.

Looking at it from that perspective, CONSUMER makes more sense since:

  • Kafka Connect fundamentally consumes messages from Kafka topics
  • The processing/transformation aspect doesn't change the fact that it is consuming from messaging infrastructure
  • It aligns better with the semantic conventions for messaging systems

Regarding the convention about INTERNAL spans needing to be behind a flag - that's a great point I wasn't aware of. Since there are established semantic conventions for messaging consumers, CONSUMER would be the more appropriate choice here.

Edit: I updated the implementation to use CONSUMER span kind. Thank you for the guidance! Let me know your thoughts.
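For context, the flag convention referenced above typically takes the form of an `otel.instrumentation.<name>.enabled` property. The exact flag name for this instrumentation would be decided in the PR, so the one below is only illustrative:

```properties
# Hypothetical flag name, shown only to illustrate the convention;
# instrumentations emitting only INTERNAL spans are disabled unless opted in.
otel.instrumentation.kafka-connect.enabled=true
```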

@otelbot-java-instrumentation
Copy link
Contributor

🔧 The result from spotlessApply was committed to the PR branch.


@vasantteja
Copy link
Author

@laurit Can you please take one more look? I refactored and made the following changes:

  1. Changed the span naming algorithm to include all the topics instead of only the first record's topic name.
  2. Removed the parent-child relationship between producer and Kafka Connect spans.
  3. Modularized and rewrote the tests with the new SmokeTestInstrumentationExtension.

Let me know what you think.

@otelbot-java-instrumentation
Copy link
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@vasantteja
Copy link
Author

@laurit Can you please take another look? Kafka Connect's WorkerSinkTask pulls records from multiple topics rather than a single one. I now return the topic name only if all the records come from a single topic; otherwise I return null. Here is the SpanData:

{
  "traceId" : "835d2dc1437d7f420a28adfab89347bb",
  "spanId" : "d9db28e45ce804f7",
  "startEpochNanos" : 1759528513964221000,
  "endEpochNanos" : 1759528514783972376,
  "kind" : "CONSUMER",
  "name" : "unknown process",
  "attributes" : {
    "messaging.system" : "kafka",
    "messaging.batch.message_count" : 3,
    "thread.name" : "task-thread-test-postgres-connector-multi-0",
    "thread.id" : 134,
    "messaging.operation" : "process"
  },
  "parentSpanId" : "0000000000000000"
}
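The topic-selection rule described above, including the "unknown process" fallback visible in the SpanData, could be sketched as a small helper. The class and method names here are hypothetical, not the PR's actual code:

```java
import java.util.Collection;

// Hypothetical helper mirroring the naming rule described above.
public class SinkSpanNaming {

  /** Returns the shared topic if every record in the batch came from one topic, else null. */
  public static String commonTopic(Collection<String> recordTopics) {
    String common = null;
    for (String topic : recordTopics) {
      if (common == null) {
        common = topic;
      } else if (!common.equals(topic)) {
        return null; // mixed-topic batch: no single destination
      }
    }
    return common;
  }

  /** "<topic> process" in messaging-convention style, or a fallback for mixed batches. */
  public static String spanName(Collection<String> recordTopics) {
    String topic = commonTopic(recordTopics);
    return topic == null ? "unknown process" : topic + " process";
  }
}
```

A batch drawn entirely from one topic gets that topic in the span name; a batch spanning topics falls back to the generic name, as in the SpanData above.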

Please let me know if you need additional changes.
