Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl SpanExporter for OtlpHttpClient {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {e}"))
})?;
Expand Down
59 changes: 31 additions & 28 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use std::sync::{Arc, Mutex};

use opentelemetry::{otel_debug, otel_warn};
use opentelemetry_proto::tonic::collector::trace::v1::{
Expand All @@ -21,7 +20,7 @@ use crate::retry::RetryPolicy;
use opentelemetry_sdk::runtime::Tokio;

pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
retry_policy: RetryPolicy,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
Expand All @@ -30,7 +29,7 @@ pub(crate) struct TonicTracesClient {

struct ClientInner {
client: TraceServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicTracesClient {
Expand All @@ -56,10 +55,10 @@ impl TonicTracesClient {
otel_debug!(name: "TonicsTracesClientBuilt");

TonicTracesClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),
retry_policy: retry_policy.unwrap_or(RetryPolicy {
max_retries: 3,
initial_delay_ms: 100,
Expand Down Expand Up @@ -87,26 +86,26 @@ impl SpanExporter for TonicTracesClient {
let batch_clone = Arc::clone(&batch);

// Execute the export operation
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| {
// Convert interceptor errors to tonic::Status for retry classification
tonic::Status::internal(format!("interceptor error: {e:?}"))
})?
.into_parts();
(inner.client.clone(), m, e)
}
None => {
return Err(tonic::Status::failed_precondition(
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| tonic::Status::internal(format!("failed to acquire lock: {e}")))
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
// Convert interceptor errors to tonic::Status for retry classification
tonic::Status::internal(format!("interceptor error: {e:?}"))
})?
.into_parts();
Ok((inner.client.clone(), m, e))
}
None => Err(tonic::Status::failed_precondition(
"exporter already shutdown",
))
}
};
)),
})?;

let resource_spans =
group_spans_by_resource_and_scope((*batch_clone).clone(), &self.resource);
Expand Down Expand Up @@ -147,8 +146,12 @@ impl SpanExporter for TonicTracesClient {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
fn shutdown(&self) -> OTelSdkResult {
let mut inner_guard = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))?;
match inner_guard.take() {
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## vNext

- Fix `SpanExporter::shutdown()` default timeout from 5 nanoseconds to 5 seconds.
- **Breaking** `SpanExporter` trait methods `shutdown`, `shutdown_with_timeout`, and `force_flush` now take `&self` instead of `&mut self` for consistency with `LogExporter` and `PushMetricExporter`. Implementers using interior mutability (e.g., `Mutex`, `AtomicBool`) require no changes.
- Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning.
- **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]:
- `id_generator`, `should_sample`
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl SpanExporter for TokioSpanExporter {
})
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.tx_shutdown.send(()).map_err(|_| {
OTelSdkError::InternalFailure("Failed to send shutdown signal".to_string())
})
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/trace/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub trait SpanExporter: Send + Sync + Debug {
/// flush the data and the destination is unavailable). SDK authors
/// can decide if they want to make the shutdown timeout
/// configurable.
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
/// Shuts down the exporter with default timeout.
fn shutdown(&mut self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_nanos(5))
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}

/// This is a hint to ensure that the export of any Spans the exporter
Expand All @@ -66,7 +66,7 @@ pub trait SpanExporter: Send + Sync + Debug {
/// implemented as a blocking API or an asynchronous API which notifies the caller via
/// a callback or an event. OpenTelemetry client authors can decide if they want to
/// make the flush timeout configurable.
fn force_flush(&mut self) -> OTelSdkResult {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl SpanExporter for InMemorySpanExporter {
result
}

fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
Expand Down Expand Up @@ -1171,7 +1171,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
fn set_resource(&mut self, resource: &Resource) {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/trace/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl opentelemetry_sdk::trace::SpanExporter for SpanExporter {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown.store(true, Ordering::SeqCst);
Ok(())
}
Expand Down