Skip to content

Commit a31210c

Browse files
Copilottrask
andcommitted
Remove experimental from config key, simplify fallback, add test
Co-authored-by: trask <[email protected]>
1 parent 4f3d30d commit a31210c

File tree

5 files changed

+176
-24
lines changed

5 files changed

+176
-24
lines changed

conventions/.kotlin/sessions/kotlin-compiler-3745061290931361383.salive

Whitespace-only changes.

gradle-plugins/.kotlin/sessions/kotlin-compiler-7174543796092179295.salive

Whitespace-only changes.

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

8-
import static java.util.Collections.emptyList;
9-
108
import com.google.errorprone.annotations.CanIgnoreReturnValue;
119
import io.opentelemetry.api.GlobalOpenTelemetry;
1210
import io.opentelemetry.context.Context;
13-
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1411
import io.opentelemetry.instrumentation.api.internal.Timer;
1512
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
1613
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
@@ -33,7 +30,7 @@
3330
public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
3431

3532
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
36-
"opentelemetry.experimental.kafka-telemetry.supplier";
33+
"opentelemetry.kafka-telemetry.supplier";
3734

3835
@Nullable private KafkaTelemetry telemetry;
3936
private String consumerGroup;
@@ -70,16 +67,7 @@ public void configure(Map<String, ?> configs) {
7067
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
7168
if (telemetrySupplier == null) {
7269
// Fallback to GlobalOpenTelemetry if not configured
73-
this.telemetry =
74-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
75-
.setMessagingReceiveInstrumentationEnabled(
76-
ConfigPropertiesUtil.getBoolean(
77-
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
78-
false))
79-
.setCapturedHeaders(
80-
ConfigPropertiesUtil.getList(
81-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
82-
.build();
70+
this.telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
8371
return;
8472
}
8573

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,8 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
77

8-
import static java.util.Collections.emptyList;
9-
108
import com.google.errorprone.annotations.CanIgnoreReturnValue;
119
import io.opentelemetry.api.GlobalOpenTelemetry;
12-
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
1310
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier;
1411
import java.util.Map;
1512
import java.util.Objects;
@@ -28,7 +25,7 @@
2825
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
2926

3027
public static final String CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER =
31-
"opentelemetry.experimental.kafka-telemetry.supplier";
28+
"opentelemetry.kafka-telemetry.supplier";
3229

3330
@Nullable private KafkaTelemetry telemetry;
3431
@Nullable private String clientId;
@@ -55,12 +52,7 @@ public void configure(Map<String, ?> configs) {
5552
Object telemetrySupplier = configs.get(CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
5653
if (telemetrySupplier == null) {
5754
// Fallback to GlobalOpenTelemetry if not configured
58-
this.telemetry =
59-
KafkaTelemetry.builder(GlobalOpenTelemetry.get())
60-
.setCapturedHeaders(
61-
ConfigPropertiesUtil.getList(
62-
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
63-
.build();
55+
this.telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
6456
return;
6557
}
6658

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.v2_6;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
10+
11+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaTelemetrySupplier;
12+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
13+
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
14+
import java.io.ByteArrayInputStream;
15+
import java.io.ByteArrayOutputStream;
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.io.ObjectInputStream;
19+
import java.io.ObjectOutputStream;
20+
import java.io.ObjectStreamClass;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.apache.kafka.clients.consumer.KafkaConsumer;
25+
import org.apache.kafka.clients.producer.KafkaProducer;
26+
import org.apache.kafka.clients.producer.ProducerConfig;
27+
import org.apache.kafka.common.serialization.StringDeserializer;
28+
import org.apache.kafka.common.serialization.StringSerializer;
29+
import org.junit.jupiter.api.Assumptions;
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.extension.RegisterExtension;
32+
33+
class KafkaTelemetryInterceptorTest {
34+
35+
@RegisterExtension
36+
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
37+
38+
private static Map<String, Object> producerConfig() {
39+
Map<String, Object> config = new HashMap<>();
40+
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
41+
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
42+
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
43+
config.putAll(
44+
KafkaTelemetry.create(testing.getOpenTelemetry()).producerInterceptorConfigProperties());
45+
return config;
46+
}
47+
48+
private static Map<String, Object> consumerConfig() {
49+
Map<String, Object> config = new HashMap<>();
50+
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
51+
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
52+
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
53+
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
54+
config.putAll(
55+
KafkaTelemetry.create(testing.getOpenTelemetry()).consumerInterceptorConfigProperties());
56+
return config;
57+
}
58+
59+
@Test
60+
void badProducerConfig() {
61+
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
62+
63+
// Bad config - wrong type for supplier
64+
assertThatThrownBy(
65+
() -> {
66+
Map<String, Object> producerConfig = producerConfig();
67+
producerConfig.put(
68+
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, "foo");
69+
new KafkaProducer<>(producerConfig).close();
70+
})
71+
.hasRootCauseInstanceOf(IllegalStateException.class)
72+
.hasRootCauseMessage(
73+
"Configuration property opentelemetry.kafka-telemetry.supplier is not instance of Supplier");
74+
75+
// Bad config - supplier returns wrong type
76+
assertThatThrownBy(
77+
() -> {
78+
Map<String, Object> producerConfig = producerConfig();
79+
producerConfig.put(
80+
TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
81+
(java.util.function.Supplier<?>) () -> "not a KafkaTelemetry");
82+
new KafkaProducer<>(producerConfig).close();
83+
})
84+
.hasRootCauseInstanceOf(IllegalStateException.class)
85+
.hasRootCauseMessage(
86+
"Configuration property opentelemetry.kafka-telemetry.supplier supplier does not return KafkaTelemetry instance");
87+
}
88+
89+
@Test
90+
void badConsumerConfig() {
91+
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
92+
93+
// Bad config - wrong type for supplier
94+
assertThatThrownBy(
95+
() -> {
96+
Map<String, Object> consumerConfig = consumerConfig();
97+
consumerConfig.put(
98+
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER, "foo");
99+
new KafkaConsumer<>(consumerConfig).close();
100+
})
101+
.hasRootCauseInstanceOf(IllegalStateException.class)
102+
.hasRootCauseMessage(
103+
"Configuration property opentelemetry.kafka-telemetry.supplier is not instance of Supplier");
104+
105+
// Bad config - supplier returns wrong type
106+
assertThatThrownBy(
107+
() -> {
108+
Map<String, Object> consumerConfig = consumerConfig();
109+
consumerConfig.put(
110+
TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER,
111+
(java.util.function.Supplier<?>) () -> "not a KafkaTelemetry");
112+
new KafkaConsumer<>(consumerConfig).close();
113+
})
114+
.hasRootCauseInstanceOf(IllegalStateException.class)
115+
.hasRootCauseMessage(
116+
"Configuration property opentelemetry.kafka-telemetry.supplier supplier does not return KafkaTelemetry instance");
117+
}
118+
119+
@Test
120+
void serializableConfig() throws IOException, ClassNotFoundException {
121+
testSerialize(producerConfig());
122+
testSerialize(consumerConfig());
123+
}
124+
125+
@SuppressWarnings("unchecked")
126+
private static void testSerialize(Map<String, Object> map)
127+
throws IOException, ClassNotFoundException {
128+
// Check that producer config has the supplier
129+
Object producerSupplier = map.get(TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
130+
Object consumerSupplier = map.get(TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER);
131+
132+
KafkaTelemetrySupplier supplier = null;
133+
if (producerSupplier instanceof KafkaTelemetrySupplier) {
134+
supplier = (KafkaTelemetrySupplier) producerSupplier;
135+
} else if (consumerSupplier instanceof KafkaTelemetrySupplier) {
136+
supplier = (KafkaTelemetrySupplier) consumerSupplier;
137+
}
138+
139+
assertThat(supplier).isNotNull();
140+
assertThat(supplier.get()).isNotNull();
141+
142+
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
143+
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
144+
outputStream.writeObject(map);
145+
}
146+
147+
class CustomObjectInputStream extends ObjectInputStream {
148+
CustomObjectInputStream(InputStream inputStream) throws IOException {
149+
super(inputStream);
150+
}
151+
152+
@Override
153+
protected Class<?> resolveClass(ObjectStreamClass desc)
154+
throws IOException, ClassNotFoundException {
155+
if (desc.getName().startsWith("io.opentelemetry.")) {
156+
throw new IllegalStateException(
157+
"Serial form contains opentelemetry class " + desc.getName());
158+
}
159+
return super.resolveClass(desc);
160+
}
161+
}
162+
163+
try (ObjectInputStream inputStream =
164+
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
165+
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
166+
assertThat(result.get(TracingProducerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER))
167+
.isNull();
168+
assertThat(result.get(TracingConsumerInterceptor.CONFIG_KEY_KAFKA_TELEMETRY_SUPPLIER))
169+
.isNull();
170+
}
171+
}
172+
}

0 commit comments

Comments
 (0)