Merged
81 changes: 81 additions & 0 deletions downstreamadapter/sink/redo/metrics.go
@@ -0,0 +1,81 @@
// Copyright 2026 PingCAP, Inc.


medium

The copyright year is set to 2026, which is in the future. Please correct it to the current year.

Suggested change
- // Copyright 2026 PingCAP, Inc.
+ // Copyright 2024 PingCAP, Inc.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redo

import (
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/prometheus/client_golang/prometheus"
)
Comment on lines +16 to +23

⚠️ Potential issue | 🟠 Major

Run formatter to unblock CI before merge.

The pipeline already reports "gci import formatting changed file"; please run make fmt and commit the formatting diff for this file.

As per coding guidelines, **/*.go: Use gofmt to keep Go code clean; run make fmt before pushing to format code with gci, gofumports, and shfmt, plus log-style checks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/metrics.go` around lines 16 - 23, The import
block in metrics.go is not formatted per repository tooling; run the formatter
(make fmt) to apply gci/gofumports/gofmt and reformat the import block and file,
then add and commit the resulting changes so CI no longer reports "gci import
formatting changed file" (ensure the imports for time,
github.com/pingcap/ticdc/pkg/common, pkg/metrics, pkg/redo, and prometheus are
reordered/formatted by the toolchain).


type redoSinkMetrics struct {
changefeedID common.ChangeFeedID

rowWriteLogDuration prometheus.Observer
ddlWriteLogDuration prometheus.Observer

rowTotalCount prometheus.Counter
ddlTotalCount prometheus.Counter

rowWorkerBusyRatio prometheus.Counter
ddlWorkerBusyRatio prometheus.Counter
}

func newRedoSinkMetrics(changefeedID common.ChangeFeedID) *redoSinkMetrics {
keyspace := changefeedID.Keyspace()
changefeed := changefeedID.Name()
return &redoSinkMetrics{
changefeedID: changefeedID,
rowWriteLogDuration: metrics.RedoWriteLogDurationHistogram.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlWriteLogDuration: metrics.RedoWriteLogDurationHistogram.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
rowTotalCount: metrics.RedoTotalRowsCountGauge.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlTotalCount: metrics.RedoTotalRowsCountGauge.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
Comment on lines +47 to +50

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find and inspect RedoTotalRowsCountGauge declaration
rg -n "RedoTotalRowsCountGauge" --type go -A 2 -B 1



🏁 Script executed:

# Inspect the metrics.go file to see field types around lines 31-32
fd "metrics.go" -x cat -n {} | grep -A 10 -B 5 "rowTotalCount\|ddlTotalCount"



🏁 Script executed:

# Check the struct definition in downstreamadapter/sink/redo/metrics.go
fd "downstreamadapter/sink/redo/metrics.go" -x cat -n {}



Change rowTotalCount / ddlTotalCount field types to prometheus.Gauge or switch RedoTotalRowsCountGauge to a CounterVec.

RedoTotalRowsCountGauge is declared as a *prometheus.GaugeVec in pkg/metrics/redo.go:54, but the struct fields at lines 31–32 are typed as prometheus.Counter. The GaugeVec.WithLabelValues() method returns a prometheus.Gauge, which is not assignable to a prometheus.Counter field—this will fail at compile time.

Either change the field types to prometheus.Gauge (lines 31–32) to match the returned type, or convert RedoTotalRowsCountGauge to a CounterVec if monotonic row counting is the semantic intent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/metrics.go` around lines 47 - 50, The struct
fields rowTotalCount and ddlTotalCount are typed as prometheus.Counter but you
call RedoTotalRowsCountGauge.WithLabelValues(...) which returns a
prometheus.Gauge; update the field types to prometheus.Gauge to match
RedoTotalRowsCountGauge (or alternatively change RedoTotalRowsCountGauge to a
prometheus.CounterVec if you intend monotonic counts). Locate the struct that
declares rowTotalCount and ddlTotalCount and change their types from
prometheus.Counter to prometheus.Gauge so assignments from
RedoTotalRowsCountGauge.WithLabelValues(...) compile.

rowWorkerBusyRatio: metrics.RedoWorkerBusyRatio.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlWorkerBusyRatio: metrics.RedoWorkerBusyRatio.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
}
}

func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
if rows > 0 {
m.rowTotalCount.Add(float64(rows))
}
m.rowWriteLogDuration.Observe(duration.Seconds())
m.rowWorkerBusyRatio.Add(duration.Seconds())
}
Comment on lines +58 to +64

⚠️ Potential issue | 🟡 Minor

Zero-row calls inflate duration histogram and busy ratio.

The rows > 0 guard only protects the counter increment — rowWriteLogDuration.Observe and rowWorkerBusyRatio.Add fire unconditionally. If callers invoke observeRowWrite(0, d) (e.g., empty-batch flushes in sendMessages), non-zero durations will be recorded, distorting both the write-duration histogram and the worker busy ratio.

Consider extending the guard to skip observing duration when there is nothing to flush:

🛡️ Proposed fix
 func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
-	if rows > 0 {
-		m.rowTotalCount.Add(float64(rows))
-	}
-	m.rowWriteLogDuration.Observe(duration.Seconds())
-	m.rowWorkerBusyRatio.Add(duration.Seconds())
+	if rows > 0 {
+		m.rowTotalCount.Add(float64(rows))
+		m.rowWriteLogDuration.Observe(duration.Seconds())
+		m.rowWorkerBusyRatio.Add(duration.Seconds())
+	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
if rows > 0 {
m.rowTotalCount.Add(float64(rows))
}
m.rowWriteLogDuration.Observe(duration.Seconds())
m.rowWorkerBusyRatio.Add(duration.Seconds())
}
func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
if rows > 0 {
m.rowTotalCount.Add(float64(rows))
m.rowWriteLogDuration.Observe(duration.Seconds())
m.rowWorkerBusyRatio.Add(duration.Seconds())
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/metrics.go` around lines 58 - 64, In
observeRowWrite (method redoSinkMetrics.observeRowWrite) avoid recording
duration/ratio when rows==0: either return early or extend the existing rows>0
guard so that m.rowWriteLogDuration.Observe(...) and
m.rowWorkerBusyRatio.Add(...) only run when rows > 0; ensure the counter
m.rowTotalCount.Add(...) behavior is preserved and no other metrics are recorded
for empty-batch flushes.

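As a stdlib-only illustration of the skew (hypothetical sample values, not taken from any real changefeed): mixing empty-batch flush durations into the same series drags the average well below the true flush cost.

```go
package main

import "fmt"

// mean averages a slice of flush durations in seconds.
func mean(samples []float64) float64 {
	sum := 0.0
	for _, s := range samples {
		sum += s
	}
	return sum / float64(len(samples))
}

func main() {
	// Real flushes take ~50ms; empty-batch flushes still burn ~1ms of wall time.
	realFlushes := []float64{0.050, 0.052, 0.048}
	withEmpties := append([]float64{0.001, 0.001, 0.001, 0.001, 0.001}, realFlushes...)

	fmt.Printf("guarded mean:   %.4fs\n", mean(realFlushes))
	fmt.Printf("unguarded mean: %.4fs\n", mean(withEmpties))
}
```

With the rows > 0 guard extended over Observe and Add, only real flushes feed the histogram and the busy-ratio counter.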

func (m *redoSinkMetrics) observeDDLWrite(duration time.Duration) {
m.ddlTotalCount.Inc()
m.ddlWriteLogDuration.Observe(duration.Seconds())
m.ddlWorkerBusyRatio.Add(duration.Seconds())
}

func (m *redoSinkMetrics) close() {
keyspace := m.changefeedID.Keyspace()
changefeed := m.changefeedID.Name()

for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} {
metrics.RedoWriteLogDurationHistogram.DeleteLabelValues(keyspace, changefeed, logType)
metrics.RedoTotalRowsCountGauge.DeleteLabelValues(keyspace, changefeed, logType)
metrics.RedoWorkerBusyRatio.DeleteLabelValues(keyspace, changefeed, logType)
}
}
105 changes: 53 additions & 52 deletions downstreamadapter/sink/redo/sink.go
@@ -22,7 +22,6 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/pingcap/ticdc/pkg/redo/writer"
"github.com/pingcap/ticdc/pkg/redo/writer/factory"
@@ -44,9 +43,10 @@ type Sink struct {
logBuffer *chann.UnlimitedChannel[writer.RedoEvent, any]

// isNormal indicate whether the sink is in the normal state.
isNormal *atomic.Bool
isClosed *atomic.Bool
statistics *metrics.Statistics
isNormal *atomic.Bool
isClosed *atomic.Bool

metric *redoSinkMetrics
}

func Verify(ctx context.Context, changefeedID common.ChangeFeedID, cfg *config.ConsistentConfig) error {
@@ -68,10 +68,9 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: util.GetOrZero(cfg.MaxLogSize) * redo.Megabyte,
},
logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
isNormal: atomic.NewBool(true),
isClosed: atomic.NewBool(false),
statistics: metrics.NewStatistics(changefeedID, "redo"),
logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
isNormal: atomic.NewBool(true),
isClosed: atomic.NewBool(false),
}
start := time.Now()
ddlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoDDLLogFileType)
@@ -94,6 +93,7 @@
}
s.ddlWriter = ddlWriter
s.dmlWriter = dmlWriter
s.metric = newRedoSinkMetrics(changefeedID)
return s
}

@@ -114,51 +114,49 @@ func (s *Sink) Run(ctx context.Context) error {
func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch e := event.(type) {
case *commonEvent.DDLEvent:
err := s.statistics.RecordDDLExecution(func() (string, error) {
ddlType := e.GetDDLType().String()
return ddlType, s.ddlWriter.WriteEvents(s.ctx, e)
})
start := time.Now()
err := s.ddlWriter.WriteEvents(s.ctx, e)
if err != nil {
s.isNormal.Store(false)
return errors.Trace(err)
}
if s.metric != nil {
s.metric.observeDDLWrite(time.Since(start))
}
}
return nil
}

func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {


high

By removing statistics.RecordBatchExecution, we've lost valuable batch-level metrics such as ExecBatchHistogram (batch size in rows) and ExecBatchWriteBytesHistogram (batch size in bytes). While new per-row metrics are added, these batch metrics are important for monitoring and performance tuning. Was this intentional? If not, please consider reintroducing them or providing an equivalent.

_ = s.statistics.RecordBatchExecution(func() (int, int64, error) {
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
}
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
}
s.logBuffer.Push(&commonEvent.RedoRowEvent{
StartTs: event.StartTs,
CommitTs: event.CommitTs,
Event: row,
PhysicalTableID: event.PhysicalTableID,
TableInfo: event.TableInfo,
Callback: rowCallback,
})
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
Comment on lines +131 to +149

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify DMLEvent size semantics and row iteration behavior.
rg -n --type=go -C3 'type\s+DMLEvent\s+struct|func\s*\(.*\*DMLEvent\)\s*Len\s*\(|func\s*\(.*\*DMLEvent\)\s*GetNextRow\s*\('

# Find all AddDMLEvent call sites and nearby guards.
rg -n --type=go -C4 '\.AddDMLEvent\s*\('

# Check whether empty-event guards already exist upstream.
rg -n --type=go -C3 'Len\(\)\s*==\s*0|Len\(\)\s*>\s*0'



🏁 Script executed:

#!/bin/bash
# Examine the AddDMLEvent implementation in redo/sink.go around lines 131-149
cat -n downstreamadapter/sink/redo/sink.go | sed -n '120,160p'

# Check the basic_dispatcher context to see if it filters empty events before AddDMLEvent
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '285,310p'

# Look for event creation to understand if empty events are possible
rg -n --type=go -B5 -A2 'PostTxnFlushed.*=' | head -50



🏁 Script executed:

#!/bin/bash
# Let me check the broader context of line 607 in basic_dispatcher to understand if it filters before AddDMLEvent
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '580,620p'

# Also check if there are other AddDMLEvent calls elsewhere that might guard against empty events
rg -n --type=go -B10 'AddDMLEvent' downstreamadapter/dispatcher/basic_dispatcher.go | head -40



🏁 Script executed:

#!/bin/bash
# Check what creates DMLEvent instances and whether they can be empty
rg -n --type=go -B5 'Length\s*=' pkg/common/event/dml_event.go | head -60

# Check the filtering logic in AddDMLEventsToSink more carefully
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '265,290p'

# Verify if there are other code paths that might bypass the empty event guard
rg -n --type=go 'AddDMLEvent' downstreamadapter/ | grep -v test | grep -v '\.AddDMLEvent'



🏁 Script executed:

#!/bin/bash
# Check FilterDMLEvent implementation to see if it can return empty events
rg -n --type=go -A20 'func FilterDMLEvent' pkg/common/event/

# Check if there are code paths calling AddDMLEvent directly (not through basic_dispatcher)
rg -n --type=go 'sink\.AddDMLEvent' --glob='!test' | grep -v basic_dispatcher



🏁 Script executed:

#!/bin/bash
# Check the storage-consumer path to see if it guards against empty events
cat -n cmd/storage-consumer/consumer.go | sed -n '345,365p'

# Also verify if FilterDMLEvent can return an event with Length == 0
rg -n --type=go -A50 'func FilterDMLEvent' pkg/common/event/active_active.go | sed -n '1,80p'

# Check if there's any code path in redo sink tests that tests empty events
rg -n --type=go 'Len.*0|empty' downstreamadapter/sink/redo/sink_test.go | head -20



🏁 Script executed:

#!/bin/bash
# Check if redo sink is used in storage-consumer
rg -n --type=go 'redo.*sink|NewSink' cmd/storage-consumer/consumer.go | head -20

# Check all direct calls to sink.AddDMLEvent (not through dispatcher)
rg -n --type=go 'sink\.AddDMLEvent|\.AddDMLEvent' cmd/ pkg/applier/ --glob='!test' -B3 -A1 | grep -v 'test\|Test'

# Check FilterDMLEvent to see if it can produce an empty event (check the return)
rg -n --type=go -A100 'func FilterDMLEvent' pkg/common/event/active_active.go | sed -n '1,120p'



🏁 Script executed:

#!/bin/bash
# Check if redo sink is directly instantiated outside of dispatcher context
rg -n --type=go 'NewSink|redo\.Sink' downstreamadapter/sink/redo/ --glob='!test'

# Check all sink interface implementations and their AddDMLEvent calls
rg -n --type=go 'type.*sink.*interface|interface.*Sink' downstreamadapter/sink/

# Verify if there are entry points that create events and pass them directly to sink
rg -n --type=go -B5 'GetAllEvents\(\)' cmd/



🏁 Script executed:

#!/bin/bash
# Check the GetAllEvents implementation and if it can return empty events
cat -n cmd/util/event_group.go | sed -n '120,135p'

# Check if storage-consumer or applier code could create empty DML events
rg -n --type=go -B10 'AddDMLEvent' pkg/applier/redo.go | head -30

# Check event group to understand event creation
cat -n cmd/util/event_group.go | sed -n '1,50p'



🏁 Script executed:

#!/bin/bash
# Check if applier.redo.go filters events before AddDMLEvent call
cat -n pkg/applier/redo.go | sed -n '385,410p'

# Check storage-consumer to see if it filters events
cat -n cmd/storage-consumer/consumer.go | sed -n '340,365p'

# Verify if event groups can have empty events
rg -n --type=go -B10 'AppendEvent\|AddEvent' cmd/util/event_group.go | head -40



🏁 Script executed:

#!/bin/bash
# Verify if FilterDMLEvent can actually return an event with Length == 0
cat -n pkg/common/event/active_active.go | sed -n '440,470p'

# Check if there's any validation after filtering in AddDMLEventsToSink
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '273,302p'

# Double-check: is there any guard that checks event.Len() after FilterDMLEvent?
rg -n --type=go -A10 'FilterDMLEvent' downstreamadapter/dispatcher/basic_dispatcher.go



🏁 Script executed:

#!/bin/bash
# Final verification: check if there are any other guards after AddDMLEvent in sink implementations
rg -n --type=go -A15 'func.*AddDMLEvent' downstreamadapter/sink/redo/sink.go | head -30

# Check if other sink implementations have similar patterns
rg -n --type=go -A10 'func.*AddDMLEvent' downstreamadapter/sink/mysql/sink.go | head -20



Guard against empty DML events to prevent silently skipped transaction callbacks.

If event.Len() is 0 (e.g., after filtering in active-active mode), no row callbacks are scheduled, so the postTxnFlushed hooks never run. This issue exists in the redo sink and should be handled consistently with the other sinks that already guard this case.

🔧 Defensive guard
 	rowsCount := uint64(event.Len())
+	if rowsCount == 0 {
+		for _, callback := range event.PostTxnFlushed {
+			callback()
+		}
+		return
+	}
 	rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
📝 Committable suggestion


Suggested change
	toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
		var calledCount atomic.Uint64
		// The callback of the last row will trigger the callback of the txn.
		return func() {
			if calledCount.Inc() == totalCount {
				for _, callback := range postTxnFlushed {
					callback()
				}
			}
		}
	}
	rowsCount := uint64(event.Len())
	rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
	for {
		row, ok := event.GetNextRow()
		if !ok {
			event.Rewind()
			break

	toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
		var calledCount atomic.Uint64
		// The callback of the last row will trigger the callback of the txn.
		return func() {
			if calledCount.Inc() == totalCount {
				for _, callback := range postTxnFlushed {
					callback()
				}
			}
		}
	}
	rowsCount := uint64(event.Len())
	if rowsCount == 0 {
		for _, callback := range event.PostTxnFlushed {
			callback()
		}
		return
	}
	rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
	for {
		row, ok := event.GetNextRow()
		if !ok {
			event.Rewind()
			break
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 131 - 149, The code builds
rowCallback via toRowCallback using rowsCount := uint64(event.Len()) but doesn't
handle the case rowsCount == 0, so event.PostTxnFlushed callbacks are never
invoked; update the logic around toRowCallback/rowCallback (and before entering
the GetNextRow loop) to check if event.Len() is zero and, if so, immediately
invoke each callback in event.PostTxnFlushed (or call the returned rowCallback
once) and then call event.Rewind() / return as appropriate so empty DML events
don't silently skip transaction callbacks; modify code paths that reference
toRowCallback, rowCallback, event.PostTxnFlushed, event.Len(),
event.GetNextRow(), and event.Rewind() accordingly.

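The fan-in pattern under discussion (the last row's callback fires the transaction-level hooks) together with the proposed zero-row guard can be sketched with the standard library alone; dispatch and the hook slice are illustrative names, not the repository's API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toRowCallback returns a per-row callback; the final invocation fires the
// transaction-level hooks, mirroring the pattern in AddDMLEvent.
func toRowCallback(postTxnFlushed []func(), totalCount uint64) func() {
	var calledCount atomic.Uint64
	return func() {
		if calledCount.Add(1) == totalCount {
			for _, cb := range postTxnFlushed {
				cb()
			}
		}
	}
}

func dispatch(rows int, postTxnFlushed []func()) {
	// Proposed guard: with zero rows no per-row callback would ever run,
	// so fire the transaction hooks directly.
	if rows == 0 {
		for _, cb := range postTxnFlushed {
			cb()
		}
		return
	}
	rowCallback := toRowCallback(postTxnFlushed, uint64(rows))
	for i := 0; i < rows; i++ {
		rowCallback() // in the sink this runs after each row is flushed
	}
}

func main() {
	fired := 0
	hooks := []func(){func() { fired++ }}
	dispatch(3, hooks)
	dispatch(0, hooks)
	fmt.Println(fired) // 2: hooks fired exactly once per dispatch call
}
```

Without the guard, the zero-row call would return with fired unchanged, which is exactly the silently-skipped-callback hazard the comment describes.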
}
return int(event.Len()), event.GetSize(), nil
})
s.logBuffer.Push(&commonEvent.RedoRowEvent{
StartTs: event.StartTs,
CommitTs: event.CommitTs,
Event: row,
PhysicalTableID: event.PhysicalTableID,
TableInfo: event.TableInfo,
Callback: rowCallback,
})
}
}

func (s *Sink) IsNormal() bool {
@@ -194,8 +192,8 @@ func (s *Sink) Close(_ bool) {
zap.Error(err))
}
}
if s.statistics != nil {
s.statistics.Close()
if s.metric != nil {
s.metric.close()
}
log.Info("redo sink closed",
zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
@@ -204,16 +202,19 @@

func (s *Sink) sendMessages(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if e, ok := s.logBuffer.Get(); ok {
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return err
}
}
e, ok := s.logBuffer.Get()
if !ok {
return nil
}

start := time.Now()
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return err
}
Comment on lines +211 to +214

⚠️ Potential issue | 🟠 Major

Wrap DML writer errors before returning from the send loop.

At Line 213, err is returned without stack context. This path should mirror the wrapped-error handling used elsewhere.

🔧 Proposed fix
 		err := s.dmlWriter.WriteEvents(ctx, e)
 		if err != nil {
-			return err
+			return errors.Trace(err)
 		}

As per coding guidelines, when an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again.

📝 Committable suggestion


Suggested change
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return err
}
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return errors.Trace(err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/redo/sink.go` around lines 211 - 214, The call to
s.dmlWriter.WriteEvents in the send loop returns err directly; change the return
to wrap the library error with the project's tracing wrapper (e.g.,
errors.Trace(err) or errors.WrapError(err)) before returning so the error
includes stack context—locate the WriteEvents call on s.dmlWriter in the send
loop and replace the plain return err with a wrapped error return.


if s.metric != nil {
s.metric.observeRowWrite(1, time.Since(start))
}
}
}
6 changes: 3 additions & 3 deletions metrics/grafana/ticdc_new_arch.json
@@ -22311,7 +22311,7 @@
"targets": [
{
"exemplar": true,
"expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"format": "heatmap",
"interval": "",
"intervalFactor": 2,
@@ -22751,7 +22751,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)",
"interval": "",
"legendFormat": "{{changefeed}}-{{instance}}",
"queryType": "randomWalk",
@@ -25515,4 +25515,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
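The corrected expression follows from how the counter is fed: observeRowWrite and observeDDLWrite add busy seconds, so rate() over the counter yields busy seconds per wall second in [0,1], and multiplying by 100 gives a percentage (the old /10 presumably assumed a millisecond-based counter, where rate() yields up to 1000 ms/s and /10 lands on percent). A hypothetical two-scrape calculation:

```go
package main

import "fmt"

// busyPercent mirrors what rate(ticdc_redo_worker_busy_ratio[...])*100 computes:
// busy seconds accumulated between two scrapes, divided by wall-clock time,
// expressed as a percentage.
func busyPercent(busy1, busy2, wall1, wall2 float64) float64 {
	return (busy2 - busy1) / (wall2 - wall1) * 100
}

func main() {
	// Counter grew by 15 busy-seconds over a 60s scrape window.
	fmt.Printf("%.1f%%\n", busyPercent(100, 115, 0, 60)) // 25.0%
}
```

With the old /10, the same window would have displayed 0.025 instead of 25, which is why the dashboard change accompanies the seconds-based counter.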
6 changes: 3 additions & 3 deletions metrics/nextgengrafana/ticdc_new_arch_next_gen.json
@@ -22311,7 +22311,7 @@
"targets": [
{
"exemplar": true,
"expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"format": "heatmap",
"interval": "",
"intervalFactor": 2,
@@ -22751,7 +22751,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)",
"interval": "",
"legendFormat": "{{changefeed}}-{{instance}}",
"queryType": "randomWalk",
@@ -25515,4 +25515,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
29 changes: 29 additions & 0 deletions pkg/metrics/redo.go
@@ -50,6 +50,23 @@
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count",
Help: "The total count of rows that are processed by redo writer",
}, []string{getKeyspaceLabel(), "changefeed", "type"})
Comment on lines +53 to +59

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find where RedoTotalRowsCountGauge is used in the codebase
rg "RedoTotalRowsCountGauge" -A 3 -B 1



🌐 Web query:

Prometheus Counter vs Gauge best practices monotonically increasing total metrics

💡 Result:

Rule of thumb

  • Use a Counter for any metric that is monotonically increasing (it may reset to 0 on process restart). These are the right fit for “total” event counts: requests, errors, bytes processed, jobs completed, etc. [1]
  • Use a Gauge for values that can go up and down (queue length, in-flight requests, temperature, current memory usage, current number of active sessions, etc.). (Implied by the counter guidance: don’t use counters for values that decrease.) [3]

“Total” metrics best practices (monotonically increasing)

  1. If it’s a lifetime cumulative count, make it a Counter and name it *_total.
    Prometheus naming best practice is that “an accumulating count has a total suffix” (e.g., http_requests_total). [1]
  2. Don’t export rates; export totals and calculate rates in PromQL.
    Use rate(http_requests_total[5m]) / irate() for per-second rates. [3]
  3. Expect and handle resets (restarts) via PromQL functions.
    rate()/increase() are designed to cope with counter resets caused by restarts. [3]
  4. If you create recording rules from counters, drop _total after applying rate()/irate().
    Prometheus rule naming guidance: strip _total off counters when using rate()/irate() in recording rules. [2]

Common confusion: _total vs _count

  • Regular counters should end in _total. [1]
  • Histograms and summaries automatically expose series like *_count and *_sum (so you’ll see counters that end with _count there), which is why this looks inconsistent in practice and is discussed in Prometheus docs/issues. [4]

Quick decision table

  • “Total number of requests since process start” → Counter: http_requests_total [1]
  • “Current requests in flight right now” → Gauge: http_requests_in_flight
  • “Total time spent in handlers” → Counter (unit + total): http_request_duration_seconds_total (then rate(...) for seconds/sec) [1][3]
  • “Current queue depth” → Gauge: queue_depth

Sources: Prometheus naming best practices [1], Prometheus recording rules naming [2], counter usage and rate() guidance [3], Prometheus docs discussion on _total vs _count [4].


Use CounterVec instead of GaugeVec for RedoTotalRowsCountGauge.

This metric represents a monotonically increasing total (as indicated by the name and help text). Prometheus best practices recommend using Counter for cumulative counts, not Gauge. Additionally, the metric name should follow Prometheus naming conventions by using the _total suffix (e.g., total_rows_count_total).

♻️ Proposed change
-	RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	RedoTotalRowsCountGauge = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
-		Name:      "total_rows_count",
+		Name:      "total_rows_count_total",
 		Help:      "The total count of rows that are processed by redo writer",
 	}, []string{getKeyspaceLabel(), "changefeed", "type"})
📝 Committable suggestion


Suggested change
// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count",
Help: "The total count of rows that are processed by redo writer",
}, []string{getKeyspaceLabel(), "changefeed", "type"})
// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count_total",
Help: "The total count of rows that are processed by redo writer",
}, []string{getKeyspaceLabel(), "changefeed", "type"})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/metrics/redo.go` around lines 53 - 59, The metric RedoTotalRowsCountGauge
should be a CounterVec not a GaugeVec: replace the variable
RedoTotalRowsCountGauge and its creation via prometheus.NewGaugeVec with a
CounterVec using prometheus.NewCounterVec (or rename the variable to
RedoTotalRowsCountCounter), change prometheus.GaugeOpts to
prometheus.CounterOpts and update the metric Name from "total_rows_count" to
"total_rows_count_total" while keeping the same Namespace, Subsystem, Help text
(adjust help to indicate a cumulative counter) and labels
([]string{getKeyspaceLabel(), "changefeed", "type"}).


// RedoWriteLogDurationHistogram records the latency distributions of writeLog.
RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "write_log_duration_seconds",
Help: "The latency distributions of writeLog called by redo sink",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoFlushLogDurationHistogram records the latency distributions of flushLog.
RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
@@ -58,11 +75,23 @@
Help: "The latency distributions of flushLog called by redo sink",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoWorkerBusyRatio records the busy ratio of redo sink worker.
RedoWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "worker_busy_ratio",
Help: "Busy ratio for redo sink worker.",
}, []string{getKeyspaceLabel(), "changefeed", "type"})
)

func initRedoMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFsyncDurationHistogram)
registry.MustRegister(RedoTotalRowsCountGauge)
registry.MustRegister(RedoWriteBytesGauge)
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
registry.MustRegister(RedoWorkerBusyRatio)
}