Skip to content

Commit 9caa2fa

Browse files
committed
pkg/parcacol: Add QueryRange filtering for delta queries
1 parent 8b3f48a commit 9caa2fa

File tree

6 files changed

+166
-73
lines changed

6 files changed

+166
-73
lines changed

pkg/parcacol/querier.go

Lines changed: 124 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (q *Querier) QueryRange(
318318
filterExpr := logicalplan.And(exprs...)
319319

320320
if queryParts.Delta {
321-
return q.queryRangeDelta(ctx, filterExpr, step, queryParts.Meta.SampleType.Unit)
321+
return q.queryRangeDelta(ctx, filterExpr, step, queryParts.Meta.SampleType.Unit, filterQuery)
322322
}
323323

324324
return q.queryRangeNonDelta(ctx, filterExpr, step, filterQuery)
@@ -331,7 +331,16 @@ const (
331331
ColumnValueSum = "sum(" + ColumnValue + ")"
332332
)
333333

334-
func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit string) ([]*pb.MetricsSeries, error) {
334+
func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit, filterQuery string) ([]*pb.MetricsSeries, error) {
335+
groupBy := []logicalplan.Expr{
336+
logicalplan.DynCol(ColumnLabels),
337+
logicalplan.Duration(step),
338+
}
339+
if filterQuery != "" {
340+
// If we have a filter query we need to group by the stacktrace column as well to be able to filter them out.
341+
groupBy = append(groupBy, logicalplan.Col(ColumnStacktrace))
342+
}
343+
335344
records := []arrow.Record{}
336345
rows := 0
337346
err := q.engine.ScanTable(q.tableName).
@@ -343,10 +352,7 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
343352
logicalplan.Sum(logicalplan.Col(ColumnValue)),
344353
logicalplan.Count(logicalplan.Col(ColumnValue)),
345354
},
346-
[]logicalplan.Expr{
347-
logicalplan.DynCol(ColumnLabels),
348-
logicalplan.Duration(step),
349-
},
355+
groupBy,
350356
).
351357
Execute(ctx, func(ctx context.Context, r arrow.Record) error {
352358
r.Retain()
@@ -371,18 +377,32 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
371377
Timestamp int
372378
ValueCount int
373379
ValueSum int
380+
Stacktrace int
374381
}{
375382
DurationSum: -1,
376383
PeriodSum: -1,
377384
Timestamp: -1,
378385
ValueCount: -1,
379386
ValueSum: -1,
387+
Stacktrace: -1,
380388
}
381389

390+
matchingStacktraces := map[string]bool{}
382391
labelColumnIndices := []int{}
392+
labelsetToIndex := map[string]int{}
393+
383394
labelSet := labels.Labels{}
384395
resSeries := []*pb.MetricsSeries{}
385-
labelsetToIndex := map[string]int{}
396+
397+
// These structs are only used when filtering for specific stacktraces.
398+
// They are used as intermediate helpers before being converted to the final MetricsSeries.
399+
type metricsSeriesStacktrace struct {
400+
durationSum int64
401+
periodSum int64
402+
valueSum int64
403+
valueCount int64
404+
}
405+
resSeriesStacktraces := map[int]map[int64]metricsSeriesStacktrace{}
386406

387407
for _, ar := range records {
388408
fields := ar.Schema().Fields()
@@ -403,6 +423,9 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
403423
case ColumnValueSum:
404424
columnIndices.ValueSum = i
405425
continue
426+
case ColumnStacktrace:
427+
columnIndices.Stacktrace = i
428+
continue
406429
}
407430

408431
if strings.HasPrefix(field.Name, "labels.") {
@@ -425,6 +448,9 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
425448
if columnIndices.ValueSum == -1 {
426449
return nil, errors.New("sum(value) column not found")
427450
}
451+
if columnIndices.Stacktrace == -1 && filterQuery != "" {
452+
return nil, errors.New("stacktrace column not found")
453+
}
428454

429455
for i := 0; i < int(ar.NumRows()); i++ {
430456
labelSet = labelSet[:0]
@@ -454,6 +480,7 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
454480
resSeries = append(resSeries, &pb.MetricsSeries{Labelset: &profilestorepb.LabelSet{Labels: pbLabelSet}})
455481
index = len(resSeries) - 1
456482
labelsetToIndex[s] = index
483+
resSeriesStacktraces[index] = map[int64]metricsSeriesStacktrace{}
457484
}
458485

459486
ts := ar.Column(columnIndices.Timestamp).(*array.Int64).Value(i)
@@ -462,30 +489,66 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
462489
valueSum := ar.Column(columnIndices.ValueSum).(*array.Int64).Value(i)
463490
valueCount := ar.Column(columnIndices.ValueCount).(*array.Int64).Value(i)
464491

465-
// TODO: We should do these period and duration calculations in frostDB,
466-
// so that we can push these down as projections.
492+
if filterQuery == "" {
493+
resSeries[index].Samples = append(resSeries[index].Samples,
494+
deltaMetricsSample(ts, durationSum, periodSum, valueSum, valueCount, sampleTypeUnit),
495+
)
496+
} else {
497+
// If we have a filter query we iterate through each stacktrace for each timestamp.
498+
// Only if a stacktrace matches the filter query do we add its values to the intermediate metricsSeriesStacktrace.
499+
// After all timestamps have been seen we compute the final values and valuesPerSecond below.
500+
stacktrace := ar.Column(columnIndices.Stacktrace).(*array.Binary).Value(i)
501+
502+
matches, found := matchingStacktraces[string(stacktrace)]
503+
if !found {
504+
matches, err = q.matchingStacktrace(ctx, string(stacktrace), filterQuery)
505+
if err != nil {
506+
return nil, err
507+
}
508+
matchingStacktraces[string(stacktrace)] = matches
509+
}
467510

468-
// Because we store the period with each sample yet query for the sum(period) we need to normalize by the amount of values (rows in a database).
469-
period := periodSum / valueCount
470-
// Because we store the duration with each sample yet query for the sum(duration) we need to normalize by the amount of values (rows in a database).
471-
duration := durationSum / valueCount
511+
if sample, found := resSeriesStacktraces[index][ts]; found {
512+
if matches {
513+
sample.durationSum += durationSum
514+
sample.periodSum += periodSum
515+
sample.valueSum += valueSum
516+
sample.valueCount += valueCount
517+
resSeriesStacktraces[index][ts] = sample
518+
}
519+
continue
520+
}
472521

473-
// If we have a CPU samples value type we make sure we always do the next calculation with cpu nanoseconds.
474-
// If we already have CPU nanoseconds we don't need to multiply by the period.
475-
valuePerSecondSum := valueSum
476-
if sampleTypeUnit != "nanoseconds" {
477-
valuePerSecondSum = valueSum * period
522+
var sample metricsSeriesStacktrace
523+
if matches {
524+
// only set values if we have a match otherwise they will be 0
525+
sample = metricsSeriesStacktrace{
526+
durationSum: durationSum,
527+
periodSum: periodSum,
528+
valueSum: valueSum,
529+
valueCount: valueCount,
530+
}
531+
}
532+
resSeriesStacktraces[index][ts] = sample
478533
}
534+
}
535+
}
479536

480-
valuePerSecond := float64(valuePerSecondSum) / float64(duration)
481-
482-
series := resSeries[index]
483-
series.Samples = append(series.Samples, &pb.MetricsSample{
484-
Timestamp: timestamppb.New(timestamp.Time(ts)),
485-
Value: valueSum,
486-
ValuePerSecond: valuePerSecond,
487-
Duration: duration,
488-
})
537+
if filterQuery != "" {
538+
// We have aggregated the metric samples for each timestamp if the underlying stacktraces match.
539+
// Now we need to convert those raw values to the metric samples' value and valuePerSecond.
540+
for i, times := range resSeriesStacktraces {
541+
for ts, sample := range times {
542+
resSeries[i].Samples = append(resSeries[i].Samples,
543+
deltaMetricsSample(
544+
ts,
545+
sample.durationSum,
546+
sample.periodSum,
547+
sample.valueSum,
548+
sample.valueCount,
549+
sampleTypeUnit,
550+
))
551+
}
489552
}
490553
}
491554

@@ -499,6 +562,40 @@ func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Ex
499562
return resSeries, nil
500563
}
501564

565+
func deltaMetricsSample(ts, durationSum, periodSum, valueSum, valueCount int64, sampleTypeUnit string) *pb.MetricsSample {
566+
// If the valueCount is zero we cannot do any calculations.
567+
// We simply return a sample with all values set to 0.
568+
if valueCount == 0 {
569+
return &pb.MetricsSample{
570+
Timestamp: timestamppb.New(timestamp.Time(ts)),
571+
// everything else is 0
572+
}
573+
}
574+
575+
// TODO: We should do these period and duration calculations in frostDB, so that we can push these down as projections.
576+
577+
// Because we store the period with each sample yet query for the sum(period) we need to normalize by the amount of values (rows in a database).
578+
period := periodSum / valueCount
579+
// Because we store the duration with each sample yet query for the sum(duration) we need to normalize by the amount of values (rows in a database).
580+
duration := durationSum / valueCount
581+
582+
// If we have a CPU samples value type we make sure we always do the next calculation with cpu nanoseconds.
583+
// If we already have CPU nanoseconds we don't need to multiply by the period.
584+
valuePerSecondSum := valueSum
585+
if sampleTypeUnit != "nanoseconds" {
586+
valuePerSecondSum = valueSum * period
587+
}
588+
589+
valuePerSecond := float64(valuePerSecondSum) / float64(duration)
590+
591+
return &pb.MetricsSample{
592+
Timestamp: timestamppb.New(timestamp.Time(ts)),
593+
Value: valueSum,
594+
ValuePerSecond: valuePerSecond,
595+
Duration: duration,
596+
}
597+
}
598+
502599
func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, filterQuery string) ([]*pb.MetricsSeries, error) {
503600
groupBy := []logicalplan.Expr{
504601
logicalplan.DynCol(ColumnLabels),

ui/packages/shared/profile/src/MetricsGraph/MetricsTooltip/index.tsx

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ const MetricsTooltip = ({
122122
<td className="w-1/4">Value</td>
123123
<td className="w-3/4">
124124
{valueFormatter(highlighted.valuePerSecond, sampleUnit, 5)}{' '}
125-
{highlighted.valuePercentage > 0 &&
126-
<>({valueFormatter(highlighted.valuePercentage, 'percentage', 2)})</>
127-
}
128125
</td>
129126
</tr>
130127
{delta && (

ui/packages/shared/profile/src/MetricsGraph/index.tsx

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ export interface HighlightedSeries {
5050
timestamp: number;
5151
value: number;
5252
valuePerSecond: number;
53-
valuePercentage: number;
5453
duration: number;
5554
x: number;
5655
y: number;
@@ -141,7 +140,7 @@ export const RawMetricsGraph = ({
141140
values: s.samples.reduce<number[][]>(function (agg: number[][], d: MetricsSample) {
142141
if (d.timestamp !== undefined && d.valuePerSecond !== undefined) {
143142
const t = (+d.timestamp.seconds * 1e9 + d.timestamp.nanos) / 1e6; // https://github.com/microsoft/TypeScript/issues/5710#issuecomment-157886246
144-
agg.push([t, d.valuePerSecond, parseFloat(d.value), parseFloat(d.duration), d.valuePrecision]);
143+
agg.push([t, d.valuePerSecond, parseFloat(d.value), parseFloat(d.duration)]);
145144
}
146145
return agg;
147146
}, []),
@@ -212,7 +211,6 @@ export const RawMetricsGraph = ({
212211
labels: series[closestSeriesIndex].metric,
213212
timestamp: point[0],
214213
valuePerSecond: point[1],
215-
valuePercentage: point[4],
216214
value: point[2],
217215
duration: point[3],
218216
x: xScale(point[0]),
@@ -338,7 +336,6 @@ export const RawMetricsGraph = ({
338336
seriesIndex,
339337
timestamp: sample[0],
340338
valuePerSecond: sample[1],
341-
valuePercentage: sample[4],
342339
value: sample[2],
343340
duration: sample[3],
344341
x: xScale(sample[0]),

ui/packages/shared/profile/src/MetricsSeries/index.tsx

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ interface MetricsSeriesProps {
2424

2525
const MetricsSeries = ({data, line, color, strokeWidth}: MetricsSeriesProps): JSX.Element => (
2626
<g className="line-group">
27-
<path
27+
<path
2828
className="line"
29-
d={line(data.values) ?? undefined}
30-
style={{
31-
stroke: color,
29+
d={line(data.values) ?? undefined}
30+
style={{
31+
stroke: color,
3232
strokeWidth,
33-
}}
34-
/>
35-
</g>
36-
);
33+
}}
34+
/>
35+
</g>
36+
);
3737

3838
export default MetricsSeries;

ui/packages/shared/profile/src/ProfileMetricsGraph/index.tsx

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,15 @@ export const useQueryRange = (
6666

6767
const stepDuration = getStepDuration(start, end);
6868

69-
const call = client.queryRange( {
70-
query: queryExpression,
71-
start: Timestamp.fromDate(new Date(start)),
72-
end: Timestamp.fromDate(new Date(end)),
73-
step: Duration.create(stepDuration),
74-
limit: 0,
75-
filterQuery: filterByFunction,
76-
},
69+
const call = client.queryRange(
70+
{
71+
query: queryExpression,
72+
start: Timestamp.fromDate(new Date(start)),
73+
end: Timestamp.fromDate(new Date(end)),
74+
step: Duration.create(stepDuration),
75+
limit: 0,
76+
filterQuery: filterByFunction,
77+
},
7778
{meta: metadata}
7879
);
7980
call.response
@@ -96,7 +97,13 @@ const ProfileMetricsGraph = ({
9697
addLabelMatcher,
9798
onPointClick,
9899
}: ProfileMetricsGraphProps): JSX.Element => {
99-
const {isLoading, response, error} = useQueryRange(queryClient, queryExpression, from, to, filterByFunction);
100+
const {isLoading, response, error} = useQueryRange(
101+
queryClient,
102+
queryExpression,
103+
from,
104+
to,
105+
filterByFunction
106+
);
100107
const isLoaderVisible = useDelayedLoader(isLoading);
101108
const {loader, onError, perf} = useParcaContext();
102109

0 commit comments

Comments
 (0)