Skip to content

Commit 936bd5c

Browse files
committed
streamingpromql: use label/metric name validation scheme from limits
1 parent 1819cf5 commit 936bd5c

File tree

9 files changed

+74
-42
lines changed

9 files changed

+74
-42
lines changed

pkg/frontend/querymiddleware/querysharding_test_utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func newEngineForTesting(t *testing.T, engine string, opts ...engineOpt) (promql
333333
case querier.PrometheusEngine:
334334
return promOpts, promql.NewEngine(promOpts)
335335
case querier.MimirEngine:
336-
limits := streamingpromql.NewStaticQueryLimitsProvider(0)
336+
limits := streamingpromql.NewStaticQueryLimitsProvider(0, model.UTF8Validation)
337337
metrics := stats.NewQueryMetrics(promOpts.Reg)
338338
planner := streamingpromql.NewQueryPlanner(mqeOpts)
339339
logger := log.NewNopLogger()

pkg/mimir/modules.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/prometheus/alertmanager/matchers/compat"
2929
"github.com/prometheus/client_golang/prometheus"
3030
"github.com/prometheus/common/config"
31+
"github.com/prometheus/common/model"
3132
"github.com/prometheus/prometheus/model/labels"
3233
"github.com/prometheus/prometheus/promql"
3334
"github.com/prometheus/prometheus/rules"
@@ -812,9 +813,10 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error
812813
var eng promql.QueryEngine
813814
switch t.Cfg.Frontend.QueryEngine {
814815
case querier.PrometheusEngine:
815-
eng = promql.NewEngine(promOpts)
816+
// TODO: Decide whether this is a good idea.
817+
eng = streamingpromqlcompat.NameValidatingEngine(promql.NewEngine(promOpts), t.Overrides)
816818
case querier.MimirEngine:
817-
streamingEngine, err := streamingpromql.NewEngine(mqeOpts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(mqeOpts.CommonOpts.Reg), t.QueryPlanner, util_log.Logger)
819+
streamingEngine, err := streamingpromql.NewEngine(mqeOpts, streamingpromql.NewStaticQueryLimitsProvider(0, model.UTF8Validation), stats.NewQueryMetrics(mqeOpts.CommonOpts.Reg), t.QueryPlanner, util_log.Logger)
818820
if err != nil {
819821
return nil, fmt.Errorf("unable to create Mimir Query Engine: %w", err)
820822
}
@@ -828,9 +830,6 @@ func (t *Mimir) initQueryFrontendTripperware() (serv services.Service, err error
828830
panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", t.Cfg.Querier.QueryEngine))
829831
}
830832

831-
// TODO: Decide whether this is a good idea.
832-
eng = streamingpromqlcompat.NameValidatingEngine(eng, t.Overrides)
833-
834833
tripperware, err := querymiddleware.NewTripperware(
835834
t.Cfg.Frontend.QueryMiddleware,
836835
util_log.Logger,

pkg/querier/querier.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, quer
185185

186186
switch cfg.QueryEngine {
187187
case PrometheusEngine:
188-
eng = promql.NewEngine(opts)
188+
// TODO: Check whether this approach is a good idea.
189+
eng = compat.NameValidatingEngine(promql.NewEngine(opts), limits)
189190
case MimirEngine:
190191
limitsProvider := NewTenantQueryLimitsProvider(limits)
191192
streamingEngine, err := streamingpromql.NewEngine(mqeOpts, limitsProvider, queryMetrics, planner, logger)
@@ -194,7 +195,8 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, quer
194195
}
195196

196197
if cfg.EnableQueryEngineFallback {
197-
prometheusEngine := promql.NewEngine(opts)
198+
// TODO: Check whether this approach is a good idea.
199+
prometheusEngine := compat.NameValidatingEngine(promql.NewEngine(opts), limits)
198200
eng = compat.NewEngineWithFallback(streamingEngine, prometheusEngine, reg, logger)
199201
} else {
200202
eng = streamingEngine
@@ -203,8 +205,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, quer
203205
panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.QueryEngine))
204206
}
205207

206-
// TODO: Check whether this approach is a good idea.
207-
eng = compat.NameValidatingEngine(eng, limits)
208208
return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, eng, nil
209209
}
210210

@@ -795,3 +795,19 @@ func (p *TenantQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery(ctx
795795

796796
return totalLimit, nil
797797
}
798+
799+
// GetValidationScheme computes the validation scheme for tenants injected into ctx. Returns LegacyValidation if
800+
// at least one tenant uses LegacyValidation, UTF8Validation otherwise.
801+
func (p *TenantQueryLimitsProvider) GetValidationScheme(ctx context.Context) (model.ValidationScheme, error) {
802+
tenantIDs, err := tenant.TenantIDs(ctx)
803+
if err != nil {
804+
return 0, err
805+
}
806+
for _, tenantID := range tenantIDs {
807+
validationScheme := p.limits.ValidationScheme(tenantID)
808+
if validationScheme == model.LegacyValidation {
809+
return validationScheme, nil
810+
}
811+
}
812+
return model.UTF8Validation, nil
813+
}

pkg/streamingpromql/benchmarks/comparison_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/grafana/dskit/services"
2121
"github.com/grafana/dskit/test"
2222
"github.com/grafana/dskit/user"
23+
"github.com/prometheus/common/model"
2324
"github.com/prometheus/prometheus/model/labels"
2425
"github.com/prometheus/prometheus/promql"
2526
"github.com/prometheus/prometheus/storage"
@@ -43,7 +44,7 @@ func BenchmarkQuery(b *testing.B) {
4344

4445
opts := streamingpromql.NewTestEngineOpts()
4546
prometheusEngine := promql.NewEngine(opts.CommonOpts)
46-
mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), streamingpromql.NewQueryPlanner(opts), log.NewNopLogger())
47+
mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0, model.UTF8Validation), stats.NewQueryMetrics(nil), streamingpromql.NewQueryPlanner(opts), log.NewNopLogger())
4748
require.NoError(b, err)
4849

4950
// Important: the names below must remain in sync with the names used in tools/benchmark-query-engine.
@@ -95,7 +96,7 @@ func TestBothEnginesReturnSameResultsForBenchmarkQueries(t *testing.T) {
9596

9697
opts := streamingpromql.NewTestEngineOpts()
9798
prometheusEngine := promql.NewEngine(opts.CommonOpts)
98-
limitsProvider := streamingpromql.NewStaticQueryLimitsProvider(0)
99+
limitsProvider := streamingpromql.NewStaticQueryLimitsProvider(0, model.UTF8Validation)
99100
queryMetrics := stats.NewQueryMetrics(nil)
100101
mimirEngine, err := streamingpromql.NewEngine(opts, limitsProvider, queryMetrics, streamingpromql.NewQueryPlanner(opts), log.NewNopLogger())
101102
require.NoError(t, err)
@@ -124,7 +125,7 @@ func TestBenchmarkSetup(t *testing.T) {
124125
q := createBenchmarkQueryable(t, []int{1})
125126

126127
opts := streamingpromql.NewTestEngineOpts()
127-
mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), streamingpromql.NewQueryPlanner(opts), log.NewNopLogger())
128+
mimirEngine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0, model.UTF8Validation), stats.NewQueryMetrics(nil), streamingpromql.NewQueryPlanner(opts), log.NewNopLogger())
128129
require.NoError(t, err)
129130

130131
ctx := user.InjectOrgID(context.Background(), UserID)

pkg/streamingpromql/engine.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/go-kit/log"
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/client_golang/prometheus/promauto"
18+
"github.com/prometheus/common/model"
1819
"github.com/prometheus/prometheus/promql"
1920
"github.com/prometheus/prometheus/storage"
2021
"go.opentelemetry.io/otel"
@@ -143,25 +144,33 @@ func (e *Engine) newQueryFromPlanner(ctx context.Context, q storage.Queryable, o
143144
type QueryLimitsProvider interface {
144145
// GetMaxEstimatedMemoryConsumptionPerQuery returns the maximum estimated memory allowed to be consumed by a query in bytes, or 0 to disable the limit.
145146
GetMaxEstimatedMemoryConsumptionPerQuery(ctx context.Context) (uint64, error)
147+
// GetValidationScheme returns the label/metric name validation scheme to use for a query.
148+
GetValidationScheme(ctx context.Context) (model.ValidationScheme, error)
146149
}
147150

148151
// NewStaticQueryLimitsProvider returns a QueryLimitsProvider that always returns the provided limits.
149152
//
150153
// This should generally only be used in tests.
151-
func NewStaticQueryLimitsProvider(maxEstimatedMemoryConsumptionPerQuery uint64) QueryLimitsProvider {
154+
func NewStaticQueryLimitsProvider(maxEstimatedMemoryConsumptionPerQuery uint64, validationScheme model.ValidationScheme) QueryLimitsProvider {
152155
return staticQueryLimitsProvider{
153156
maxEstimatedMemoryConsumptionPerQuery: maxEstimatedMemoryConsumptionPerQuery,
157+
validationScheme: validationScheme,
154158
}
155159
}
156160

157161
type staticQueryLimitsProvider struct {
158162
maxEstimatedMemoryConsumptionPerQuery uint64
163+
validationScheme model.ValidationScheme
159164
}
160165

161166
func (p staticQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery(_ context.Context) (uint64, error) {
162167
return p.maxEstimatedMemoryConsumptionPerQuery, nil
163168
}
164169

170+
func (p staticQueryLimitsProvider) GetValidationScheme(_ context.Context) (model.ValidationScheme, error) {
171+
return p.validationScheme, nil
172+
}
173+
165174
type NoopQueryTracker struct{}
166175

167176
func (n *NoopQueryTracker) GetMaxConcurrent() int {

pkg/streamingpromql/engine_concurrency_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/go-kit/log"
13+
"github.com/prometheus/common/model"
1314
"github.com/prometheus/prometheus/model/timestamp"
1415
"github.com/prometheus/prometheus/promql"
1516
"github.com/prometheus/prometheus/promql/promqltest"
@@ -187,7 +188,7 @@ func TestConcurrentQueries(t *testing.T) {
187188
t.Cleanup(func() { require.NoError(t, storage.Close()) })
188189

189190
opts := NewTestEngineOpts()
190-
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), NewQueryPlanner(opts), log.NewNopLogger())
191+
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0, model.UTF8Validation), stats.NewQueryMetrics(nil), NewQueryPlanner(opts), log.NewNopLogger())
191192
require.NoError(t, err)
192193

193194
// Populate the expected result for each query.

0 commit comments

Comments
 (0)