Skip to content

Commit b660982

Browse files
ref(spans): Use Span V2 kafka schema in span consumers (#100181)
--------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
1 parent 9fccd58 commit b660982

File tree

21 files changed

+616
-510
lines changed

21 files changed

+616
-510
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ dependencies = [
7777
# [end] jsonschema format validators
7878
"sentry-arroyo>=2.25.5",
7979
"sentry-forked-email-reply-parser>=0.5.12.post1",
80-
"sentry-kafka-schemas>=2.1.3",
80+
"sentry-kafka-schemas>=2.1.6",
8181
"sentry-ophio>=1.1.3",
8282
"sentry-protos>=0.4.0",
8383
"sentry-redis-tools>=0.5.0",

src/sentry/insights/__init__.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ def from_span_v1(cls, span: dict[str, Any]) -> "FilterSpan":
2626
)
2727

2828
@classmethod
29-
def from_span_data(cls, data: dict[str, Any]) -> "FilterSpan":
30-
"""Get relevant fields from `span.data`.
31-
32-
This will later be replaced by `from_span_attributes` or `from_span_v2`."""
29+
def from_span_attributes(cls, attributes: dict[str, Any]) -> "FilterSpan":
30+
"""Get relevant fields from `span.attributes`."""
3331
return cls(
34-
op=data.get("sentry.op"),
35-
category=data.get("sentry.category"),
36-
description=data.get("sentry.description"),
37-
transaction_op=data.get("sentry.transaction_op"),
32+
op=(attributes.get("sentry.op") or {}).get("value"),
33+
category=(attributes.get("sentry.category") or {}).get("value"),
34+
description=(attributes.get("sentry.description") or {}).get("value"),
35+
transaction_op=(attributes.get("sentry.transaction_op") or {}).get("value"),
3836
)
3937

4038

src/sentry/spans/buffer.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777

7878
from sentry import options
7979
from sentry.processing.backpressure.memory import ServiceMemory, iter_cluster_memory_usage
80+
from sentry.spans.consumers.process_segments.types import attribute_value
8081
from sentry.utils import metrics, redis
8182

8283
# SegmentKey is an internal identifier used by the redis buffer that is also
@@ -129,7 +130,7 @@ class Span(NamedTuple):
129130
segment_id: str | None
130131
project_id: int
131132
payload: bytes
132-
end_timestamp_precise: float
133+
end_timestamp: float
133134
is_segment_span: bool = False
134135

135136
def effective_parent_id(self):
@@ -339,7 +340,7 @@ def _group_by_parent(self, spans: Sequence[Span]) -> dict[tuple[str, str], list[
339340

340341
def _prepare_payloads(self, spans: list[Span]) -> dict[str | bytes, float]:
341342
if self._zstd_compressor is None:
342-
return {span.payload: span.end_timestamp_precise for span in spans}
343+
return {span.payload: span.end_timestamp for span in spans}
343344

344345
combined = b"\x00".join(span.payload for span in spans)
345346
original_size = len(combined)
@@ -354,7 +355,7 @@ def _prepare_payloads(self, spans: list[Span]) -> dict[str | bytes, float]:
354355
metrics.timing("spans.buffer.compression.compressed_size", compressed_size)
355356
metrics.timing("spans.buffer.compression.compression_ratio", compression_ratio)
356357

357-
min_timestamp = min(span.end_timestamp_precise for span in spans)
358+
min_timestamp = min(span.end_timestamp for span in spans)
358359
return {compressed: min_timestamp}
359360

360361
def _decompress_batch(self, compressed_data: bytes) -> list[bytes]:
@@ -428,17 +429,23 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
428429
has_root_span = False
429430
metrics.timing("spans.buffer.flush_segments.num_spans_per_segment", len(segment))
430431
for payload in segment:
431-
val = orjson.loads(payload)
432-
433-
if not val.get("segment_id"):
434-
val["segment_id"] = segment_span_id
435-
436-
is_segment = segment_span_id == val["span_id"]
437-
val["is_segment"] = is_segment
432+
span = orjson.loads(payload)
433+
434+
if not attribute_value(span, "sentry.segment.id"):
435+
span.setdefault("attributes", {})["sentry.segment.id"] = {
436+
"type": "string",
437+
"value": segment_span_id,
438+
}
439+
440+
is_segment = segment_span_id == span["span_id"]
441+
span.setdefault("attributes", {})["sentry.is_segment"] = {
442+
"type": "boolean",
443+
"value": is_segment,
444+
}
438445
if is_segment:
439446
has_root_span = True
440447

441-
output_spans.append(OutputSpan(payload=val))
448+
output_spans.append(OutputSpan(payload=span))
442449

443450
metrics.incr(
444451
"spans.buffer.flush_segments.num_segments_per_shard", tags={"shard_i": shard}

src/sentry/spans/consumers/process/factory.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from sentry import killswitches
1919
from sentry.spans.buffer import Span, SpansBuffer
2020
from sentry.spans.consumers.process.flusher import SpanFlusher
21+
from sentry.spans.consumers.process_segments.types import attribute_value
2122
from sentry.utils import metrics
2223
from sentry.utils.arroyo import MultiprocessingPool, SetJoinTimeout, run_task_with_multiprocessing
2324

@@ -182,10 +183,10 @@ def process_batch(
182183
trace_id=val["trace_id"],
183184
span_id=val["span_id"],
184185
parent_span_id=val.get("parent_span_id"),
185-
segment_id=val.get("segment_id"),
186+
segment_id=cast(str | None, attribute_value(val, "sentry.segment.id")),
186187
project_id=val["project_id"],
187188
payload=payload.value,
188-
end_timestamp_precise=val["end_timestamp_precise"],
189+
end_timestamp=val["end_timestamp"],
189190
is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
190191
)
191192
spans.append(span)

src/sentry/spans/consumers/process_segments/convert.py

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from collections.abc import MutableMapping
21
from typing import Any, cast
32

43
import orjson
@@ -13,58 +12,69 @@
1312
I64_MAX = 2**63 - 1
1413

1514
FIELD_TO_ATTRIBUTE = {
16-
"description": "sentry.raw_description",
17-
"duration_ms": "sentry.duration_ms",
18-
"is_segment": "sentry.is_segment",
19-
"exclusive_time_ms": "sentry.exclusive_time_ms",
20-
"start_timestamp_precise": "sentry.start_timestamp_precise",
21-
"end_timestamp_precise": "sentry.end_timestamp_precise",
15+
"end_timestamp": "sentry.end_timestamp_precise",
16+
"event_id": "sentry.event_id",
17+
"hash": "sentry.hash",
2218
"is_remote": "sentry.is_remote",
19+
"kind": "sentry.kind",
20+
"name": "sentry.name",
2321
"parent_span_id": "sentry.parent_span_id",
24-
"profile_id": "sentry.profile_id",
25-
"segment_id": "sentry.segment_id",
2622
"received": "sentry.received",
27-
"origin": "sentry.origin",
28-
"kind": "sentry.kind",
29-
"hash": "sentry.hash",
30-
"event_id": "sentry.event_id",
23+
"start_timestamp": "sentry.start_timestamp_precise",
24+
}
25+
26+
RENAME_ATTRIBUTES = {
27+
"sentry.description": "sentry.raw_description",
28+
"sentry.segment.id": "sentry.segment_id",
3129
}
3230

3331

3432
def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
35-
attributes: MutableMapping[str, AnyValue] = {} # TODO
33+
attributes: dict[str, AnyValue] = {}
3634

3735
client_sample_rate = 1.0
3836
server_sample_rate = 1.0
3937

40-
# This key is ambiguous. sentry-conventions and relay interpret it as "raw description",
41-
# sentry interprets it as normalized_description.
42-
# See https://github.com/getsentry/sentry/blob/7f2ccd1d03e8845a833fe1ee6784bce0c7f0b935/src/sentry/search/eap/spans/attributes.py#L596.
43-
# Delete it and relay on top-level `description` for now.
44-
(span.get("data") or {}).pop("sentry.description", None)
45-
46-
for k, v in (span.get("data") or {}).items():
47-
if v is not None:
48-
try:
49-
attributes[k] = _anyvalue(v)
50-
except Exception:
51-
sentry_sdk.capture_exception()
52-
else:
53-
if k == "sentry.client_sample_rate":
54-
try:
55-
client_sample_rate = float(v)
56-
except ValueError:
57-
pass
58-
elif k == "sentry.server_sample_rate":
59-
try:
60-
server_sample_rate = float(v)
61-
except ValueError:
62-
pass
38+
for k, attribute in (span.get("attributes") or {}).items():
39+
if attribute is None:
40+
continue
41+
if (value := attribute.get("value")) is None:
42+
continue
43+
try:
44+
# NOTE: This ignores the `type` field of the attribute itself
45+
attributes[k] = _anyvalue(value)
46+
except Exception:
47+
sentry_sdk.capture_exception()
48+
else:
49+
if k == "sentry.client_sample_rate":
50+
try:
51+
client_sample_rate = float(value) # type:ignore[arg-type]
52+
except ValueError:
53+
pass
54+
elif k == "sentry.server_sample_rate":
55+
try:
56+
server_sample_rate = float(value) # type:ignore[arg-type]
57+
except ValueError:
58+
pass
6359

6460
for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items():
65-
v = span.get(field_name)
66-
if v is not None:
67-
attributes[attribute_name] = _anyvalue(v)
61+
attribute = span.get(field_name) # type:ignore[assignment]
62+
if attribute is not None:
63+
attributes[attribute_name] = _anyvalue(attribute)
64+
65+
# Rename some attributes from their sentry-conventions name to what the product currently expects.
66+
# Eventually this should all be handled by deprecation policies in sentry-conventions.
67+
for convention_name, eap_name in RENAME_ATTRIBUTES.items():
68+
if convention_name in attributes:
69+
attributes[eap_name] = attributes.pop(convention_name)
70+
71+
try:
72+
# TODO: Move this to Relay
73+
attributes["sentry.duration_ms"] = AnyValue(
74+
int_value=int(1000 * (span["end_timestamp"] - span["start_timestamp"]))
75+
)
76+
except Exception:
77+
sentry_sdk.capture_exception()
6878

6979
if links := span.get("links"):
7080
try:
@@ -80,7 +90,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
8090
trace_id=span["trace_id"],
8191
item_id=int(span["span_id"], 16).to_bytes(16, "little"),
8292
item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
83-
timestamp=_timestamp(span["start_timestamp_precise"]),
93+
timestamp=_timestamp(span["start_timestamp"]),
8494
attributes=attributes,
8595
client_sample_rate=client_sample_rate,
8696
server_sample_rate=server_sample_rate,
@@ -132,7 +142,10 @@ def _sanitize_span_link(link: SpanLink) -> SpanLink:
132142
# might be an intermediary state where there is a pre-existing dropped
133143
# attributes count. Respect that count, if it's present. It should always be
134144
# an integer.
135-
dropped_attributes_count = attributes.get("sentry.dropped_attributes_count", 0)
145+
try:
146+
dropped_attributes_count = int(attributes["sentry.dropped_attributes_count"]["value"]) # type: ignore[arg-type,index]
147+
except (KeyError, ValueError, TypeError):
148+
dropped_attributes_count = 0
136149

137150
for key, value in attributes.items():
138151
if key in ALLOWED_LINK_ATTRIBUTE_KEYS:
@@ -141,7 +154,10 @@ def _sanitize_span_link(link: SpanLink) -> SpanLink:
141154
dropped_attributes_count += 1
142155

143156
if dropped_attributes_count > 0:
144-
allowed_attributes["sentry.dropped_attributes_count"] = dropped_attributes_count
157+
allowed_attributes["sentry.dropped_attributes_count"] = {
158+
"type": "integer",
159+
"value": dropped_attributes_count,
160+
}
145161

146162
# Only include the `attributes` key if the key was present in the original
147163
# link, don't create a an empty object, since there is a semantic difference

0 commit comments

Comments
 (0)