Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [FEATURE] StoreGateway: Add a flag `-blocks-storage.bucket-store.honor-projection-hints`. If enabled, Store Gateway in Parquet mode will honor projection hints and only materialize requested labels. #7206
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It is only effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1879,6 +1879,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It is only effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It is only effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Currently experimental features are:
- Store Gateway Zone Stable Shuffle Sharding
- `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag
- `zone_stable_shuffle_sharding` (boolean) field in config file
- Store Gateway HonorProjectionHints in Parquet Mode
- `-blocks-storage.bucket-store.honor-projection-hints` CLI flag
- Basic Lifecycler (Storegateway, Alertmanager, Ruler) Final Sleep on shutdown, which tells the pod wait before shutdown, allowing a delay to propagate ring changes.
- `-ruler.ring.final-sleep` (duration) CLI flag
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
Expand Down
207 changes: 207 additions & 0 deletions integration/parquet_store_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
//go:build integration_querier

package integration

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"slices"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

func TestParquetBucketStore_ProjectionHint(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
minio := e2edb.NewMinio(9000, bucketName)
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))

// Define configuration flags.
flags := BlocksStorageFlags()
flags = mergeFlags(flags, map[string]string{
// Enable Thanos engine and projection optimization.
"-querier.thanos-engine": "true",
"-querier.optimizers": "projection",

// enable honor-projection-hints querier and store gateway
"-querier.honor-projection-hints": "true",
"-blocks-storage.bucket-store.honor-projection-hints": "true",
// enable Store Gateway Parquet mode
"-blocks-storage.bucket-store.bucket-store-type": "parquet",

// Set query-ingesters-within to 1h so queries older than 1h don't hit ingesters
"-querier.query-ingesters-within": "1h",

// Configure Parquet Converter
"-parquet-converter.enabled": "true",
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-compactor.block-ranges": "1ms,12h",
// Enable cache
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
})

// Store Gateway
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(storeGateway))

// Parquet Converter
parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(parquetConverter))

// Querier
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Create block
now := time.Now()
// Time range: [Now - 24h] to [Now - 20h]
start := now.Add(-24 * time.Hour)
end := now.Add(-20 * time.Hour)

ctx := context.Background()

rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
methods := []string{"GET", "POST", "PUT", "DELETE"}

numSeries := 10
numSamples := 100

lbls := make([]labels.Labels, 0, numSeries)
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.FromStrings(
labels.MetricName, "http_requests_total",
"job", "api-server",
"instance", fmt.Sprintf("instance-%d", i),
"status_code", statusCodes[i%len(statusCodes)],
"method", methods[i%len(methods)],
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
"cluster", "test-cluster",
))
}

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)

storage, err := e2ecortex.NewS3ClientForMinio(minio, bucketName)
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

// Upload TSDB Block
require.NoError(t, block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc))

// Wait until parquet converter convert block
require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics))

// Create client
c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end)
if err != nil {
t.Logf("Series query failed: %v", err)
return false
}
return len(labelSets) > 0
})

testCases := []struct {
name string
query string
expectedLabels []string // query result should contain these labels
}{
{
name: "vector selector query",
query: `http_requests_total`,
expectedLabels: []string{
"__name__", "job", "instance", "status_code", "method", "path", "cluster",
},
},
{
name: "simple_sum_by_job",
query: `sum by (job) (http_requests_total)`,
expectedLabels: []string{"job"},
},
{
name: "rate_with_aggregation",
query: `sum by (method) (rate(http_requests_total[5m]))`,
expectedLabels: []string{"method"},
},
{
name: "multiple_grouping_labels",
query: `sum by (job, status_code) (http_requests_total)`,
expectedLabels: []string{"job", "status_code"},
},
{
name: "aggregation without query",
query: `sum without (instance, method) (http_requests_total)`,
expectedLabels: []string{"job", "status_code", "path", "cluster"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("Testing: %s", tc.query)

// Execute instant query
result, err := c.Query(tc.query, end)
require.NoError(t, err)
require.NotNil(t, result)

// Verify we got results
vector, ok := result.(model.Vector)
require.True(t, ok, "result should be a vector")
require.NotEmpty(t, vector, "query should return results")

for _, sample := range vector {
actualLabels := make(map[string]struct{})
for label := range sample.Metric {
actualLabels[string(label)] = struct{}{}
}

// Check that all expected labels are present
for _, expectedLabel := range tc.expectedLabels {
_, ok := actualLabels[expectedLabel]
require.True(t, ok,
"series should have %s label", expectedLabel)
}

// Check that no unexpected labels are present
for lbl := range actualLabels {
if !slices.Contains(tc.expectedLabels, lbl) {
require.Fail(t, "series should not have unexpected label: %s", lbl)
}
}
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ type BucketStoreConfig struct {
// Token bucket configs
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
// Parquet shard cache config
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
HonorProjectionHints bool `yaml:"honor_projection_hints"`
}

type TokenBucketBytesLimiterConfig struct {
Expand Down Expand Up @@ -397,6 +398,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
f.BoolVar(&cfg.HonorProjectionHints, "blocks-storage.bucket-store.honor-projection-hints", false, "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It is only effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.")
}

// Validate the config.
Expand Down
48 changes: 45 additions & 3 deletions pkg/storegateway/parquet_bucket_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storegateway
import (
"context"
"fmt"
"slices"
"strings"

"github.com/go-kit/log"
Expand Down Expand Up @@ -38,8 +39,9 @@ type parquetBucketStore struct {

chunksDecoder *schema.PrometheusParquetChunksDecoder

matcherCache storecache.MatchersCache
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
matcherCache storecache.MatchersCache
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
honorProjectionHints bool
}

func (p *parquetBucketStore) Close() error {
Expand Down Expand Up @@ -107,6 +109,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
return fmt.Errorf("failed to find parquet shards: %w", err)
}

storageHints := p.buildSelectHints(req.QueryHints, shards, req.MinTime, req.MaxTime)
seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards))
errGroup, ctx := errgroup.WithContext(srv.Context())
errGroup.SetLimit(p.concurrency)
Expand All @@ -116,7 +119,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
Id: shard.name,
})
errGroup.Go(func() error {
ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers)
ss, err := shard.Query(ctx, storageHints, req.SkipChunks, matchers)
seriesSet[i] = ss
return err
})
Expand Down Expand Up @@ -279,3 +282,42 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label
Hints: anyHints,
}, nil
}

func (p *parquetBucketStore) buildSelectHints(queryHints *storepb.QueryHints, shards []*parquetBlock, minT, maxT int64) *prom_storage.SelectHints {
storageHints := &prom_storage.SelectHints{
Start: minT,
End: maxT,
}

if p.honorProjectionHints && queryHints != nil {
storageHints.ProjectionInclude = queryHints.ProjectionInclude
storageHints.ProjectionLabels = queryHints.ProjectionLabels

if storageHints.ProjectionInclude {
// Reset projection hints if not all parquet shard have the hash column.
if !allParquetBlocksHaveHashColumn(shards) {
storageHints.ProjectionInclude = false
storageHints.ProjectionLabels = nil
}
} else {
// Reset hints for non-include projections to force a full scan, matching querier behavior.
storageHints.ProjectionLabels = nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also reset the projection hints if it is not projection include. Same as what we do in querier.

if !sp.ProjectionInclude || q.distributor.UseQueryable(q.now, mint, maxt) {

If we don't do this then it can do wrong below as we only add series hash if it is projection include. In general it is hard to estimate the cost of projection not include and it can be more expensive


if storageHints.ProjectionInclude && !slices.Contains(storageHints.ProjectionLabels, schema.SeriesHashColumn) {
storageHints.ProjectionLabels = append(storageHints.ProjectionLabels, schema.SeriesHashColumn)
}
}

return storageHints
}

func allParquetBlocksHaveHashColumn(blocks []*parquetBlock) bool {
// TODO(Sungjin1212): Change it to read marker version
for _, b := range blocks {
if !b.hasHashColumn() {
return false
}
}
return true
}
Loading
Loading