diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index e6be4999f8..ca2c2d0a0e 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943](#3943)) - Add more Semconv attributes to LLMInvocation spans. ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3862](#3862)) +- Add metrics to LLMInvocation traces + ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891](#3891)) ## Version 0.2b0 (2025-10-14) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index d7758e16fc..bc2f2fa350 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -60,20 +60,24 @@ from __future__ import annotations +import timeit from contextlib import contextmanager from typing import Iterator from opentelemetry import context as otel_context +from opentelemetry.metrics import MeterProvider, get_meter from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import ( + Span, SpanKind, TracerProvider, get_tracer, set_span_in_context, ) +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( _apply_error_attributes, _apply_finish_attributes, @@ -88,13 +92,35 @@ class TelemetryHandler: them as spans, metrics, and events. 
""" - def __init__(self, tracer_provider: TracerProvider | None = None): + def __init__( + self, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + ): self._tracer = get_tracer( __name__, __version__, tracer_provider, schema_url=Schemas.V1_37_0.value, ) + self._metrics_recorder: InvocationMetricsRecorder | None = None + meter = get_meter(__name__, meter_provider=meter_provider) + self._metrics_recorder = InvocationMetricsRecorder(meter) + + def _record_llm_metrics( + self, + invocation: LLMInvocation, + span: Span | None = None, + *, + error_type: str | None = None, + ) -> None: + if self._metrics_recorder is None or span is None: + return + self._metrics_recorder.record( + span, + invocation, + error_type=error_type, + ) def start_llm( self, @@ -106,6 +132,9 @@ def start_llm( name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}", kind=SpanKind.CLIENT, ) + # Record a monotonic start timestamp (seconds) for duration + # calculation using timeit.default_timer. 
+ invocation.monotonic_start_s = timeit.default_timer() invocation.span = span invocation.context_token = otel_context.attach( set_span_in_context(span) @@ -118,10 +147,12 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab # TODO: Provide feedback that this invocation was not started return invocation - _apply_finish_attributes(invocation.span, invocation) + span = invocation.span + _apply_finish_attributes(span, invocation) + self._record_llm_metrics(invocation, span) # Detach context and end span otel_context.detach(invocation.context_token) - invocation.span.end() + span.end() return invocation def fail_llm( # pylint: disable=no-self-use @@ -132,11 +163,14 @@ def fail_llm( # pylint: disable=no-self-use # TODO: Provide feedback that this invocation was not started return invocation + span = invocation.span _apply_finish_attributes(invocation.span, invocation) - _apply_error_attributes(invocation.span, error) + _apply_error_attributes(span, error) + error_type = getattr(error.type, "__qualname__", None) + self._record_llm_metrics(invocation, span, error_type=error_type) # Detach context and end span otel_context.detach(invocation.context_token) - invocation.span.end() + span.end() return invocation @contextmanager @@ -166,6 +200,7 @@ def llm( def get_telemetry_handler( tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, ) -> TelemetryHandler: """ Returns a singleton TelemetryHandler instance. 
@@ -174,6 +209,9 @@ def get_telemetry_handler( get_telemetry_handler, "_default_handler", None ) if handler is None: - handler = TelemetryHandler(tracer_provider=tracer_provider) + handler = TelemetryHandler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + ) setattr(get_telemetry_handler, "_default_handler", handler) return handler diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py new file mode 100644 index 0000000000..efc7acf345 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -0,0 +1,52 @@ +from opentelemetry.metrics import Histogram, Meter +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + +_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + +_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ + 1, + 4, + 16, + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, +] + + +class Instruments: + def __init__(self, meter: Meter): + self.operation_duration_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, + description="Duration of GenAI client operation", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) + self.token_usage_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, + description="Number of input and output tokens used by GenAI clients", + unit="{token}", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py new file mode 100644 index 0000000000..50aaf26353 --- /dev/null +++ 
b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -0,0 +1,130 @@ +"""Helpers for emitting GenAI metrics from LLM invocations.""" + +from __future__ import annotations + +import time +import timeit +from numbers import Number +from typing import Dict, Optional + +from opentelemetry.metrics import Histogram, Meter +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import Span, set_span_in_context +from opentelemetry.util.genai.instruments import Instruments +from opentelemetry.util.genai.types import LLMInvocation +from opentelemetry.util.types import AttributeValue + +_NS_PER_SECOND = 1_000_000_000 + + +def _get_span_start_time_ns(span: Optional[Span]) -> Optional[int]: + if span is None: + return None + for attr in ("start_time", "_start_time"): + value = getattr(span, attr, None) + if isinstance(value, int): + return value + return None + + +def _calculate_duration_seconds( + span: Optional[Span], invocation: LLMInvocation +) -> Optional[float]: + """Calculate duration in seconds from a start time to now. + + If `invocation.monotonic_start_s` is present, use the monotonic + clock (`timeit.default_timer`) for elapsed time. Otherwise fall back to the + span's wall-clock start time (epoch ns) and `time_ns()` for now. + + Returns None if no usable start time is available. 
+ """ + # Prefer an explicit monotonic start on the invocation (seconds) + if invocation.monotonic_start_s is not None: + return max(timeit.default_timer() - invocation.monotonic_start_s, 0.0) + + # Fall back to span start_time (wall clock epoch ns) + start_time_ns = _get_span_start_time_ns(span) + if start_time_ns is None: + return None + elapsed_ns = max(time.time_ns() - start_time_ns, 0) + return elapsed_ns / _NS_PER_SECOND + + +class InvocationMetricsRecorder: + """Records duration and token usage histograms for GenAI invocations.""" + + def __init__(self, meter: Meter): + instruments = Instruments(meter) + self._duration_histogram: Histogram = ( + instruments.operation_duration_histogram + ) + self._token_histogram: Histogram = instruments.token_usage_histogram + + def record( + self, + span: Optional[Span], + invocation: LLMInvocation, + *, + error_type: Optional[str] = None, + ) -> None: + """Record duration and token metrics for an invocation if possible.""" + if span is None: + return + + token_counts: list[tuple[int, str]] = [] + if invocation.input_tokens is not None: + token_counts.append( + ( + invocation.input_tokens, + GenAI.GenAiTokenTypeValues.INPUT.value, + ) + ) + if invocation.output_tokens is not None: + token_counts.append( + ( + invocation.output_tokens, + GenAI.GenAiTokenTypeValues.OUTPUT.value, + ) + ) + + attributes: Dict[str, AttributeValue] = { + GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value + } + if invocation.request_model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model + if invocation.provider: + attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider + if invocation.response_model_name: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = ( + invocation.response_model_name + ) + + # Calculate duration from span timing or invocation monotonic start + duration_seconds = _calculate_duration_seconds(span, invocation) + + span_context = set_span_in_context(span) + if error_type: + 
attributes["error.type"] = error_type + + if ( + duration_seconds is not None + and isinstance(duration_seconds, Number) + and duration_seconds >= 0 + ): + self._duration_histogram.record( + duration_seconds, + attributes=attributes, + context=span_context, + ) + + for token_count, token_type in token_counts: + self._token_histogram.record( + token_count, + attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type}, + context=span_context, + ) + + +__all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 558807022d..02fe1c371e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -123,6 +123,10 @@ class LLMInvocation: max_tokens: int | None = None stop_sequences: list[str] | None = None seed: int | None = None + # Monotonic start time in seconds (from timeit.default_timer) used + # for duration calculations to avoid mixing clock sources. This is + # populated by the TelemetryHandler when starting an invocation. 
+ monotonic_start_s: float | None = None @dataclass diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py new file mode 100644 index 0000000000..3e136ffb44 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +from typing import Any, Dict, List +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import Error, LLMInvocation + + +class TelemetryHandlerMetricsTest(TestCase): + def setUp(self) -> None: + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + def test_stop_llm_records_duration_and_tokens(self) -> None: + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = LLMInvocation(request_model="model", provider="prov") + invocation.input_tokens = 5 + invocation.output_tokens = 7 + # Patch default_timer during start to ensure monotonic_start_s + with patch("timeit.default_timer", return_value=1000.0): + handler.start_llm(invocation) + + # Simulate 2 seconds of elapsed monotonic time (seconds) + with patch( + "timeit.default_timer", + return_value=1002.0, + 
): + handler.stop_llm(invocation) + + metrics = self._harvest_metrics() + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + duration_point = duration_points[0] + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_OPERATION_NAME], + GenAI.GenAiOperationNameValues.CHAT.value, + ) + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_REQUEST_MODEL], "model" + ) + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov" + ) + self.assertAlmostEqual(duration_point.sum, 2.0, places=3) + + self.assertIn("gen_ai.client.token.usage", metrics) + token_points = metrics["gen_ai.client.token.usage"] + token_by_type = { + point.attributes[GenAI.GEN_AI_TOKEN_TYPE]: point + for point in token_points + } + self.assertEqual(len(token_by_type), 2) + self.assertAlmostEqual( + token_by_type[GenAI.GenAiTokenTypeValues.INPUT.value].sum, + 5.0, + places=3, + ) + self.assertAlmostEqual( + token_by_type[GenAI.GenAiTokenTypeValues.COMPLETION.value].sum, + 7.0, + places=3, + ) + + def test_fail_llm_records_error_and_available_tokens(self) -> None: + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = LLMInvocation(request_model="err-model", provider=None) + invocation.input_tokens = 11 + # Patch default_timer during start to ensure monotonic_start_s + with patch("timeit.default_timer", return_value=2000.0): + handler.start_llm(invocation) + + error = Error(message="boom", type=ValueError) + with patch( + "timeit.default_timer", + return_value=2001.0, + ): + handler.fail_llm(invocation, error) + + metrics = self._harvest_metrics() + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + duration_point = duration_points[0] + self.assertEqual( + 
duration_point.attributes.get("error.type"), "ValueError" + ) + self.assertEqual( + duration_point.attributes.get(GenAI.GEN_AI_REQUEST_MODEL), + "err-model", + ) + self.assertAlmostEqual(duration_point.sum, 1.0, places=3) + + self.assertIn("gen_ai.client.token.usage", metrics) + token_points = metrics["gen_ai.client.token.usage"] + self.assertEqual(len(token_points), 1) + token_point = token_points[0] + self.assertEqual( + token_point.attributes[GenAI.GEN_AI_TOKEN_TYPE], + GenAI.GenAiTokenTypeValues.INPUT.value, + ) + self.assertAlmostEqual(token_point.sum, 11.0, places=3) + + def _harvest_metrics(self) -> Dict[str, List[Any]]: + try: + self.meter_provider.force_flush() + except Exception: # pylint: disable=broad-except + pass + self.metric_reader.collect() + metrics_by_name: Dict[str, List[Any]] = {} + data = self.metric_reader.get_metrics_data() + for resource_metric in (data and data.resource_metrics) or []: + for scope_metric in resource_metric.scope_metrics or []: + for metric in scope_metric.metrics or []: + points = metric.data.data_points or [] + metrics_by_name.setdefault(metric.name, []).extend(points) + return metrics_by_name