Skip to content

Commit 12601ba

Browse files
committed
feat(chunk,ct): add chunk format that supports start timestamp
Related to prometheus/proposals#60 Signed-off-by: bwplotka <[email protected]>
1 parent 1045fff commit 12601ba

File tree

10 files changed

+751
-113
lines changed

10 files changed

+751
-113
lines changed

storage/interface.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -460,25 +460,37 @@ type Series interface {
460460
}
461461

462462
type mockSeries struct {
463-
timestamps []int64
464-
values []float64
465-
labelSet []string
463+
timestamps []int64
464+
startTimestamps []int64
465+
values []float64
466+
labelSet []string
466467
}
467468

468469
func (s mockSeries) Labels() labels.Labels {
469470
return labels.FromStrings(s.labelSet...)
470471
}
471472

472473
func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
473-
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
474+
return chunkenc.MockSeriesIterator(s.startTimestamps, s.timestamps, s.values)
474475
}
475476

476477
// MockSeries returns a series with custom timestamps, values and labelSet.
477478
func MockSeries(timestamps []int64, values []float64, labelSet []string) Series {
478479
return mockSeries{
479-
timestamps: timestamps,
480-
values: values,
481-
labelSet: labelSet,
480+
startTimestamps: make([]int64, len(timestamps)), // 0 means unset.
481+
timestamps: timestamps,
482+
values: values,
483+
labelSet: labelSet,
484+
}
485+
}
486+
487+
// MockSeriesWithST returns a series with custom start timestamps, timestamps, values and labelSet.
488+
func MockSeriesWithST(startTimestamps, timestamps []int64, values []float64, labelSet []string) Series {
489+
return mockSeries{
490+
startTimestamps: startTimestamps,
491+
timestamps: timestamps,
492+
values: values,
493+
labelSet: labelSet,
482494
}
483495
}
484496

tsdb/chunkenc/benchmark_test.go

Lines changed: 134 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ import (
1919
"io"
2020
"math/rand"
2121
"testing"
22+
"time"
2223

2324
"github.com/stretchr/testify/require"
25+
26+
"github.com/prometheus/prometheus/model/timestamp"
2427
)
2528

2629
type sampleCase struct {
2730
name string
28-
samples []pair
31+
samples []triple
2932
}
3033

3134
type fmtCase struct {
@@ -36,55 +39,136 @@ type fmtCase struct {
3639
func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampleCase)) {
3740
const nSamples = 120 // Same as tsdb.DefaultSamplesPerChunk.
3841

42+
d, err := time.Parse(time.DateTime, "2025-11-04 10:01:05")
43+
require.NoError(b, err)
44+
3945
var (
40-
r = rand.New(rand.NewSource(1))
41-
initT = int64(1234123324)
42-
initV = 1243535.123
46+
r = rand.New(rand.NewSource(1))
47+
initST = timestamp.FromTime(d) // Use realistic timestamp.
48+
initT = initST + 15000 // 15s after initST.
49+
initV = 1243535.123
4350
)
4451

4552
sampleCases := []sampleCase{
4653
{
47-
name: "constant",
48-
samples: func() (ret []pair) {
54+
name: "vt=constant/st=0",
55+
samples: func() (ret []triple) {
4956
t, v := initT, initV
5057
for range nSamples {
5158
t += 1000
52-
ret = append(ret, pair{t: t, v: v})
59+
ret = append(ret, triple{st: 0, t: t, v: v})
5360
}
5461
return ret
5562
}(),
5663
},
5764
{
58-
name: "random steps",
59-
samples: func() (ret []pair) {
65+
// Cumulative with a constant ST through the whole chunk, typical case (e.g. long counting counter).
66+
name: "vt=constant/st=cumulative",
67+
samples: func() (ret []triple) {
6068
t, v := initT, initV
6169
for range nSamples {
70+
t += 1000
71+
ret = append(ret, triple{st: initST, t: t, v: v})
72+
}
73+
return ret
74+
}(),
75+
},
76+
{
77+
// Delta simulates delta type or worst case for cumulatives, where ST
78+
// is changing on every sample.
79+
name: "vt=constant/st=delta",
80+
samples: func() (ret []triple) {
81+
st, t, v := initST, initT, initV
82+
for range nSamples {
83+
st = t + 1 // ST is a tight interval after the last t+1ms.
84+
t += 1000
85+
ret = append(ret, triple{st: st, t: t, v: v})
86+
}
87+
return ret
88+
}(),
89+
},
90+
{
91+
name: "vt=random steps/st=0",
92+
samples: func() (ret []triple) {
93+
t, v := initT, initV
94+
for range nSamples {
95+
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
96+
v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps.
97+
ret = append(ret, triple{st: 0, t: t, v: v})
98+
}
99+
return ret
100+
}(),
101+
},
102+
{
103+
name: "vt=random steps/st=cumulative",
104+
samples: func() (ret []triple) {
105+
t, v := initT, initV
106+
for range nSamples {
107+
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
108+
v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps.
109+
ret = append(ret, triple{st: initST, t: t, v: v})
110+
}
111+
return ret
112+
}(),
113+
},
114+
{
115+
name: "vt=random steps/st=delta",
116+
samples: func() (ret []triple) {
117+
st, t, v := initST, initT, initV
118+
for range nSamples {
119+
st = t + 1 // ST is a tight interval after the last t+1ms.
62120
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
63121
v += float64(r.Intn(100) - 50) // Varying from -50 to +50 in 100 discrete steps.
64-
ret = append(ret, pair{t: t, v: v})
122+
ret = append(ret, triple{st: st, t: t, v: v})
65123
}
66124
return ret
67125
}(),
68126
},
69127
{
70-
name: "random 0-1",
71-
samples: func() (ret []pair) {
128+
name: "vt=random 0-1/st=0",
129+
samples: func() (ret []triple) {
72130
t, v := initT, initV
73131
for range nSamples {
74132
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
75133
v += r.Float64() // Random between 0 and 1.0.
76-
ret = append(ret, pair{t: t, v: v})
134+
ret = append(ret, triple{st: 0, t: t, v: v})
135+
}
136+
return ret
137+
}(),
138+
},
139+
{
140+
name: "vt=random 0-1/st=cumulative",
141+
samples: func() (ret []triple) {
142+
t, v := initT, initV
143+
for range nSamples {
144+
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
145+
v += r.Float64() // Random between 0 and 1.0.
146+
ret = append(ret, triple{st: initST, t: t, v: v})
147+
}
148+
return ret
149+
}(),
150+
},
151+
{
152+
name: "vt=random 0-1/st=delta",
153+
samples: func() (ret []triple) {
154+
st, t, v := initST, initT, initV
155+
for range nSamples {
156+
st = t + 1 // ST is a tight interval after the last t+1ms.
157+
t += int64(r.Intn(100) - 50 + 15000) // 15 seconds +- up to 100ms of jitter.
158+
v += r.Float64() // Random between 0 and 1.0.
159+
ret = append(ret, triple{st: st, t: t, v: v})
77160
}
78161
return ret
79162
}(),
80163
},
81164
}
82165

83166
for _, f := range []fmtCase{
84-
{name: "XOR", newChunkFn: func() Chunk { return NewXORChunk() }},
167+
{name: "XOR (ST ignored)", newChunkFn: func() Chunk { return NewXORChunk() }},
168+
{name: "XORv2", newChunkFn: func() Chunk { return NewXORv2Chunk() }},
85169
} {
86170
for _, s := range sampleCases {
87-
b.Run(fmt.Sprintf("fmt=%s/samples=%s", f.name, s.name), func(b *testing.B) {
171+
b.Run(fmt.Sprintf("fmt=%s/%s", f.name, s.name), func(b *testing.B) {
88172
fn(b, f, s)
89173
})
90174
}
@@ -94,7 +178,7 @@ func foreachFmtSampleCase(b *testing.B, fn func(b *testing.B, f fmtCase, s sampl
94178
/*
95179
export bench=append && go test \
96180
-run '^$' -bench '^BenchmarkAppender' \
97-
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
181+
-benchtime 2s -count 6 -cpu 2 -timeout 999m \
98182
| tee ${bench}.txt
99183
*/
100184
func BenchmarkAppender(b *testing.B) {
@@ -104,41 +188,66 @@ func BenchmarkAppender(b *testing.B) {
104188
for b.Loop() {
105189
c := f.newChunkFn()
106190

107-
a, err := c.Appender()
191+
newAppenderFn, _ := compatNewAppenderV2(c)
192+
a, err := newAppenderFn()
108193
if err != nil {
109194
b.Fatalf("get appender: %s", err)
110195
}
111196
for _, p := range s.samples {
112-
a.Append(p.t, p.v)
197+
a.Append(p.st, p.t, p.v)
113198
}
114199
b.ReportMetric(float64(len(c.Bytes())), "B/chunk")
200+
201+
require.Equal(b, len(s.samples), c.NumSamples())
115202
}
116203
})
117204
}
118205

206+
type supportsAppenderV2 interface {
207+
// AppenderV2 returns an v2 appender to append samples to the chunk.
208+
AppenderV2() (AppenderV2, error)
209+
}
210+
119211
/*
120212
export bench=iter && go test \
121213
-run '^$' -bench '^BenchmarkIterator' \
122-
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
214+
-benchtime 2s -count 6 -cpu 2 -timeout 999m \
123215
| tee ${bench}.txt
124216
*/
125217
func BenchmarkIterator(b *testing.B) {
126218
foreachFmtSampleCase(b, func(b *testing.B, f fmtCase, s sampleCase) {
127219
b.ReportAllocs()
128220

129221
c := f.newChunkFn()
130-
a, err := c.Appender()
222+
newAppenderFn, stSupported := compatNewAppenderV2(c)
223+
a, err := newAppenderFn()
131224
if err != nil {
132225
b.Fatalf("get appender: %s", err)
133226
}
134227
for _, p := range s.samples {
135-
a.Append(p.t, p.v)
228+
a.Append(p.st, p.t, p.v)
229+
}
230+
231+
// While we are at it, test if encoding/decoding works.
232+
require.Equal(b, len(s.samples), c.NumSamples())
233+
it := c.Iterator(nil)
234+
var got []triple
235+
for i := 0; it.Next() == ValFloat; i++ {
236+
t, v := it.At()
237+
if !stSupported {
238+
// Some formats do not support ST, but we still want to test them.
239+
// inject correct st for comparison purposes.
240+
got = append(got, triple{st: s.samples[i].st, t: t, v: v})
241+
} else {
242+
got = append(got, triple{st: it.AtST(), t: t, v: v})
243+
}
244+
}
245+
if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
246+
require.NoError(b, err)
136247
}
248+
require.Equal(b, s.samples, got)
137249

138-
var (
139-
sink float64
140-
it Iterator
141-
)
250+
var sink float64
142251
// Measure decoding efficiency.
143252
for i := 0; b.Loop(); {
144253
b.ReportMetric(float64(len(c.Bytes())), "B/chunk")

0 commit comments

Comments
 (0)