Skip to content

Commit 7df922d

Browse files
committed
wip: First pass at Otel logging using ChatGpt
1 parent d674caf commit 7df922d

File tree

6 files changed

+252
-0
lines changed

6 files changed

+252
-0
lines changed

airbyte-commons-storage/src/main/kotlin/io/airbyte/commons/logging/LogEvent.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ fun ILoggingEvent.toLogEvent(): LogEvent {
3535
logSource = LogSource.find(this.mdcPropertyMap.getOrDefault(LOG_SOURCE_MDC_KEY, LogSource.PLATFORM.displayName)) ?: LogSource.PLATFORM,
3636
caller = caller,
3737
throwable = throwable,
38+
traceId = this.mdcPropertyMap["trace_id"] ?: this.mdcPropertyMap["traceId"], // OpenTelemetry trace ID from MDC (snake_case preferred)
39+
spanId = this.mdcPropertyMap["span_id"] ?: this.mdcPropertyMap["spanId"], // OpenTelemetry span ID from MDC (snake_case preferred)
3840
)
3941
}
4042

@@ -78,6 +80,8 @@ data class LogEvent(
7880
val logSource: LogSource = LogSource.PLATFORM,
7981
val caller: LogCaller? = null,
8082
val throwable: Throwable? = null,
83+
val traceId: String? = null,
84+
val spanId: String? = null,
8185
)
8286

8387
/**

airbyte-server/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ dependencies {
1919
implementation(libs.bundles.micronaut.cache)
2020
implementation(libs.bundles.micronaut.data.jdbc)
2121
implementation(libs.bundles.micronaut.metrics)
22+
implementation("io.micrometer:micrometer-observation")
23+
implementation("io.opentelemetry:opentelemetry-api")
24+
implementation("io.opentelemetry:opentelemetry-sdk")
25+
implementation("io.opentelemetry:opentelemetry-sdk-trace")
26+
implementation("io.micronaut.tracing:micronaut-tracing-opentelemetry-http")
2227
implementation(libs.micronaut.jaxrs.server)
2328
implementation(libs.micronaut.http)
2429
implementation(libs.jakarta.ws.rs.api)
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.airbyte.server.config
2+
3+
import io.micronaut.context.annotation.Requires
4+
import io.micronaut.http.HttpRequest
5+
import io.micronaut.http.MutableHttpResponse
6+
import io.micronaut.http.annotation.Filter
7+
import io.micronaut.http.filter.HttpServerFilter
8+
import io.micronaut.http.filter.ServerFilterChain
9+
import io.opentelemetry.api.OpenTelemetry
10+
import io.opentelemetry.api.trace.Span
11+
import io.opentelemetry.api.trace.SpanKind
12+
import io.opentelemetry.api.trace.StatusCode
13+
import io.opentelemetry.api.trace.Tracer
14+
import io.opentelemetry.context.Scope
15+
import jakarta.inject.Singleton
16+
import org.slf4j.LoggerFactory
17+
import org.slf4j.MDC
18+
import reactor.core.publisher.Mono
19+
20+
private val logger = LoggerFactory.getLogger("MicronautHttpTracingFilter")
21+
22+
/**
23+
* Micronaut HTTP filter to automatically create OpenTelemetry spans for all HTTP requests.
24+
*
25+
* This filter creates a span for every incoming HTTP request, which automatically populates
26+
* MDC with trace and span IDs for structured logging correlation.
27+
*/
28+
@Filter("/**")
29+
@Singleton
30+
@Requires(bean = OpenTelemetry::class)
31+
class MicronautHttpTracingFilter(private val openTelemetry: OpenTelemetry) : HttpServerFilter {
32+
33+
private val tracer: Tracer = openTelemetry.getTracer("airbyte-server-http")
34+
35+
override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Mono<MutableHttpResponse<*>> {
36+
val method = request.method.name
37+
val path = request.path
38+
val spanName = "$method $path"
39+
40+
// Create a new span for this HTTP request
41+
val span = tracer.spanBuilder(spanName)
42+
.setSpanKind(SpanKind.SERVER)
43+
.setAttribute("http.method", method)
44+
.setAttribute("http.url", request.uri.toString())
45+
.setAttribute("http.scheme", request.uri.scheme ?: "http")
46+
.setAttribute("http.host", request.uri.host ?: "unknown")
47+
.setAttribute("http.target", path)
48+
.startSpan()
49+
50+
// Make this span current AND manually populate MDC
51+
val scope = span.makeCurrent()
52+
53+
// MANUALLY populate MDC with trace/span IDs
54+
val traceId = span.spanContext.traceId
55+
val spanId = span.spanContext.spanId
56+
MDC.put("trace_id", traceId)
57+
MDC.put("traceId", traceId)
58+
MDC.put("span_id", spanId)
59+
MDC.put("spanId", spanId)
60+
61+
logger.debug("HTTP span started for {} {} - traceId={}, spanId={}", method, path, traceId, spanId)
62+
63+
// Convert Publisher to Mono and handle the response
64+
return Mono.from(chain.proceed(request))
65+
.doOnNext { response ->
66+
try {
67+
span.setAttribute("http.status_code", response.status.code.toLong())
68+
if (response.status.code >= 400) {
69+
span.setStatus(StatusCode.ERROR, "HTTP ${response.status.code}")
70+
}
71+
logger.debug("HTTP span completed for {} {} with status {}", method, path, response.status.code)
72+
} catch (e: Exception) {
73+
span.recordException(e)
74+
span.setStatus(StatusCode.ERROR, e.message ?: "Unknown error")
75+
logger.error("HTTP span processing failed", e)
76+
}
77+
}
78+
.doOnError { throwable ->
79+
span.recordException(throwable)
80+
span.setStatus(StatusCode.ERROR, throwable.message ?: "Unknown error")
81+
logger.error("HTTP span failed for {} {}", method, path, throwable)
82+
}
83+
.doFinally {
84+
// Clean up MDC
85+
MDC.remove("trace_id")
86+
MDC.remove("traceId")
87+
MDC.remove("span_id")
88+
MDC.remove("spanId")
89+
90+
scope.close()
91+
span.end()
92+
logger.debug("HTTP span ended for {} {}", method, path)
93+
}
94+
}
95+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.config
6+
7+
import io.github.oshai.kotlinlogging.KotlinLogging
8+
import io.micrometer.observation.ObservationRegistry
9+
import io.micronaut.context.annotation.Factory
10+
import io.opentelemetry.api.GlobalOpenTelemetry
11+
import io.opentelemetry.api.trace.Tracer
12+
import io.opentelemetry.sdk.OpenTelemetrySdk
13+
import io.opentelemetry.sdk.trace.SdkTracerProvider
14+
import jakarta.inject.Singleton
15+
16+
private val logger = KotlinLogging.logger {}
17+
18+
/**
19+
* Factory for creating the ObservationRegistry bean.
20+
* This is required for Micrometer observations to work properly with our OpenTelemetry MDC shim.
21+
*/
22+
@Factory
23+
class ObservationRegistryBeanFactory {
24+
25+
@Singleton
26+
fun openTelemetrySDK(): OpenTelemetrySdk {
27+
logger.info { "🏭 Creating OpenTelemetry SDK with tracing enabled (no export)" }
28+
29+
// Create our custom MDC span processor
30+
val mdcSpanProcessor = OpenTelemetryMdcSpanProcessor()
31+
32+
// Create a TracerProvider that generates valid spans but doesn't export them
33+
val tracerProvider = SdkTracerProvider.builder()
34+
.addSpanProcessor(mdcSpanProcessor) // Add our MDC processor
35+
.build()
36+
37+
return OpenTelemetrySdk.builder()
38+
.setTracerProvider(tracerProvider)
39+
.build()
40+
.also { sdk ->
41+
// Set as global instance for the Tracer to use
42+
GlobalOpenTelemetry.set(sdk)
43+
logger.info { "✅ OpenTelemetry SDK configured with MDC SpanProcessor and set as global instance" }
44+
}
45+
}
46+
47+
@Singleton
48+
fun observationRegistry(): ObservationRegistry {
49+
logger.info { "🏭 Creating ObservationRegistry bean" }
50+
return ObservationRegistry.create()
51+
}
52+
53+
@Singleton
54+
fun tracer(openTelemetrySdk: OpenTelemetrySdk): Tracer {
55+
logger.info { "🏭 Creating OpenTelemetry Tracer bean from SDK" }
56+
return openTelemetrySdk.getTracer("airbyte-server")
57+
}
58+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.config
6+
7+
import io.github.oshai.kotlinlogging.KotlinLogging
8+
import io.opentelemetry.context.Context
9+
import io.opentelemetry.sdk.trace.ReadWriteSpan
10+
import io.opentelemetry.sdk.trace.ReadableSpan
11+
import io.opentelemetry.sdk.trace.SpanProcessor
12+
import org.slf4j.MDC
13+
14+
private val logger = KotlinLogging.logger {}
15+
16+
/**
17+
* OpenTelemetry SpanProcessor that automatically populates SLF4J MDC with trace/span IDs.
18+
* This processor hooks directly into the OpenTelemetry span lifecycle to ensure
19+
* trace/span IDs are available in MDC for all logging.
20+
*/
21+
class OpenTelemetryMdcSpanProcessor : SpanProcessor {
22+
23+
override fun onStart(parentContext: Context, span: ReadWriteSpan) {
24+
try {
25+
val spanContext = span.spanContext
26+
if (spanContext.isValid) {
27+
val traceId = spanContext.traceId
28+
val spanId = spanContext.spanId
29+
30+
// Inject OpenTelemetry IDs into MDC
31+
// OpenTelemetry standard uses snake_case for MDC keys
32+
MDC.put("trace_id", traceId) // OpenTelemetry standard format
33+
MDC.put("span_id", spanId) // OpenTelemetry standard format
34+
35+
// Also set camelCase for Airbyte compatibility
36+
MDC.put("traceId", traceId) // Airbyte format
37+
MDC.put("spanId", spanId) // Airbyte format
38+
39+
logger.debug { "🔍 SpanProcessor: Injected traceId=$traceId, spanId=$spanId into MDC" }
40+
}
41+
} catch (e: Exception) {
42+
logger.warn(e) { "Failed to inject OpenTelemetry context into MDC on span start" }
43+
}
44+
}
45+
46+
override fun onEnd(span: ReadableSpan) {
47+
try {
48+
// Clean up MDC when span ends (optional - depends on your threading model)
49+
// For now, we'll leave the values in MDC as they might be useful for subsequent operations
50+
logger.debug { "🔍 SpanProcessor: Span ended for ${span.spanContext.spanId}" }
51+
} catch (e: Exception) {
52+
logger.debug(e) { "Failed to clean up MDC on span end" }
53+
}
54+
}
55+
56+
override fun isStartRequired(): Boolean = true
57+
override fun isEndRequired(): Boolean = true
58+
}

airbyte-server/src/main/resources/application.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ micronaut:
5353
enabled: false
5454
web:
5555
enabled: ${MICROMETER_METRICS_ENABLED:false}
56+
http:
57+
services:
58+
tracing:
59+
enabled: ${HTTP_TRACING_ENABLED:true} # Enable HTTP request tracing
60+
client:
61+
tracing:
62+
enabled: ${HTTP_CLIENT_TRACING_ENABLED:true} # Enable HTTP client tracing
63+
tracing:
64+
opentelemetry:
65+
enabled: ${MICRONAUT_TRACING_ENABLED:true}
66+
http:
67+
server:
68+
enabled: ${HTTP_SERVER_TRACING_ENABLED:true}
69+
client:
70+
enabled: ${HTTP_CLIENT_TRACING_ENABLED:true}
5671
export:
5772
otlp:
5873
enabled: ${MICROMETER_METRICS_OTLP_ENABLED:false}
@@ -557,3 +572,20 @@ logger:
557572
# Uncomment to help resolve issues with micronaut data
558573
# com.zaxxer.hikari.HikariConfig: DEBUG
559574
# com.zaxxer.hikari: TRACE
575+
576+
# OpenTelemetry Configuration - Using Micronaut's built-in OpenTelemetry integration
577+
# This enables local trace generation WITHOUT any exporters (for internal tracing only)
578+
# Setting disabled to false enables tracing, but with no exporters configured means traces stay local
579+
otel:
580+
tracing:
581+
enabled: ${OTEL_TRACING_ENABLED:true} # Enable OpenTelemetry tracing
582+
sampler:
583+
probability: ${OTEL_SAMPLER_RATIO:1.0} # Sample all traces by default
584+
exporter:
585+
# Explicitly disable all exporters for local-only tracing
586+
otlp:
587+
enabled: false
588+
jaeger:
589+
enabled: false
590+
zipkin:
591+
enabled: false

0 commit comments

Comments
 (0)