Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions common/stats/windowed_tdigest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package stats

import (
"errors"
"sync"
"time"

"github.com/caio/go-tdigest/v5"
)

type (
// TimeWindowedStats collects values into fixed-duration time windows,
// each backed by a t-digest, enabling approximate quantile queries
// over a sliding time range.
// Windows are guaranteed to be non-overlapping.
TimeWindowedStats interface {
// Record records a value that occurred at the given timestamp.
Record(value float64, timestamp time.Time)
// RecordToLatestWindow records a value in the current time window. Faster than Record.
// Warning: This is lower-accuracy than Record, lock-access will
// bias the recording time later than the call-time
RecordToLatestWindow(value float64)
RecordMulti(value float64, timestamp time.Time, count uint64)
RecordMultiToLatestWindow(value float64, count uint64)
// Quantile returns the approximate value at quantile q (0 <= q <= 1)
// across all active windows.
Quantile(q float64) float64
// TrimmedMean returns the approximate mean value within the given
// quantile range (0 <= q <= 1) across all active windows.
TrimmedMean(lowerQuantile, upperQuantile float64) float64
SubWindowForTime(instant time.Time) TimeWindowView
}

TimeWindowView interface {
Quantile(q float64) float64
TrimmedMean(lowerQuantile, upperQuantile float64) float64
Start() time.Time
End() time.Time
}

// WindowConfig controls the windowing behavior.
WindowConfig struct {
// windowSize is the duration of each window.
WindowSize time.Duration
// windowCount is the maximum number of windows to retain.
WindowCount int
// FillBlankIntervals controls whether gaps in the windowing should be preserved.
// This matters when windowSize is shorter than the largest gap in the measured data
// and individual time-windows are being fetched via TimeWindowedStats.SubWindowForTime.
FillBlankIntervals bool
}

timeWindowedTDigest struct {
mu sync.Mutex
windows []timedWindow // ring buffer, pre-allocated to windowCount
head int // index of the newest window
cfg WindowConfig
}

timedWindow struct {
tdigest *tdigest.TDigest
start time.Time
end time.Time
}
)

// NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests.
func NewWindowedTDigest(cfg WindowConfig) TimeWindowedStats {
if cfg.WindowCount <= 0 {
panic("windowCount must be non-negative")
}
if cfg.WindowSize.Milliseconds() <= 50 {
panic("probable misconfiguration detected: windowSize is too small, consider increasing it to at least 50ms")
}
return &timeWindowedTDigest{
windows: make([]timedWindow, cfg.WindowCount),
cfg: cfg,
// mu and head both empty
}
}

func (w *timeWindowedTDigest) Record(value float64, timestamp time.Time) {
w.RecordMulti(value, timestamp, 1)
}

func (w *timeWindowedTDigest) RecordMulti(value float64, timestamp time.Time, count uint64) {
window, err := w.getOrCreateWindow(timestamp)
if err != nil {
// Drop data for invalid timestamps
return
}
_ = window.tdigest.AddWeighted(value, count)
}

func (w *timeWindowedTDigest) RecordToLatestWindow(value float64) {
w.RecordMultiToLatestWindow(value, 1)
}

func (w *timeWindowedTDigest) RecordMultiToLatestWindow(value float64, count uint64) {
window, err := w.getOrCreateWindow(time.Time{})
if err != nil {
return
}
_ = window.tdigest.AddWeighted(value, count)
}

func (w *timeWindowedTDigest) SubWindowForTime(instant time.Time) TimeWindowView {
w.mu.Lock()
defer w.mu.Unlock()
window, _ := w.searchWindowsBackwards(instant)
return window
}

func (w *timeWindowedTDigest) Quantile(q float64) float64 {
return w.getMergedWindows().Quantile(q)
}

func (w *timeWindowedTDigest) TrimmedMean(lowerQuantile, upperQuantile float64) float64 {
return w.getMergedWindows().TrimmedMean(lowerQuantile, upperQuantile)
}

// TODO: this is expensive, maybe cache everything but the latest window so we can skip N merges?
func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest {
windows := w.cloneWindows()
var merged *tdigest.TDigest
for idx := range windows {
if windows[idx].tdigest != nil {
if merged == nil {
merged = windows[idx].tdigest
} else {
_ = merged.Merge(windows[idx].tdigest)
}
}
}
return merged
}

// cloneWindows creates a shallow copy of the ring buffer.
// Use this to avoid holding the lock while accessing the windows for aggregated queries.
func (w *timeWindowedTDigest) cloneWindows() []timedWindow {
w.mu.Lock()
defer w.mu.Unlock()
windows := make([]timedWindow, len(w.windows))
copy(windows, w.windows)
return windows
}

// getOrCreateWindow returns the window containing the given timestamp, creating a new one if necessary.
// 0 is a valid timestamp and will return the most recent window.
func (w *timeWindowedTDigest) getOrCreateWindow(timestamp time.Time) (*timedWindow, error) {
w.mu.Lock()
defer w.mu.Unlock()
if timestamp.IsZero() {
timestamp = time.Now()
}
candidate, err := w.searchWindowsBackwards(timestamp)
if err == nil {
return candidate, nil
}
if errors.Is(err, errTooNew) {
return w.advanceWindow(timestamp), nil
}
// err is errTooOld or errInGap
return nil, err
}

var errTooOld = errors.New("time was older than the earliest window")
var errTooNew = errors.New("time was newer than the latest window")
var errInGap = errors.New("time was in a gap between windows")

// Precondition: w.mu is held.
// Returns the window containing the given timestamp, or error if no window exists.
func (w *timeWindowedTDigest) searchWindowsBackwards(timestamp time.Time) (*timedWindow, error) {
latest := w.windows[w.head]
if !timestamp.Before(latest.start) {
// If the requested timestamp is after the latest window, no point in searching
if !timestamp.Before(latest.end) {
return nil, errTooNew
}
return &latest, nil
}
for idx := w.modDec(w.head); idx != w.head; idx = w.modDec(idx) {
candidate := w.windows[idx]
// Window start is inclusive, end is exclusive. We're iterating
// backwards in time, so the first window that matches is the one we want.
if !timestamp.Before(candidate.start) {
// The first window that matches might be too short to include this timestamp.
// Make sure the timestamp is actually in the window.
if !timestamp.Before(w.windows[idx].end) {
return nil, errInGap
}
return &candidate, nil
}
}
// All the windows are newer than the requested timestamp.
return nil, errTooOld
}

// Precondition: w.mu is held.
func (w *timeWindowedTDigest) advanceWindow(timestamp time.Time) *timedWindow {
if w.cfg.FillBlankIntervals {
// Fill in all the intervening blank windows.
// The empty digests will not affect the stats, but they will drop old windows.
lastWindow := &w.windows[w.head]
if lastWindow.tdigest == nil {
// Special-case, the latest window is uninitialized. Just take it.
return w.advanceWindowSimple(timestamp)
}
for {
lastWindow = w.advanceWindowSimple(lastWindow.end)
// Need a do-while here to enforce that lastWindow contains timestamp
if timestamp.Before(lastWindow.end) {
break
}
}
return lastWindow
}
return w.advanceWindowSimple(timestamp)
}

func (w *timeWindowedTDigest) advanceWindowSimple(start time.Time) *timedWindow {
w.head = w.modInc(w.head)
digest, _ := tdigest.New()
window := timedWindow{
tdigest: digest,
start: start,
end: start.Add(w.cfg.WindowSize),
}
w.windows[w.head] = window
return &window
}

// modulo-increment, for wrapping around the ring buffer
func (w *timeWindowedTDigest) modInc(idx int) int {
return (idx + 1) % w.cfg.WindowCount
}

// modulo-decrement, for wrapping around the ring buffer.
func (w *timeWindowedTDigest) modDec(idx int) int {
return (idx - 1 + w.cfg.WindowCount) % w.cfg.WindowCount
}

func (w *timedWindow) Start() time.Time {
return w.start
}
func (w *timedWindow) End() time.Time {
return w.end
}

func (w *timedWindow) Quantile(q float64) float64 {
return w.tdigest.Quantile(q)
}

func (w *timedWindow) TrimmedMean(lowerQuantile, upperQuantile float64) float64 {
return w.tdigest.TrimmedMean(lowerQuantile, upperQuantile)
}
140 changes: 140 additions & 0 deletions common/stats/windowed_tdigest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package stats

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func n(seconds int) time.Time {
return time.Unix(int64(seconds), 0)
}

type (
RecordingValue struct {
Value float64
Time time.Time
Count uint64
}
TimeWindowedStatsTestCase struct {
Name string
WindowConfig WindowConfig
Expectations []TestExpectation
RecordingValues []RecordingValue
}
TestExpectation func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest)
)

// TestWindowConfig is the default window configuration used in tests.
// It plays nicely with the "n" function above.
var TestWindowConfig = WindowConfig{
WindowSize: 1 * time.Second,
WindowCount: 10,
}
var TestWindowConfigBlanks = WindowConfig{
WindowSize: 1 * time.Second,
WindowCount: 10,
FillBlankIntervals: true,
}

func computeSimpleAverage(values []RecordingValue) map[int]RecordingValue {
averages := make(map[int]RecordingValue)
for _, value := range values {
existing := averages[value.Time.Second()]
averages[value.Time.Second()] = RecordingValue{
existing.Value + value.Value,
existing.Time,
existing.Count + value.Count,
}
}
return averages
}

func countNonEmptyWindows(stats *timeWindowedTDigest) int {
count := 0
for _, bucket := range stats.windows {
if bucket.tdigest != nil {
count++
}
}
return count
}

func incrementingData(start time.Time, count int) []RecordingValue {
RecordingValues := make([]RecordingValue, count)
for i := range count {
RecordingValues[i] = RecordingValue{
Value: float64(i),
Time: start.Add(time.Duration(i) * time.Second),
Count: 1,
}
}
return RecordingValues
}

func TestWindowedDigest(t *testing.T) {
// Use when unique windows are less than the max window count
simpleExpectations := []TestExpectation{
func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) {
averages := computeSimpleAverage(tc.RecordingValues)
require.Equal(t, len(averages), countNonEmptyWindows(stats))
for bucket, value := range averages {
window := stats.SubWindowForTime(n(bucket))
require.InDelta(t, value.Value/float64(value.Count), window.TrimmedMean(0, 1.0), 0.01)
}
},
}

testCases := []TimeWindowedStatsTestCase{
{"empty", TestWindowConfig, simpleExpectations, nil},
{"single-value", TestWindowConfig, simpleExpectations, []RecordingValue{
{float64(10), n(1), 1},
}},
{"full-buckets", TestWindowConfig, simpleExpectations,
incrementingData(n(1), 10),
},
{"overflow", TestWindowConfig, []TestExpectation{
func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) {
require.Equal(t, 10, countNonEmptyWindows(stats))
// datapoints will be 10-19, giving us max=19, min=10, avg=14.5
require.InDelta(t, 14.5, stats.TrimmedMean(0, 1.0), 0.01)
require.InDelta(t, 19, stats.Quantile(1.0), 0.01)
require.InDelta(t, 10, stats.Quantile(0.0), 0.01)
},
}, incrementingData(n(1), 20)},
{"blank-drops-old-data", TestWindowConfigBlanks, []TestExpectation{
func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) {
require.Equal(t, 10, countNonEmptyWindows(stats))
// avg=max=min because the old datapoint expired
require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01)
require.InDelta(t, 20, stats.Quantile(1.0), 0.01)
require.InDelta(t, 20, stats.Quantile(0.0), 0.01)
}}, []RecordingValue{
{float64(10), n(1), 1},
{float64(20), n(11), 1},
}},
{"fill-blanks-simple", TestWindowConfigBlanks, []TestExpectation{
func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) {
require.Equal(t, 5, countNonEmptyWindows(stats))
require.InDelta(t, 15, stats.TrimmedMean(0, 1.0), 0.01)
require.InDelta(t, 20, stats.Quantile(1.0), 0.01)
require.InDelta(t, 10, stats.Quantile(0.0), 0.01)
}}, []RecordingValue{
{float64(10), n(1), 1},
{float64(20), n(5), 1},
}},
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
stats := NewWindowedTDigest(tc.WindowConfig)
for _, value := range tc.RecordingValues {
stats.RecordMulti(value.Value, value.Time, value.Count)
}
for _, expectation := range tc.Expectations {
expectation(t, tc, stats.(*timeWindowedTDigest))
}
})
}
}
Loading
Loading