diff --git a/CHANGELOG.md b/CHANGELOG.md index f2df165baa0..4656e2d91fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [FEATURE] StoreGateway: Add a flag `-blocks-storage.bucket-store.honor-projection-hints`. If enabled, the Store Gateway in Parquet mode will honor projection hints and only materialize requested labels. #7206 * [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 7879646ff32..559ac3a4de3 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1813,6 +1813,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. + # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 6e6a0c160a4..faed8494f56 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1879,6 +1879,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. + # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 41de9d6e294..fa72b964991 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2491,6 +2491,12 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. 
+ # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 2a35b853688..b277c9484bf 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -110,6 +110,8 @@ Currently experimental features are: - Store Gateway Zone Stable Shuffle Sharding - `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag - `zone_stable_shuffle_sharding` (boolean) field in config file +- Store Gateway HonorProjectionHints in Parquet Mode + - `-blocks-storage.bucket-store.honor-projection-hints` CLI flag - Basic Lifecycler (Storegateway, Alertmanager, Ruler) Final Sleep on shutdown, which tells the pod wait before shutdown, allowing a delay to propagate ring changes. - `-ruler.ring.final-sleep` (duration) CLI flag - `store-gateway.sharding-ring.final-sleep` (duration) CLI flag diff --git a/integration/parquet_store_gateway_test.go b/integration/parquet_store_gateway_test.go new file mode 100644 index 00000000000..7d8f942daa1 --- /dev/null +++ b/integration/parquet_store_gateway_test.go @@ -0,0 +1,207 @@ +//go:build integration_querier + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "slices" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetBucketStore_ProjectionHint(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + minio := e2edb.NewMinio(9000, bucketName) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) + + // Define configuration flags. + flags := BlocksStorageFlags() + flags = mergeFlags(flags, map[string]string{ + // Enable Thanos engine and projection optimization. 
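+ // (The projection optimizer is what makes the querier attach projection hints to its store-gateway requests; the honor-projection-hints flags below let the querier and the store gateway act on them.)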
+ "-querier.thanos-engine": "true", + "-querier.optimizers": "projection", + + // enable honor-projection-hints querier and store gateway + "-querier.honor-projection-hints": "true", + "-blocks-storage.bucket-store.honor-projection-hints": "true", + // enable Store Gateway Parquet mode + "-blocks-storage.bucket-store.bucket-store-type": "parquet", + + // Set query-ingesters-within to 1h so queries older than 1h don't hit ingesters + "-querier.query-ingesters-within": "1h", + + // Configure Parquet Converter + "-parquet-converter.enabled": "true", + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-compactor.block-ranges": "1ms,12h", + // Enable cache + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }) + + // Store Gateway + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(storeGateway)) + + // Parquet Converter + parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(parquetConverter)) + + // Querier + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Create block + now := time.Now() + // Time range: [Now - 24h] to [Now - 20h] + start := now.Add(-24 * time.Hour) + end := now.Add(-20 * time.Hour) + + ctx := context.Background() + + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + methods := []string{"GET", "POST", "PUT", "DELETE"} + + numSeries := 10 + numSamples := 100 + + lbls := make([]labels.Labels, 0, numSeries) + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.FromStrings( + labels.MetricName, "http_requests_total", + "job", "api-server", + "instance", fmt.Sprintf("instance-%d", i), + "status_code", statusCodes[i%len(statusCodes)], + "method", methods[i%len(methods)], + "path", fmt.Sprintf("/api/v1/endpoint%d", i%3), + "cluster", "test-cluster", + )) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, bucketName) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + // Upload TSDB Block + require.NoError(t, block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)) + + // Wait until parquet converter convert block + require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics)) + + // Create client + c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") + 
require.NoError(t, err) + + cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} { + labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end) + if err != nil { + t.Logf("Series query failed: %v", err) + return false + } + return len(labelSets) > 0 + }) + + testCases := []struct { + name string + query string + expectedLabels []string // query result should contain these labels + }{ + { + name: "vector_selector_query", + query: `http_requests_total`, + expectedLabels: []string{ + "__name__", "job", "instance", "status_code", "method", "path", "cluster", + }, + }, + { + name: "simple_sum_by_job", + query: `sum by (job) (http_requests_total)`, + expectedLabels: []string{"job"}, + }, + { + name: "rate_with_aggregation", + query: `sum by (method) (rate(http_requests_total[5m]))`, + expectedLabels: []string{"method"}, + }, + { + name: "multiple_grouping_labels", + query: `sum by (job, status_code) (http_requests_total)`, + expectedLabels: []string{"job", "status_code"}, + }, + { + name: "aggregation_using_without", + query: `sum without (instance, method) (http_requests_total)`, + expectedLabels: []string{"job", "status_code", "path", "cluster"}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Testing: %s", tc.query) + + // Execute instant query + result, err := c.Query(tc.query, end) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify we got results + vector, ok := result.(model.Vector) + require.True(t, ok, "result should be a vector") + require.NotEmpty(t, vector, "query should return results") + + for _, sample := range vector { + actualLabels := make(map[string]struct{}) + for label := range sample.Metric { + actualLabels[string(label)] = struct{}{} + } + + // Check that all expected labels are present + for _, expectedLabel := range tc.expectedLabels { + _, ok := actualLabels[expectedLabel] + require.True(t, ok, + "series should have %s label", expectedLabel) + } + + // Check that no unexpected labels are present + for lbl := range actualLabels { + if !slices.Contains(tc.expectedLabels, lbl) { + require.Fail(t, fmt.Sprintf("series should not have unexpected label: %s", lbl)) + } + } + } + }) + } +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index c5e465c591a..4c2e05732ec 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -334,7 +334,8 @@ type BucketStoreConfig struct { // Token bucket configs TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"` // Parquet shard cache config - ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + HonorProjectionHints bool `yaml:"honor_projection_hints"` } type TokenBucketBytesLimiterConfig struct { @@ -397,6 +398,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token") f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 
0 to disable.") cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f) + f.BoolVar(&cfg.HonorProjectionHints, "blocks-storage.bucket-store.honor-projection-hints", false, "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It is only effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.") } // Validate the config. diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index e6bded1e2f6..8eec27a278e 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -3,6 +3,7 @@ package storegateway import ( "context" "fmt" + "slices" "strings" "github.com/go-kit/log" @@ -38,8 +39,9 @@ type parquetBucketStore struct { chunksDecoder *schema.PrometheusParquetChunksDecoder - matcherCache storecache.MatchersCache - parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] + matcherCache storecache.MatchersCache + parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] + honorProjectionHints bool } func (p *parquetBucketStore) Close() error { @@ -107,6 +109,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep return fmt.Errorf("failed to find parquet shards: %w", err) } + storageHints := p.buildSelectHints(req.QueryHints, shards, req.MinTime, req.MaxTime) seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards)) errGroup, ctx := errgroup.WithContext(srv.Context()) errGroup.SetLimit(p.concurrency) @@ -116,7 +119,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep Id: shard.name, }) errGroup.Go(func() error { - ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers) + ss, err := shard.Query(ctx, storageHints, req.SkipChunks, matchers) seriesSet[i] = ss return err }) @@ -279,3 +282,42 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label Hints: anyHints, }, nil } + +func (p *parquetBucketStore) buildSelectHints(queryHints *storepb.QueryHints, shards []*parquetBlock, minT, maxT int64) *prom_storage.SelectHints { + storageHints := &prom_storage.SelectHints{ + Start: minT, + End: maxT, + } + + if p.honorProjectionHints && queryHints != nil { + storageHints.ProjectionInclude = queryHints.ProjectionInclude + storageHints.ProjectionLabels = queryHints.ProjectionLabels + + if storageHints.ProjectionInclude { + // Reset projection hints if not all parquet shard have the hash column. + if !allParquetBlocksHaveHashColumn(shards) { + storageHints.ProjectionInclude = false + storageHints.ProjectionLabels = nil + } + } else { + // Reset hints for non-include projections to force a full scan, matching querier behavior. 
+ storageHints.ProjectionLabels = nil + } + + if storageHints.ProjectionInclude && !slices.Contains(storageHints.ProjectionLabels, schema.SeriesHashColumn) { + storageHints.ProjectionLabels = append(storageHints.ProjectionLabels, schema.SeriesHashColumn) + } + } + + return storageHints +} + +func allParquetBlocksHaveHashColumn(blocks []*parquetBlock) bool { + // TODO(Sungjin1212): Change it to read marker version + for _, b := range blocks { + if !b.hasHashColumn() { + return false + } + } + return true +} diff --git a/pkg/storegateway/parquet_bucket_store_test.go b/pkg/storegateway/parquet_bucket_store_test.go new file mode 100644 index 00000000000..d7ccd654a1c --- /dev/null +++ b/pkg/storegateway/parquet_bucket_store_test.go @@ -0,0 +1,255 @@ +package storegateway + +import ( + "bytes" + "testing" + + "github.com/parquet-go/parquet-go" + "github.com/prometheus-community/parquet-common/schema" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type mockParquetFileView struct { + parquet_storage.ParquetFileView + file *parquet.File +} + +func (m *mockParquetFileView) Schema() *parquet.Schema { + return m.file.Schema() +} + +type mockShard struct { + parquet_storage.ParquetShard + fileView parquet_storage.ParquetFileView +} + +func (m *mockShard) LabelsFile() parquet_storage.ParquetFileView { + return m.fileView +} + +func createTestParquetBlock(t *testing.T, hasHash bool) *parquetBlock { + t.Helper() + + var buf bytes.Buffer + var err error + + if hasHash { + // v2 and higher blocks + type RowWithHash struct { + SeriesHash string `parquet:"s_series_hash"` + Label string `parquet:"l_job"` + } + + w := parquet.NewGenericWriter[RowWithHash](&buf) + _, err = w.Write([]RowWithHash{{SeriesHash: "hash1", Label: "node-1"}}) + require.NoError(t, err) + require.NoError(t, w.Close()) + } else { + // v1 block + type RowWithoutHash struct { + Label string `parquet:"l_job"` + } + + w := parquet.NewGenericWriter[RowWithoutHash](&buf) + _, err = w.Write([]RowWithoutHash{{Label: "node-1"}}) + require.NoError(t, err) + require.NoError(t, w.Close()) + } + + readBuf := bytes.NewReader(buf.Bytes()) + f, err := parquet.OpenFile(readBuf, readBuf.Size()) + require.NoError(t, err) + + return &parquetBlock{ + shard: &mockShard{ + fileView: &mockParquetFileView{file: f}, + }, + } +} + +func Test_AllParquetBlocksHaveHashColumn(t *testing.T) { + tests := []struct { + description string + setup func() []*parquetBlock + expected bool + }{ + { + description: "returns true when all blocks have hash column", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + } + }, + expected: true, + }, + { + description: "returns false when mixed block versions exist", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, false), + createTestParquetBlock(t, true), + } + }, + expected: false, + }, + { + description: "returns false when no blocks have hash column", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, false), + createTestParquetBlock(t, false), + } + }, + expected: false, + }, + { + description: "returns true for empty block list", + setup: func() []*parquetBlock { + return []*parquetBlock{} + }, + expected: true, + }, + } + + for _, 
tc := range tests { + t.Run(tc.description, func(t *testing.T) { + blocks := tc.setup() + actual := allParquetBlocksHaveHashColumn(blocks) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestParquetBucketStore_buildSelectHints(t *testing.T) { + const ( + minT = 1000 + maxT = 2000 + ) + + tests := []struct { + description string + honorProjectionHints bool + queryHints *storepb.QueryHints + shards []*parquetBlock + expectedHints *prom_storage.SelectHints + }{ + { + description: "honorProjectionHints=false, should ignore query hints", + honorProjectionHints: false, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: "honorProjectionHints=true, V2 blocks, should enable projection and append hash column", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + }, + { + description: "honorProjectionHints=true, V2 blocks, should not duplicate hash column if already present", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + }, + { + description: "honorProjectionHints=true, Mixed V1/V2 blocks, should reset projection", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, false), // v1 + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: "honorProjectionHints=true, nil query hints, should return default hints", + honorProjectionHints: true, + queryHints: nil, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: "honorProjectionHints=true, Empty projection labels, should add hash column only", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{schema.SeriesHashColumn}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + store := &parquetBucketStore{ + honorProjectionHints: tc.honorProjectionHints, + } + shards := tc.shards + hints := store.buildSelectHints(tc.queryHints, shards, minT, maxT) + require.Equal(t, tc.expectedHints, hints) + }) + } +} diff --git 
a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index b51bf758ae5..6eae37927f9 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -255,13 +255,14 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) store := &parquetBucketStore{ - logger: userLogger, - bucket: userBucket, - limits: u.limits, - concurrency: 4, // TODO: make this configurable - chunksDecoder: u.chunksDecoder, - matcherCache: u.matcherCache, - parquetShardCache: u.parquetShardCache, + logger: userLogger, + bucket: userBucket, + limits: u.limits, + concurrency: 4, // TODO: make this configurable + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + parquetShardCache: u.parquetShardCache, + honorProjectionHints: u.cfg.BucketStore.HonorProjectionHints, } return store, nil @@ -310,7 +311,7 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, s if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback, false) + m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback, p.honorProjectionHints) if err != nil { return nil, err } @@ -365,7 +366,7 @@ func (f *shardMatcherLabelsFilter) Close() { f.shardMatcher.Close() } -func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { +func (b *parquetBlock) Query(ctx context.Context, hints *prom_storage.SelectHints, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(b.concurrency) @@ -395,7 +396,7 @@ func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks b return nil } - seriesSetIter, err := b.m.Materialize(ctx, nil, rgi, mint, maxt, skipChunks, rr) + seriesSetIter, err := b.m.Materialize(ctx, hints, rgi, hints.Start, hints.End, skipChunks, rr) if err != nil { return err } @@ -532,6 +533,18 @@ func (b *parquetBlock) allLabelValues(ctx context.Context, name string, limit in return util.MergeUnsortedSlices(int(limit), results...), nil } +// hasHashColumn checks if the parquet block contains the schema.SeriesHashColumn column. +// This is used to determine if projection pushdown can be enabled. +func (b *parquetBlock) hasHashColumn() bool { + labelsFile := b.shard.LabelsFile() + if labelsFile == nil { + return false + } + + _, found := labelsFile.Schema().Lookup(schema.SeriesHashColumn) + return found +} + type byLabels []prom_storage.ChunkSeries func (b byLabels) Len() int { return len(b) } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8e1e48a8f87..a7055e266ba 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -1455,6 +1455,12 @@ "x-cli-flag": "blocks-storage.bucket-store.consistency-delay", "x-format": "duration" }, + "honor_projection_hints": { + "default": false, + "description": "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. 
It only takes effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.", "type": "boolean", "x-cli-flag": "blocks-storage.bucket-store.honor-projection-hints" }, "ignore_blocks_before": { "default": "0s", "description": "The blocks created before `now() - ignore_blocks_before` will not be synced. 0 to disable.",