diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 61f76fb1fdc1..17c80e463f7b 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -146,6 +146,7 @@ go_library( "//pkg/util/httputil", "//pkg/util/humanizeutil", "//pkg/util/intsets", + "//pkg/util/iterutil", "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/channel", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 43ffe2ba38a3..9c5470ff27ac 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1783,7 +1783,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { // The feed's checkpoint is tracked in a map which is used to inform the // checkpoint_progress metric which will return the lowest timestamp across // all feeds in the scope. - cf.sliMetrics.setCheckpoint(cf.sliMetricsID, cf.frontier.Frontier()) + cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved) return cf.maybeEmitResolved(cf.Ctx(), newResolved) } @@ -2226,16 +2226,29 @@ func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc // updateProgressSkewMetrics updates the progress skew metrics. func (cf *changeFrontier) updateProgressSkewMetrics() { - maxSpanTS := cf.frontier.LatestTS() - maxTableTS := cf.frontier.Frontier() - for _, f := range cf.frontier.Frontiers() { - tableTS := f.Frontier() - if tableTS.After(maxTableTS) { - maxTableTS = tableTS + fastestSpanTS := cf.frontier.LatestTS() + fastestTableTS := func() hlc.Timestamp { + var maxTS hlc.Timestamp + for _, f := range cf.frontier.Frontiers() { + if f.Frontier().After(maxTS) { + maxTS = f.Frontier() + } + } + return maxTS + }() + + slowestTS := cf.frontier.Frontier() + var spanSkew, tableSkew int64 + if slowestTS.IsSet() { + if fastestSpanTS.IsSet() { + spanSkew = fastestSpanTS.WallTime - slowestTS.WallTime + } + if fastestTableTS.IsSet() { + tableSkew = fastestTableTS.WallTime - slowestTS.WallTime } } - cf.sliMetrics.setFastestTS(cf.sliMetricsID, maxSpanTS, maxTableTS) + cf.sliMetrics.setProgressSkew(cf.sliMetricsID, spanSkew, tableSkew) } func frontierIsBehind(frontier hlc.Timestamp, sv *settings.Values) bool { diff --git a/pkg/ccl/changefeedccl/changefeed_progress_test.go b/pkg/ccl/changefeedccl/changefeed_progress_test.go index 0df4aefb4ac3..9297b5b04e6d 100644 --- a/pkg/ccl/changefeedccl/changefeed_progress_test.go +++ b/pkg/ccl/changefeedccl/changefeed_progress_test.go @@ -273,8 +273,7 @@ WITH no_initial_scan, min_checkpoint_frequency='1s', resolved, metrics_label='%s } if !perTableTracking { if tableSkew != 0 { - // TODO(#155083): Return an error here. - return nil + return errors.Newf("expected table skew to be 0, got %d", tableSkew) } return nil } diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index d4c7337f4dcb..83c6a7514ba7 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -6,7 +6,9 @@ package changefeedccl import ( + "cmp" "context" + "maps" "slices" "strings" "sync/atomic" @@ -25,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/cidr" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" @@ -188,11 +191,11 @@ type sliMetrics struct { mu struct { syncutil.Mutex - id int64 - resolved map[int64]hlc.Timestamp - checkpoint map[int64]hlc.Timestamp - fastestSpan map[int64]hlc.Timestamp - fastestTable map[int64]hlc.Timestamp + id int64 + resolved map[int64]hlc.Timestamp + checkpoint map[int64]hlc.Timestamp + spanSkew map[int64]int64 + tableSkew map[int64]int64 } NetMetrics *cidr.NetMetrics @@ -206,8 +209,8 @@ func (m *sliMetrics) closeId(id int64) { defer m.mu.Unlock() delete(m.mu.checkpoint, id) delete(m.mu.resolved, id) - delete(m.mu.fastestSpan, id) - delete(m.mu.fastestTable, id) + delete(m.mu.spanSkew, id) + delete(m.mu.tableSkew, id) } // setResolved writes a resolved timestamp entry for the given id. @@ -228,15 +231,15 @@ func (m *sliMetrics) setCheckpoint(id int64, ts hlc.Timestamp) { } } -// setFastestTS saves the fastest span/table timestamps for a given id. -func (m *sliMetrics) setFastestTS(id int64, spanTS hlc.Timestamp, tableTS hlc.Timestamp) { +// setProgressSkew saves the span skew/table skew for a given ID. +func (m *sliMetrics) setProgressSkew(id int64, spanSkew int64, tableSkew int64) { m.mu.Lock() defer m.mu.Unlock() - if _, ok := m.mu.fastestSpan[id]; ok { - m.mu.fastestSpan[id] = spanTS + if _, ok := m.mu.spanSkew[id]; ok { + m.mu.spanSkew[id] = spanSkew } - if _, ok := m.mu.fastestTable[id]; ok { - m.mu.fastestTable[id] = tableTS + if _, ok := m.mu.tableSkew[id]; ok { + m.mu.tableSkew[id] = tableSkew } } @@ -249,8 +252,8 @@ func (m *sliMetrics) claimId() int64 { // ignored until a nonzero timestamp is written. m.mu.checkpoint[id] = hlc.Timestamp{} m.mu.resolved[id] = hlc.Timestamp{} - m.mu.fastestSpan[id] = hlc.Timestamp{} - m.mu.fastestTable[id] = hlc.Timestamp{} + m.mu.spanSkew[id] = 0 + m.mu.tableSkew[id] = 0 m.mu.id++ return id } @@ -1296,8 +1299,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) - sm.mu.fastestSpan = make(map[int64]hlc.Timestamp) - sm.mu.fastestTable = make(map[int64]hlc.Timestamp) + sm.mu.spanSkew = make(map[int64]int64) + sm.mu.tableSkew = make(map[int64]int64) sm.mu.id = 1 // start the first id at 1 so we can detect intiialization minTimestampGetter := func(m map[int64]hlc.Timestamp) func() int64 { @@ -1328,24 +1331,11 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { } } - maxTimestampSkewGetter := func( - base map[int64]hlc.Timestamp, ahead map[int64]hlc.Timestamp, - ) func() int64 { + maxTimestampSkewGetter := func(m map[int64]int64) func() int64 { return func() int64 { sm.mu.Lock() defer sm.mu.Unlock() - var maxSkew int64 - for id, b := range base { - a := ahead[id] - if a.IsEmpty() || b.IsEmpty() { - continue - } - skew := a.WallTime - b.WallTime - if skew > maxSkew { - maxSkew = skew - } - } - return maxSkew + return iterutil.MaxFunc(maps.Values(m), cmp.Compare) } } @@ -1353,9 +1343,9 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope) sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope) sm.SpanProgressSkew = a.SpanProgressSkew.AddFunctionalChild( - maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestSpan), scope) + maxTimestampSkewGetter(sm.mu.spanSkew), scope) sm.TableProgressSkew = a.TableProgressSkew.AddFunctionalChild( - maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestTable), scope) + maxTimestampSkewGetter(sm.mu.tableSkew), scope) a.mu.sliMetrics[scope] = sm return sm, nil @@ -1364,7 +1354,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { // getLaggingRangesCallback returns a function which can be called to update the // lagging ranges metric. It should be called with the current number of lagging // ranges. -func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) { +func (m *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) { // Because this gauge is shared between changefeeds in the same metrics scope, // we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to // ensure values written by others are not overwritten. The code below is used @@ -1388,10 +1378,10 @@ func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) last.Lock() defer last.Unlock() - s.LaggingRanges.Dec(last.lagging - lagging) + m.LaggingRanges.Dec(last.lagging - lagging) last.lagging = lagging - s.TotalRanges.Dec(last.total - total) + m.TotalRanges.Dec(last.total - total) last.total = total } } diff --git a/pkg/util/iterutil/iterutil.go b/pkg/util/iterutil/iterutil.go index 6d400d625ca9..ca610b2bf2f8 100644 --- a/pkg/util/iterutil/iterutil.go +++ b/pkg/util/iterutil/iterutil.go @@ -78,3 +78,15 @@ func MinFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E { } return m } + +// MaxFunc returns the maximum element in seq, using cmp to compare elements. +// If seq has no values, the zero value is returned. +func MaxFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E { + var m E + for i, v := range Enumerate(seq) { + if i == 0 || cmp(v, m) > 0 { + m = v + } + } + return m +} diff --git a/pkg/util/iterutil/iterutil_test.go b/pkg/util/iterutil/iterutil_test.go index 11f53ab32dcb..d6d00c1a2f0d 100644 --- a/pkg/util/iterutil/iterutil_test.go +++ b/pkg/util/iterutil/iterutil_test.go @@ -95,3 +95,35 @@ func TestMinFunc(t *testing.T) { }) } } + +func TestMaxFunc(t *testing.T) { + intCmp := func(a, b int) int { + return a - b + } + + for name, tc := range map[string]struct { + input []int + }{ + "empty": { + input: nil, + }, + "one element": { + input: []int{1}, + }, + "multiple elements": { + input: []int{1, 3, 2}, + }, + "multiple elements with zero value": { + input: []int{1, 0, 3, 2}, + }, + } { + t.Run(name, func(t *testing.T) { + m := iterutil.MaxFunc(slices.Values(tc.input), intCmp) + if len(tc.input) == 0 { + require.Equal(t, 0, m) + } else { + require.Equal(t, slices.MaxFunc(tc.input, intCmp), m) + } + }) + } +}