Skip to content

Commit cfe3022

Browse files
authored
ref(spans): Always produce new kafka format (#5199)
Before we merge getsentry/sentry#100181, we should make sure that there is no way back for the kafka producer (i.e. relay) to produce old span messages. - [ ] Handle kafka schema validation errors in integration tests. ref: INGEST-535
1 parent 63138f6 commit cfe3022

File tree

11 files changed

+153
-992
lines changed

11 files changed

+153
-992
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222

2323
- No longer writes Spans as trace items. ([#5152](https://github.com/getsentry/relay/pull/5152))
2424
- Produce spans to `ingest-spans` by default. ([#5163](https://github.com/getsentry/relay/pull/5163))
25-
- Add ability to produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151))
2625
- Add `retentions` to the project configuration and use them for logs. ([#5135](https://github.com/getsentry/relay/pull/5135))
26+
- Produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151), [#5173](https://github.com/getsentry/relay/pull/5173), [#5199](https://github.com/getsentry/relay/pull/5199))
2727
- Modernize session processing and move to Relay's new processing framework. ([#5201](https://github.com/getsentry/relay/pull/5201))
28-
- Add ability to produce Span V2 Kafka messages. ([#5151](https://github.com/getsentry/relay/pull/5151), [#5173](https://github.com/getsentry/relay/pull/5173))
2928

3029
## 25.9.0
3130

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ sentry = { version = "0.41.0", default-features = false, features = [
177177
"transport",
178178
] }
179179
sentry-core = "0.41.0"
180-
sentry-kafka-schemas = { version = "2.1.1", default-features = false }
180+
sentry-kafka-schemas = { version = "2.1.6", default-features = false }
181181
sentry-release-parser = { version = "1.3.2", default-features = false }
182182
sentry-types = "0.41.0"
183183
sentry_protos = "0.3.3"

relay-dynamic-config/src/global.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,6 @@ pub struct Options {
186186
)]
187187
pub replay_relay_snuba_publish_disabled_sample_rate: f32,
188188

189-
/// Fraction of spans that are produced as backward-compatible Span V2 kafka messages.
190-
#[serde(
191-
rename = "relay.kafka.span-v2.sample-rate",
192-
deserialize_with = "default_on_error",
193-
skip_serializing_if = "is_default"
194-
)]
195-
pub span_kafka_v2_sample_rate: f32,
196-
197189
/// All other unknown options.
198190
#[serde(flatten)]
199191
other: HashMap<String, Value>,

relay-server/src/managed/counted.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use relay_event_schema::protocol::{
2-
OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2,
2+
CompatSpan, OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2,
33
};
44
use relay_protocol::Annotated;
55
use relay_quotas::DataCategory;
@@ -105,6 +105,12 @@ impl Counted for Annotated<Span> {
105105
}
106106
}
107107

108+
impl Counted for Annotated<CompatSpan> {
109+
fn quantities(&self) -> Quantities {
110+
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
111+
}
112+
}
113+
108114
impl Counted for ExtractedMetrics {
109115
fn quantities(&self) -> Quantities {
110116
// We only consider project metrics, sampling project metrics should never carry outcomes,

relay-server/src/processing/spans/mod.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::sync::Arc;
22

33
use relay_event_normalization::GeoIpLookup;
44
use relay_event_schema::processor::ProcessingAction;
5+
#[cfg(feature = "processing")]
6+
use relay_event_schema::protocol::CompatSpan;
57
use relay_event_schema::protocol::SpanV2;
68
use relay_quotas::{DataCategory, RateLimits};
79

@@ -197,14 +199,24 @@ impl Forward for SpanOutput {
197199
let envelope = spans.map(|spans, records| {
198200
let mut items = Items::with_capacity(spans.spans.len());
199201
for span in spans.spans {
200-
let mut span = span.value.map_value(relay_spans::span_v2_to_span_v1);
202+
use relay_protocol::Annotated;
203+
204+
let mut span = match span.value.map_value(CompatSpan::try_from) {
205+
Annotated(Some(Result::Err(error)), _) => {
206+
// TODO: Use records.internal_error(error, span)
207+
relay_log::error!(
208+
error = &error as &dyn std::error::Error,
209+
"Failed to create CompatSpan"
210+
);
211+
continue;
212+
}
213+
Annotated(Some(Result::Ok(compat_span)), meta) => {
214+
Annotated(Some(compat_span), meta)
215+
}
216+
Annotated(None, meta) => Annotated(None, meta),
217+
};
201218
if let Some(span) = span.value_mut() {
202-
inject_server_sample_rate(span, spans.server_sample_rate);
203-
204-
// TODO: this needs to be done in a normalization step, which is yet to be
205-
// implemented.
206-
span.received =
207-
relay_event_schema::protocol::Timestamp(chrono::Utc::now()).into();
219+
inject_server_sample_rate(&mut span.span_v2, spans.server_sample_rate);
208220
}
209221

210222
let mut item = Item::new(ItemType::Span);
@@ -215,7 +227,10 @@ impl Forward for SpanOutput {
215227
continue;
216228
}
217229
};
218-
item.set_payload(ContentType::Json, payload);
230+
item.set_payload(ContentType::CompatSpan, payload);
231+
if let Some(trace_id) = span.value().and_then(|s| s.span_v2.trace_id.value()) {
232+
item.set_routing_hint(*trace_id.as_ref());
233+
}
219234
items.push(item);
220235
}
221236

@@ -238,24 +253,14 @@ impl Forward for SpanOutput {
238253
/// Ideally we forward a proper data structure to the store instead, then we don't
239254
/// have to inject the sample rate into a measurement.
240255
#[cfg(feature = "processing")]
241-
fn inject_server_sample_rate(
242-
span: &mut relay_event_schema::protocol::Span,
243-
server_sample_rate: Option<f64>,
244-
) {
256+
fn inject_server_sample_rate(span: &mut SpanV2, server_sample_rate: Option<f64>) {
245257
let Some(server_sample_rate) = server_sample_rate.and_then(relay_protocol::FiniteF64::new)
246258
else {
247259
return;
248260
};
249261

250-
let measurements = span.measurements.get_or_insert_with(Default::default);
251-
measurements.0.insert(
252-
"server_sample_rate".to_owned(),
253-
relay_event_schema::protocol::Measurement {
254-
value: server_sample_rate.into(),
255-
unit: None.into(),
256-
}
257-
.into(),
258-
);
262+
let attributes = span.attributes.get_or_insert_with(Default::default);
263+
attributes.insert("sentry.server_sample_rate", server_sample_rate.to_f64());
259264
}
260265

261266
/// Spans in their serialized state, as transported in an envelope.

relay-server/src/services/processor/span/processing.rs

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ pub async fn process(
9191
geo_lookup,
9292
);
9393

94-
let org_id = managed_envelope.scoping().organization_id.value();
9594
let client_ip = managed_envelope.envelope().meta().client_addr();
9695
let filter_settings = &project_info.config.filter_settings;
9796
let sampling_decision = sampling_result.decision();
@@ -230,8 +229,7 @@ pub async fn process(
230229
}
231230
};
232231

233-
let Ok(mut new_item) = create_span_item(annotated_span, &config, global_config, org_id)
234-
else {
232+
let Ok(mut new_item) = create_span_item(annotated_span, &config) else {
235233
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
236234
};
237235

@@ -254,14 +252,9 @@ pub async fn process(
254252
}
255253
}
256254

257-
fn create_span_item(
258-
span: Annotated<Span>,
259-
config: &Config,
260-
global_config: &GlobalConfig,
261-
org_id: u64,
262-
) -> Result<Item, ()> {
255+
fn create_span_item(span: Annotated<Span>, config: &Config) -> Result<Item, ()> {
263256
let mut new_item = Item::new(ItemType::Span);
264-
if produce_compat_spans(config, global_config, org_id) {
257+
if cfg!(feature = "processing") && config.processing_enabled() {
265258
let span_v2 = span.map_value(relay_spans::span_v1_to_span_v2);
266259
let compat_span = match span_v2.map_value(CompatSpan::try_from) {
267260
Annotated(Some(Result::Err(err)), _) => {
@@ -297,15 +290,6 @@ fn create_span_item(
297290
Ok(new_item)
298291
}
299292

300-
/// Whether or not to convert spans into backward-compatible V2 spans.
301-
///
302-
/// This only makes sense when we forward the envelope to Kafka.
303-
fn produce_compat_spans(config: &Config, global_config: &GlobalConfig, org_id: u64) -> bool {
304-
cfg!(feature = "processing")
305-
&& config.processing_enabled()
306-
&& utils::is_rolled_out(org_id, global_config.options.span_kafka_v2_sample_rate).is_keep()
307-
}
308-
309293
fn add_sample_rate(measurements: &mut Annotated<Measurements>, name: &str, value: Option<f64>) {
310294
let value = match value {
311295
Some(value) if value > 0.0 => value,
@@ -352,8 +336,6 @@ pub fn extract_from_event(
352336
.dsc()
353337
.and_then(|ctx| ctx.sample_rate);
354338

355-
let org_id = managed_envelope.scoping().organization_id.value();
356-
357339
let mut add_span = |mut span: Span| {
358340
add_sample_rate(
359341
&mut span.measurements,
@@ -387,7 +369,7 @@ pub fn extract_from_event(
387369
}
388370
};
389371

390-
let Ok(mut item) = create_span_item(span, &config, global_config, org_id) else {
372+
let Ok(mut item) = create_span_item(span, &config) else {
391373
managed_envelope.track_outcome(
392374
Outcome::Invalid(DiscardReason::InvalidSpan),
393375
relay_quotas::DataCategory::SpanIndexed,

0 commit comments

Comments
 (0)