2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3943](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943))
- Add more Semconv attributes to LLMInvocation spans.
([#3862](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3862))
- Add metrics to LLMInvocation traces.
([#3891](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891))

## Version 0.2b0 (2025-10-14)

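The new changelog entry maps to the `meter_provider` parameter threaded through `TelemetryHandler` below. A minimal opt-in sketch, assuming the SDK's `InMemoryMetricReader` and that `get_telemetry_handler` lives in the handler module shown in this diff:

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

# Assumed import path for the handler module changed in this PR.
from opentelemetry.util.genai.handler import get_telemetry_handler

reader = InMemoryMetricReader()
handler = get_telemetry_handler(
    meter_provider=MeterProvider(metric_readers=[reader])
)
# After running invocations, reader.get_metrics_data() contains
# gen_ai.client.operation.duration and gen_ai.client.token.usage.
```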
util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -60,20 +60,24 @@

from __future__ import annotations

import timeit
from contextlib import contextmanager
from typing import Iterator

from opentelemetry import context as otel_context
from opentelemetry.metrics import MeterProvider, get_meter
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import (
Span,
SpanKind,
TracerProvider,
get_tracer,
set_span_in_context,
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.span_utils import (
_apply_error_attributes,
_apply_finish_attributes,
@@ -88,13 +92,35 @@ class TelemetryHandler:
them as spans, metrics, and events.
"""

def __init__(self, tracer_provider: TracerProvider | None = None):
def __init__(
self,
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
):
self._tracer = get_tracer(
__name__,
__version__,
tracer_provider,
schema_url=Schemas.V1_37_0.value,
)
self._metrics_recorder: InvocationMetricsRecorder | None = None
meter = get_meter(__name__, meter_provider=meter_provider)
self._metrics_recorder = InvocationMetricsRecorder(meter)

def _record_llm_metrics(
self,
invocation: LLMInvocation,
span: Span | None = None,
*,
error_type: str | None = None,
) -> None:
if self._metrics_recorder is None or span is None:
return
self._metrics_recorder.record(
span,
invocation,
error_type=error_type,
)

def start_llm(
self,
@@ -106,6 +132,9 @@
name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}",
kind=SpanKind.CLIENT,
)
# Record a monotonic start timestamp (seconds) for duration
# calculation using timeit.default_timer.
invocation.monotonic_start_s = timeit.default_timer()
invocation.span = span
invocation.context_token = otel_context.attach(
set_span_in_context(span)
@@ -118,10 +147,12 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab
# TODO: Provide feedback that this invocation was not started
return invocation

_apply_finish_attributes(invocation.span, invocation)
span = invocation.span
_apply_finish_attributes(span, invocation)
self._record_llm_metrics(invocation, span)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
span.end()
return invocation

def fail_llm( # pylint: disable=no-self-use
@@ -132,11 +163,14 @@ def fail_llm( # pylint: disable=no-self-use
# TODO: Provide feedback that this invocation was not started
return invocation

span = invocation.span
_apply_finish_attributes(invocation.span, invocation)
_apply_error_attributes(invocation.span, error)
_apply_error_attributes(span, error)
error_type = getattr(error.type, "__qualname__", None)
self._record_llm_metrics(invocation, span, error_type=error_type)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
span.end()
return invocation

@contextmanager
@@ -166,6 +200,7 @@ def llm(

def get_telemetry_handler(
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
) -> TelemetryHandler:
"""
Returns a singleton TelemetryHandler instance.
@@ -174,6 +209,9 @@
get_telemetry_handler, "_default_handler", None
)
if handler is None:
handler = TelemetryHandler(tracer_provider=tracer_provider)
handler = TelemetryHandler(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
)
setattr(get_telemetry_handler, "_default_handler", handler)
return handler
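End-to-end, the handler now emits both histograms when an invocation stops. A usage sketch (import path and field values are assumptions; the `LLMInvocation` fields mirror the types module at the end of this diff):

```python
from opentelemetry.util.genai.handler import get_telemetry_handler  # assumed path
from opentelemetry.util.genai.types import LLMInvocation

handler = get_telemetry_handler()
invocation = LLMInvocation(request_model="gpt-4")
handler.start_llm(invocation)   # starts the span, stamps monotonic_start_s
invocation.input_tokens = 12    # normally filled from the provider response
invocation.output_tokens = 34
handler.stop_llm(invocation)    # records duration + token usage, ends the span
```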
util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
@@ -0,0 +1,52 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics

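# Advisory histogram buckets from the GenAI semantic conventions for
# gen_ai.client.operation.duration (seconds).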
_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
0.02,
0.04,
0.08,
0.16,
0.32,
0.64,
1.28,
2.56,
5.12,
10.24,
20.48,
40.96,
81.92,
]

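# Advisory histogram buckets from the GenAI semantic conventions for
# gen_ai.client.token.usage (token counts).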
_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
1,
4,
16,
64,
256,
1024,
4096,
16384,
65536,
262144,
1048576,
4194304,
16777216,
67108864,
]


class Instruments:
def __init__(self, meter: Meter):
self.operation_duration_histogram: Histogram = meter.create_histogram(
name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
description="Duration of GenAI client operation",
unit="s",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
)
self.token_usage_histogram: Histogram = meter.create_histogram(
name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
description="Number of input and output tokens used by GenAI clients",
unit="{token}",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
)
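The advisory boundaries are only a hint: the SDK's default explicit-bucket aggregation picks them up, but a configured `View` can still override them. A minimal wiring sketch (meter name is arbitrary):

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.util.genai.instruments import Instruments

reader = InMemoryMetricReader()
meter = MeterProvider(metric_readers=[reader]).get_meter("genai-demo")
instruments = Instruments(meter)
instruments.operation_duration_histogram.record(0.42)  # seconds
instruments.token_usage_histogram.record(128)          # tokens
```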
util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py
@@ -0,0 +1,130 @@
"""Helpers for emitting GenAI metrics from LLM invocations."""

from __future__ import annotations

import time
import timeit
from numbers import Number
from typing import Dict, Optional

from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.util.genai.instruments import Instruments
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.types import AttributeValue

_NS_PER_SECOND = 1_000_000_000


def _get_span_start_time_ns(span: Optional[Span]) -> Optional[int]:
if span is None:
return None
for attr in ("start_time", "_start_time"):
value = getattr(span, attr, None)
if isinstance(value, int):
return value
return None


def _calculate_duration_seconds(
span: Optional[Span], invocation: LLMInvocation
) -> Optional[float]:
"""Calculate duration in seconds from a start time to now.
If `invocation.monotonic_start_ns` is present, use a monotonic
clock (`perf_counter_ns`) for elapsed time. Otherwise fall back to the
span's wall-clock start time (epoch ns) and `time_ns()` for now.
Returns None if no usable start time is available.
"""
# Prefer an explicit monotonic start on the invocation (seconds)
if invocation.monotonic_start_s is not None:
return max(timeit.default_timer() - invocation.monotonic_start_s, 0.0)

# Fall back to span start_time (wall clock epoch ns)
start_time_ns = _get_span_start_time_ns(span)
if start_time_ns is None:
return None
elapsed_ns = max(time.time_ns() - start_time_ns, 0)
return elapsed_ns / _NS_PER_SECOND


class InvocationMetricsRecorder:
"""Records duration and token usage histograms for GenAI invocations."""

def __init__(self, meter: Meter):
instruments = Instruments(meter)
self._duration_histogram: Histogram = (
instruments.operation_duration_histogram
)
self._token_histogram: Histogram = instruments.token_usage_histogram

def record(
self,
span: Optional[Span],
invocation: LLMInvocation,
*,
error_type: Optional[str] = None,
) -> None:
"""Record duration and token metrics for an invocation if possible."""
if span is None:
return

token_counts: list[tuple[int, str]] = []
if invocation.input_tokens is not None:
token_counts.append(
(
invocation.input_tokens,
GenAI.GenAiTokenTypeValues.INPUT.value,
)
)
if invocation.output_tokens is not None:
token_counts.append(
(
invocation.output_tokens,
GenAI.GenAiTokenTypeValues.OUTPUT.value,
)
)

attributes: Dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
}
if invocation.request_model:
attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
if invocation.provider:
attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
if invocation.response_model_name:
attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
invocation.response_model_name
)

# Calculate duration from span timing or invocation monotonic start
duration_seconds = _calculate_duration_seconds(span, invocation)

span_context = set_span_in_context(span)
if error_type:
attributes["error.type"] = error_type

if (
duration_seconds is not None
and isinstance(duration_seconds, Number)
and duration_seconds >= 0
):
self._duration_histogram.record(
duration_seconds,
attributes=attributes,
context=span_context,
)

for token_count, token_type in token_counts:
self._token_histogram.record(
token_count,
attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type},
context=span_context,
)


__all__ = ["InvocationMetricsRecorder"]
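A sketch of exercising the recorder directly (names are illustrative; in the package it is driven through `TelemetryHandler` rather than called by hand):

```python
import timeit

from opentelemetry import trace
from opentelemetry.metrics import get_meter
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import LLMInvocation

recorder = InvocationMetricsRecorder(get_meter("genai-demo"))
tracer = trace.get_tracer("genai-demo")

with tracer.start_as_current_span("chat gpt-4") as span:
    invocation = LLMInvocation(request_model="gpt-4")
    invocation.monotonic_start_s = timeit.default_timer()
    invocation.output_tokens = 34
    # One duration sample + one output-token sample, recorded in the
    # span's context so exemplar sampling can link metric to trace.
    recorder.record(span, invocation)
```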
util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
@@ -123,6 +123,10 @@ class LLMInvocation:
max_tokens: int | None = None
stop_sequences: list[str] | None = None
seed: int | None = None
# Monotonic start time in seconds (from timeit.default_timer) used
# for duration calculations to avoid mixing clock sources. This is
# populated by the TelemetryHandler when starting an invocation.
monotonic_start_s: float | None = None


@dataclass
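`timeit.default_timer` is an alias for `time.perf_counter`: monotonic and high-resolution, so measured durations are immune to wall-clock jumps (e.g., NTP corrections) that affect `time.time_ns()`. The fallback path in `metrics.py` only mixes clock sources when no monotonic start was captured. A tiny illustration:

```python
import timeit

start_s = timeit.default_timer()
some_llm_call()  # hypothetical workload
elapsed_s = timeit.default_timer() - start_s  # stable across clock adjustments
```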