Skip to content

Commit e791fc8

Browse files
Copilottrask
andcommitted
Add helper methods to KafkaTelemetry for interceptor config
Co-authored-by: trask <[email protected]>
1 parent e5ca107 commit e791fc8

File tree

3 files changed

+65
-13
lines changed

3 files changed

+65
-13
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,34 @@ The Kafka clients API provides a way to "intercept" messages before they are sen
3535
The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
3636
The interceptor class has to be set in the properties bag used to create the Kafka client.
3737

38-
##### Recommended approach: Configuring interceptors with OpenTelemetry
38+
##### Recommended approach: Configuring interceptors with KafkaTelemetry
3939

40-
The recommended way to use interceptors is to configure them with an `OpenTelemetry` instance.
40+
The recommended way to use interceptors is to configure them with a `KafkaTelemetry` instance.
4141
Interceptors will use system properties for additional configuration like captured headers and receive telemetry settings.
4242

4343
For the producer:
4444

4545
```java
46+
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
47+
4648
Map<String, Object> props = new HashMap<>();
4749
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
4850
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
49-
props.put(TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
50-
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));
51+
props.putAll(telemetry.producerInterceptorConfigProperties());
5152

5253
Producer<String, String> producer = new KafkaProducer<>(props);
5354
```
5455

5556
For the consumer:
5657

5758
```java
59+
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
60+
5861
Map<String, Object> props = new HashMap<>();
5962
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
6063
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
6164
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
62-
props.put(TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
63-
new io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier(openTelemetry));
65+
props.putAll(telemetry.consumerInterceptorConfigProperties());
6466

6567
Consumer<String, String> consumer = new KafkaConsumer<>(props);
6668
```

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,58 @@ <K, V> ConsumerRecords<K, V> addTracing(
207207
return Collections.unmodifiableMap(config);
208208
}
209209

210+
/**
211+
* Returns configuration properties that can be used to enable tracing via {@code
212+
* TracingProducerInterceptor}. Add these resulting properties to the configuration map used to
213+
* initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
214+
*
215+
* <p>Example usage:
216+
*
217+
* <pre>{@code
218+
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
219+
* // Map<String, Object> config = new HashMap<>();
220+
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
221+
* // config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
222+
* // config.putAll(telemetry.producerInterceptorConfigProperties());
223+
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
224+
* }</pre>
225+
*
226+
* @return the kafka producer interceptor config properties
227+
*/
228+
public Map<String, ?> producerInterceptorConfigProperties() {
229+
Map<String, Object> config = new HashMap<>();
230+
config.put(
231+
TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
232+
new OpenTelemetrySupplier(openTelemetry));
233+
return Collections.unmodifiableMap(config);
234+
}
235+
236+
/**
237+
* Returns configuration properties that can be used to enable tracing via {@code
238+
* TracingConsumerInterceptor}. Add these resulting properties to the configuration map used to
239+
* initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
240+
*
241+
* <p>Example usage:
242+
*
243+
* <pre>{@code
244+
* // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
245+
* // Map<String, Object> config = new HashMap<>();
246+
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
247+
* // config.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
248+
* // config.putAll(telemetry.consumerInterceptorConfigProperties());
249+
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
250+
* }</pre>
251+
*
252+
* @return the kafka consumer interceptor config properties
253+
*/
254+
public Map<String, ?> consumerInterceptorConfigProperties() {
255+
Map<String, Object> config = new HashMap<>();
256+
config.put(
257+
TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
258+
new OpenTelemetrySupplier(openTelemetry));
259+
return Collections.unmodifiableMap(config);
260+
}
261+
210262
/**
211263
* Build and inject span into record.
212264
*

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static org.assertj.core.api.Assertions.assertThat;
99

1010
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaClientBaseTest;
11-
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
1211
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
1312
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
1413
import java.nio.charset.StandardCharsets;
@@ -29,14 +28,15 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {
2928

3029
static final String greeting = "Hello Kafka!";
3130

31+
private static final KafkaTelemetry kafkaTelemetry =
32+
KafkaTelemetry.create(testing.getOpenTelemetry());
33+
3234
@Override
3335
public Map<String, Object> producerProps() {
3436
Map<String, Object> props = super.producerProps();
3537
props.put(
3638
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
37-
props.put(
38-
TracingProducerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
39-
new OpenTelemetrySupplier(testing.getOpenTelemetry()));
39+
props.putAll(kafkaTelemetry.producerInterceptorConfigProperties());
4040
return props;
4141
}
4242

@@ -45,9 +45,7 @@ public Map<String, Object> consumerProps() {
4545
Map<String, Object> props = super.consumerProps();
4646
props.put(
4747
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
48-
props.put(
49-
TracingConsumerInterceptor.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
50-
new OpenTelemetrySupplier(testing.getOpenTelemetry()));
48+
props.putAll(kafkaTelemetry.consumerInterceptorConfigProperties());
5149
return props;
5250
}
5351

0 commit comments

Comments
 (0)