Add initial instrumentation of kafka connect SinkTask #14478
Conversation
Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allow the trace to propagate to wherever the sink sends the message so it can be picked up where that data is read? I don't know anything about kafka connect so sorry if this didn't make sense.
@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, automatic trace propagation to downstream databases depends on the specific connector implementation: if we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span. Examples:
- JDBC Kafka Connector: uses JDBC calls, which are instrumented, so the trace propagates to the database spans.
- MongoDB Kafka Connector: uses the MongoDB driver, which is instrumented.
- Cosmos DB Connector: uses Cosmos DB SDK calls. Since Cosmos DB doesn't have OpenTelemetry instrumentation, the trace stops at the Kafka Connect span, but span links to producers are preserved.

Summary: this instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry. Let me know if this answers your question!!
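For illustration, a minimal sketch (not the PR's actual code) of how the Kafka Connect span can link to the producer's span using the stable OpenTelemetry API; the map-based carrier and class names here are assumptions:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;

class ProducerLinkSketch {
  private static final Tracer TRACER =
      GlobalOpenTelemetry.getTracer("kafka-connect-sketch");

  // Reads W3C trace headers out of a plain map; the real instrumentation reads SinkRecord headers.
  private static final TextMapGetter<Map<String, String>> MAP_GETTER =
      new TextMapGetter<Map<String, String>>() {
        @Override
        public Iterable<String> keys(Map<String, String> carrier) {
          return carrier.keySet();
        }

        @Override
        public String get(Map<String, String> carrier, String key) {
          return carrier == null ? null : carrier.get(key);
        }
      };

  Span startPutSpan(Map<String, String> recordHeaders) {
    // Extract the producer's context from the record headers...
    Context producerContext =
        GlobalOpenTelemetry.getPropagators()
            .getTextMapPropagator()
            .extract(Context.current(), recordHeaders, MAP_GETTER);
    // ...and attach it as a span link instead of using it as the parent.
    return TRACER
        .spanBuilder("KafkaConnect.put")
        .addLink(Span.fromContext(producerContext).getSpanContext())
        .startSpan();
  }
}
```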
Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace id for a slow query, so you could track down what executed that query.
You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by what is shown at line 57 in eb7ea9e.
I'm wondering why you chose to create an internal span, wouldn't a consumer span be more appropriate?
@laurit Thanks for the reply!! Appreciate the help, and thanks for correcting me. There are three things I want to break down here:
I see that we have perfect trace propagation:
Should I change this to CONSUMER? I am conflicted and open to feedback on this decision.
Doesn't it read from the topic like a regular consumer? If you believe that
@laurit Yep, I agree with you on this. Kafka Connect does read from topics like a regular consumer. Looking at it from that perspective, CONSUMER does seem more appropriate.
Regarding the convention about INTERNAL spans needing to be behind a flag - that's a great point I wasn't aware of. Since there are established semantic conventions for messaging consumers, CONSUMER is the better fit. Edit: I updated the implementation to use CONSUMER.
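For reference, a minimal sketch of what switching the span kind amounts to, assuming a tracer is already available (names are illustrative, not the PR's actual code):

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;

class ConsumerSpanSketch {
  // Marking the span as CONSUMER lets backends treat it per the messaging
  // semantic conventions instead of as a generic internal operation.
  static Span startBatchSpan(Tracer tracer) {
    return tracer
        .spanBuilder("KafkaConnect.put")
        .setSpanKind(SpanKind.CONSUMER)
        .startSpan();
  }
}
```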
…he tests for accounting database span propagation.
🔧 The result from spotlessApply was committed to the PR branch.
@laurit Can you please take one more look? I refactored and made the following changes:
Let me know what you think.
… removing redundant code in test cases.
🔧 The result from spotlessApply was committed to the PR branch.
@laurit Can you please take another look? Kafka Connect's WorkerSinkTask pulls records from multiple topics instead of one single topic. I return the topic name if all the records are from a single topic; otherwise I return null. Here is the
Please let me know if you need additional changes.
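A minimal sketch of the single-topic check described above (illustrative only, not the PR's actual snippet; the class and method names are assumptions):

```java
import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

final class TopicNameSketch {
  // Returns the topic if every record in the batch comes from the same topic,
  // otherwise null, since no single topic name would describe the batch.
  static String singleTopicOrNull(Collection<SinkRecord> records) {
    String topic = null;
    for (SinkRecord record : records) {
      if (topic == null) {
        topic = record.topic();
      } else if (!topic.equals(record.topic())) {
        return null;
      }
    }
    return topic;
  }

  private TopicNameSketch() {}
}
```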
📋 PR Description
Overview
This PR adds OpenTelemetry instrumentation for Apache Kafka Connect SinkTask operations, providing observability into data pipeline processing from Kafka topics to external systems.
What This Adds
- SinkTask.put() method: creates spans for batch processing operations

Key Features
🔍 Span Creation
- Span name: KafkaConnect.put
- Span kind: INTERNAL
- Thread attributes (thread.name, thread.id)

🔗 Trace Context Propagation
- Uses the configured TextMapPropagator to extract context from record headers
🧪 Testing Approach
Uses a service-side instrumentation testing pattern (similar to the JMX metrics instrumentation):
Technical Implementation

Instrumentation Details

Context Extraction
- PropagatorBasedSpanLinksExtractor for creating span links
- SinkRecordHeadersGetter for extracting headers from Kafka records
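A rough sketch of what a header-based TextMapGetter over SinkRecord could look like (assumed details; the PR's actual SinkRecordHeadersGetter may differ):

```java
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

enum SinkRecordHeadersGetterSketch implements TextMapGetter<SinkRecord> {
  INSTANCE;

  @Override
  public Iterable<String> keys(SinkRecord record) {
    List<String> keys = new ArrayList<>();
    for (Header header : record.headers()) {
      keys.add(header.key());
    }
    return keys;
  }

  @Override
  public String get(SinkRecord record, String key) {
    if (record == null) {
      return null;
    }
    Header header = record.headers().lastWithName(key);
    if (header == null || header.value() == null) {
      return null;
    }
    Object value = header.value();
    // Trace headers are typically propagated as strings or UTF-8 bytes.
    if (value instanceof byte[]) {
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    return value.toString();
  }
}
```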
Classloader Considerations
Kafka Connect's PluginClassLoader isolation required the singletons static reference pattern.
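A minimal sketch of the kind of static-reference holder such a pattern implies, assuming a hypothetical KafkaConnectRequest type (this is not the PR's actual KafkaConnectSingletons class):

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;

final class KafkaConnectSingletonsSketch {
  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-connect-2.6";

  // Built once in the agent's class space and exposed through a static reference,
  // so advice running inside Kafka Connect's PluginClassLoader reaches the same instance.
  private static final Instrumenter<KafkaConnectRequest, Void> INSTRUMENTER =
      Instrumenter.<KafkaConnectRequest, Void>builder(
              GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, KafkaConnectRequest::spanName)
          .buildInstrumenter(SpanKindExtractor.alwaysConsumer());

  static Instrumenter<KafkaConnectRequest, Void> instrumenter() {
    return INSTRUMENTER;
  }

  private KafkaConnectSingletonsSketch() {}
}

// Hypothetical request type carrying the batch being processed.
interface KafkaConnectRequest {
  String spanName();
}
```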
Testing Strategy

Why Not InstrumentationExtension?
Unlike client-side instrumentations, Kafka Connect runs as a separate service in its own JVM. The standard InstrumentationExtension cannot collect spans from containerized services, requiring an OTLP-based approach.
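As a rough illustration of the OTLP-based idea (container image, agent path, ports, and env vars are assumptions, not the PR's actual test setup), the agent inside the Kafka Connect container is pointed at an OTLP endpoint the test can read spans from:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

class ConnectContainerSketch {
  // Runs Kafka Connect with the javaagent attached and exports spans via OTLP
  // to an endpoint reachable by the test.
  GenericContainer<?> connectContainer(String otlpEndpoint) {
    return new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.0"))
        .withEnv("JAVA_TOOL_OPTIONS", "-javaagent:/opentelemetry-javaagent.jar")
        .withEnv("OTEL_EXPORTER_OTLP_ENDPOINT", otlpEndpoint)
        .withEnv("OTEL_TRACES_EXPORTER", "otlp")
        .withExposedPorts(8083);
  }
}
```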
Test Architecture

Test Coverage

Performance Considerations
- Instrumentation is limited to the put() method during batch processing

Future Enhancements
- SourceTask instrumentation

Testing Instructions
Related Issues
Thanks in advance and please let me know your thoughts.