-
Notifications
You must be signed in to change notification settings - Fork 41
metrics: add redo metrics #4207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,81 @@ | ||||||||||||||||||||||||||||||
| // Copyright 2026 PingCAP, Inc. | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||
| // you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||
| // You may obtain a copy of the License at | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| // Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||
| // See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||
| // limitations under the License. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| package redo | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/common" | ||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/metrics" | ||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/redo" | ||||||||||||||||||||||||||||||
| "github.com/prometheus/client_golang/prometheus" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
Comment on lines
+16
to
+23
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Run formatter to unblock CI before merge. The pipeline already reports As per coding guidelines, 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| type redoSinkMetrics struct { | ||||||||||||||||||||||||||||||
| changefeedID common.ChangeFeedID | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| rowWriteLogDuration prometheus.Observer | ||||||||||||||||||||||||||||||
| ddlWriteLogDuration prometheus.Observer | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| rowTotalCount prometheus.Counter | ||||||||||||||||||||||||||||||
| ddlTotalCount prometheus.Counter | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| rowWorkerBusyRatio prometheus.Counter | ||||||||||||||||||||||||||||||
| ddlWorkerBusyRatio prometheus.Counter | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func newRedoSinkMetrics(changefeedID common.ChangeFeedID) *redoSinkMetrics { | ||||||||||||||||||||||||||||||
| keyspace := changefeedID.Keyspace() | ||||||||||||||||||||||||||||||
| changefeed := changefeedID.Name() | ||||||||||||||||||||||||||||||
| return &redoSinkMetrics{ | ||||||||||||||||||||||||||||||
| changefeedID: changefeedID, | ||||||||||||||||||||||||||||||
| rowWriteLogDuration: metrics.RedoWriteLogDurationHistogram. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), | ||||||||||||||||||||||||||||||
| ddlWriteLogDuration: metrics.RedoWriteLogDurationHistogram. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), | ||||||||||||||||||||||||||||||
| rowTotalCount: metrics.RedoTotalRowsCountGauge. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), | ||||||||||||||||||||||||||||||
| ddlTotalCount: metrics.RedoTotalRowsCountGauge. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), | ||||||||||||||||||||||||||||||
|
Comment on lines
+47
to
+50
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # Find and inspect RedoTotalRowsCountGauge declaration
rg -n "RedoTotalRowsCountGauge" --type go -A 2 -B 1Repository: pingcap/ticdc Length of output: 1698 🏁 Script executed: # Inspect the metrics.go file to see field types around lines 31-32
fd "metrics.go" -x cat -n {} | grep -A 10 -B 5 "rowTotalCount\|ddlTotalCount"Repository: pingcap/ticdc Length of output: 2339 🏁 Script executed: # Check the struct definition in downstreamadapter/sink/redo/metrics.go
fd "downstreamadapter/sink/redo/metrics.go" -x cat -n {}Repository: pingcap/ticdc Length of output: 39 Change
Either change the field types to 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| rowWorkerBusyRatio: metrics.RedoWorkerBusyRatio. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), | ||||||||||||||||||||||||||||||
| ddlWorkerBusyRatio: metrics.RedoWorkerBusyRatio. | ||||||||||||||||||||||||||||||
| WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) { | ||||||||||||||||||||||||||||||
| if rows > 0 { | ||||||||||||||||||||||||||||||
| m.rowTotalCount.Add(float64(rows)) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| m.rowWriteLogDuration.Observe(duration.Seconds()) | ||||||||||||||||||||||||||||||
| m.rowWorkerBusyRatio.Add(duration.Seconds()) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+58
to
+64
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Zero-row calls inflate duration histogram and busy ratio. The Consider extending the guard to skip observing duration when there is nothing to flush: 🛡️ Proposed fix func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
- if rows > 0 {
- m.rowTotalCount.Add(float64(rows))
- }
- m.rowWriteLogDuration.Observe(duration.Seconds())
- m.rowWorkerBusyRatio.Add(duration.Seconds())
+ if rows > 0 {
+ m.rowTotalCount.Add(float64(rows))
+ m.rowWriteLogDuration.Observe(duration.Seconds())
+ m.rowWorkerBusyRatio.Add(duration.Seconds())
+ }
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func (m *redoSinkMetrics) observeDDLWrite(duration time.Duration) { | ||||||||||||||||||||||||||||||
| m.ddlTotalCount.Inc() | ||||||||||||||||||||||||||||||
| m.ddlWriteLogDuration.Observe(duration.Seconds()) | ||||||||||||||||||||||||||||||
| m.ddlWorkerBusyRatio.Add(duration.Seconds()) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func (m *redoSinkMetrics) close() { | ||||||||||||||||||||||||||||||
| keyspace := m.changefeedID.Keyspace() | ||||||||||||||||||||||||||||||
| changefeed := m.changefeedID.Name() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { | ||||||||||||||||||||||||||||||
| metrics.RedoWriteLogDurationHistogram.DeleteLabelValues(keyspace, changefeed, logType) | ||||||||||||||||||||||||||||||
| metrics.RedoTotalRowsCountGauge.DeleteLabelValues(keyspace, changefeed, logType) | ||||||||||||||||||||||||||||||
| metrics.RedoWorkerBusyRatio.DeleteLabelValues(keyspace, changefeed, logType) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,7 +22,6 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| commonEvent "github.com/pingcap/ticdc/pkg/common/event" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/config" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/metrics" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/redo" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/redo/writer" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/pingcap/ticdc/pkg/redo/writer/factory" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -44,9 +43,10 @@ type Sink struct { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logBuffer *chann.UnlimitedChannel[writer.RedoEvent, any] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // isNormal indicate whether the sink is in the normal state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isNormal *atomic.Bool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isClosed *atomic.Bool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| statistics *metrics.Statistics | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isNormal *atomic.Bool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isClosed *atomic.Bool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metric *redoSinkMetrics | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func Verify(ctx context.Context, changefeedID common.ChangeFeedID, cfg *config.ConsistentConfig) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -68,10 +68,9 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ChangeFeedID: changefeedID, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| MaxLogSizeInBytes: util.GetOrZero(cfg.MaxLogSize) * redo.Megabyte, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isNormal: atomic.NewBool(true), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isClosed: atomic.NewBool(false), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| statistics: metrics.NewStatistics(changefeedID, "redo"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isNormal: atomic.NewBool(true), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isClosed: atomic.NewBool(false), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start := time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ddlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoDDLLogFileType) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -94,6 +93,7 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.ddlWriter = ddlWriter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.dmlWriter = dmlWriter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.metric = newRedoSinkMetrics(changefeedID) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return s | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -114,51 +114,49 @@ func (s *Sink) Run(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| switch e := event.(type) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case *commonEvent.DDLEvent: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := s.statistics.RecordDDLExecution(func() (string, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ddlType := e.GetDDLType().String() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return ddlType, s.ddlWriter.WriteEvents(s.ctx, e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start := time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := s.ddlWriter.WriteEvents(s.ctx, e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.isNormal.Store(false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if s.metric != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.metric.observeDDLWrite(time.Since(start)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By removing |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _ = s.statistics.RecordBatchExecution(func() (int, int64, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var calledCount atomic.Uint64 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The callback of the last row will trigger the callback of the txn. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if calledCount.Inc() == totalCount { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for _, callback := range postTxnFlushed { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| callback() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var calledCount atomic.Uint64 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The callback of the last row will trigger the callback of the txn. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if calledCount.Inc() == totalCount { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for _, callback := range postTxnFlushed { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| callback() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rowsCount := uint64(event.Len()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| row, ok := event.GetNextRow() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event.Rewind() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.logBuffer.Push(&commonEvent.RedoRowEvent{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| StartTs: event.StartTs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CommitTs: event.CommitTs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Event: row, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PhysicalTableID: event.PhysicalTableID, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TableInfo: event.TableInfo, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Callback: rowCallback, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rowsCount := uint64(event.Len()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| row, ok := event.GetNextRow() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event.Rewind() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+131
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
# Verify DMLEvent size semantics and row iteration behavior.
rg -n --type=go -C3 'type\s+DMLEvent\s+struct|func\s*\(.*\*DMLEvent\)\s*Len\s*\(|func\s*\(.*\*DMLEvent\)\s*GetNextRow\s*\('
# Find all AddDMLEvent call sites and nearby guards.
rg -n --type=go -C4 '\.AddDMLEvent\s*\('
# Check whether empty-event guards already exist upstream.
rg -n --type=go -C3 'Len\(\)\s*==\s*0|Len\(\)\s*>\s*0'Repository: pingcap/ticdc Length of output: 26689 🏁 Script executed: #!/bin/bash
# Examine the AddDMLEvent implementation in redo/sink.go around lines 131-149
cat -n downstreamadapter/sink/redo/sink.go | sed -n '120,160p'
# Check the basic_dispatcher context to see if it filters empty events before AddDMLEvent
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '285,310p'
# Look for event creation to understand if empty events are possible
rg -n --type=go -B5 -A2 'PostTxnFlushed.*=' | head -50Repository: pingcap/ticdc Length of output: 5458 🏁 Script executed: #!/bin/bash
# Let me check the broader context of line 607 in basic_dispatcher to understand if it filters before AddDMLEvent
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '580,620p'
# Also check if there are other AddDMLEvent calls elsewhere that might guard against empty events
rg -n --type=go -B10 'AddDMLEvent' downstreamadapter/dispatcher/basic_dispatcher.go | head -40Repository: pingcap/ticdc Length of output: 3384 🏁 Script executed: #!/bin/bash
# Check what creates DMLEvent instances and whether they can be empty
rg -n --type=go -B5 'Length\s*=' pkg/common/event/dml_event.go | head -60
# Check the filtering logic in AddDMLEventsToSink more carefully
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '265,290p'
# Verify if there are other code paths that might bypass the empty event guard
rg -n --type=go 'AddDMLEvent' downstreamadapter/ | grep -v test | grep -v '\.AddDMLEvent'Repository: pingcap/ticdc Length of output: 3060 🏁 Script executed: #!/bin/bash
# Check FilterDMLEvent implementation to see if it can return empty events
rg -n --type=go -A20 'func FilterDMLEvent' pkg/common/event/
# Check if there are code paths calling AddDMLEvent directly (not through basic_dispatcher)
rg -n --type=go 'sink\.AddDMLEvent' --glob='!test' | grep -v basic_dispatcherRepository: pingcap/ticdc Length of output: 1884 🏁 Script executed: #!/bin/bash
# Check the storage-consumer path to see if it guards against empty events
cat -n cmd/storage-consumer/consumer.go | sed -n '345,365p'
# Also verify if FilterDMLEvent can return an event with Length == 0
rg -n --type=go -A50 'func FilterDMLEvent' pkg/common/event/active_active.go | sed -n '1,80p'
# Check if there's any code path in redo sink tests that tests empty events
rg -n --type=go 'Len.*0|empty' downstreamadapter/sink/redo/sink_test.go | head -20Repository: pingcap/ticdc Length of output: 2338 🏁 Script executed: #!/bin/bash
# Check if redo sink is used in storage-consumer
rg -n --type=go 'redo.*sink|NewSink' cmd/storage-consumer/consumer.go | head -20
# Check all direct calls to sink.AddDMLEvent (not through dispatcher)
rg -n --type=go 'sink\.AddDMLEvent|\.AddDMLEvent' cmd/ pkg/applier/ --glob='!test' -B3 -A1 | grep -v 'test\|Test'
# Check FilterDMLEvent to see if it can produce an empty event (check the return)
rg -n --type=go -A100 'func FilterDMLEvent' pkg/common/event/active_active.go | sed -n '1,120p'Repository: pingcap/ticdc Length of output: 4581 🏁 Script executed: #!/bin/bash
# Check if redo sink is directly instantiated outside of dispatcher context
rg -n --type=go 'NewSink|redo\.Sink' downstreamadapter/sink/redo/ --glob='!test'
# Check all sink interface implementations and their AddDMLEvent calls
rg -n --type=go 'type.*sink.*interface|interface.*Sink' downstreamadapter/sink/
# Verify if there are entry points that create events and pass them directly to sink
rg -n --type=go -B5 'GetAllEvents\(\)' cmd/Repository: pingcap/ticdc Length of output: 745 🏁 Script executed: #!/bin/bash
# Check the GetAllEvents implementation and if it can return empty events
cat -n cmd/util/event_group.go | sed -n '120,135p'
# Check if storage-consumer or applier code could create empty DML events
rg -n --type=go -B10 'AddDMLEvent' pkg/applier/redo.go | head -30
# Check event group to understand event creation
cat -n cmd/util/event_group.go | sed -n '1,50p'Repository: pingcap/ticdc Length of output: 2748 🏁 Script executed: #!/bin/bash
# Check if applier.redo.go filters events before AddDMLEvent call
cat -n pkg/applier/redo.go | sed -n '385,410p'
# Check storage-consumer to see if it filters events
cat -n cmd/storage-consumer/consumer.go | sed -n '340,365p'
# Verify if event groups can have empty events
rg -n --type=go -B10 'AppendEvent\|AddEvent' cmd/util/event_group.go | head -40Repository: pingcap/ticdc Length of output: 1582 🏁 Script executed: #!/bin/bash
# Verify if FilterDMLEvent can actually return an event with Length == 0
cat -n pkg/common/event/active_active.go | sed -n '440,470p'
# Check if there's any validation after filtering in AddDMLEventsToSink
cat -n downstreamadapter/dispatcher/basic_dispatcher.go | sed -n '273,302p'
# Double-check: is there any guard that checks event.Len() after FilterDMLEvent?
rg -n --type=go -A10 'FilterDMLEvent' downstreamadapter/dispatcher/basic_dispatcher.goRepository: pingcap/ticdc Length of output: 4183 🏁 Script executed: #!/bin/bash
# Final verification: check if there are any other guards after AddDMLEvent in sink implementations
rg -n --type=go -A15 'func.*AddDMLEvent' downstreamadapter/sink/redo/sink.go | head -30
# Check if other sink implementations have similar patterns
rg -n --type=go -A10 'func.*AddDMLEvent' downstreamadapter/sink/mysql/sink.go | head -20Repository: pingcap/ticdc Length of output: 1085 Guard against empty DML events to prevent silently skipped transaction callbacks. If 🔧 Defensive guard rowsCount := uint64(event.Len())
+ if rowsCount == 0 {
+ for _, callback := range event.PostTxnFlushed {
+ callback()
+ }
+ return
+ }
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return int(event.Len()), event.GetSize(), nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.logBuffer.Push(&commonEvent.RedoRowEvent{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| StartTs: event.StartTs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CommitTs: event.CommitTs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Event: row, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PhysicalTableID: event.PhysicalTableID, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TableInfo: event.TableInfo, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Callback: rowCallback, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *Sink) IsNormal() bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -194,8 +192,8 @@ func (s *Sink) Close(_ bool) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| zap.Error(err)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if s.statistics != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.statistics.Close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if s.metric != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.metric.close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("redo sink closed", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -204,16 +202,19 @@ func (s *Sink) Close(_ bool) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *Sink) sendMessages(ctx context.Context) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if e, ok := s.logBuffer.Get(); ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := s.dmlWriter.WriteEvents(ctx, e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| e, ok := s.logBuffer.Get() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start := time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := s.dmlWriter.WriteEvents(ctx, e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+211
to
+214
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrap DML writer errors before returning from the send loop. At Line 213, 🔧 Proposed fix err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
- return err
+ return errors.Trace(err)
}As per coding guidelines, when an error comes from a third-party or library call in Go, wrap it immediately with 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if s.metric != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s.metric.observeRowWrite(1, time.Since(start)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,23 @@ var ( | |||||||||||||||||||||||||||||
| Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16), | ||||||||||||||||||||||||||||||
| }, []string{getKeyspaceLabel(), "changefeed", "type"}) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // RedoTotalRowsCountGauge records the total number of rows written to redo log. | ||||||||||||||||||||||||||||||
| RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ | ||||||||||||||||||||||||||||||
| Namespace: namespace, | ||||||||||||||||||||||||||||||
| Subsystem: subsystem, | ||||||||||||||||||||||||||||||
| Name: "total_rows_count", | ||||||||||||||||||||||||||||||
| Help: "The total count of rows that are processed by redo writer", | ||||||||||||||||||||||||||||||
| }, []string{getKeyspaceLabel(), "changefeed", "type"}) | ||||||||||||||||||||||||||||||
|
Comment on lines
+53
to
+59
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # Find where RedoTotalRowsCountGauge is used in the codebase
rg "RedoTotalRowsCountGauge" -A 3 -B 1Repository: pingcap/ticdc Length of output: 1911 🌐 Web query:
💡 Result: Rule of thumb
“Total” metrics best practices (monotonically increasing)
Common confusion:
|
||||||||||||||||||||||||||||||
| // RedoTotalRowsCountGauge records the total number of rows written to redo log. | |
| RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ | |
| Namespace: namespace, | |
| Subsystem: subsystem, | |
| Name: "total_rows_count", | |
| Help: "The total count of rows that are processed by redo writer", | |
| }, []string{getKeyspaceLabel(), "changefeed", "type"}) | |
| // RedoTotalRowsCountGauge records the total number of rows written to redo log. | |
| RedoTotalRowsCountGauge = prometheus.NewCounterVec(prometheus.CounterOpts{ | |
| Namespace: namespace, | |
| Subsystem: subsystem, | |
| Name: "total_rows_count_total", | |
| Help: "The total count of rows that are processed by redo writer", | |
| }, []string{getKeyspaceLabel(), "changefeed", "type"}) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/metrics/redo.go` around lines 53 - 59, The metric RedoTotalRowsCountGauge
should be a CounterVec not a GaugeVec: replace the variable
RedoTotalRowsCountGauge and its creation via prometheus.NewGaugeVec with a
CounterVec using prometheus.NewCounterVec (or rename the variable to
RedoTotalRowsCountCounter), change prometheus.GaugeOpts to
prometheus.CounterOpts and update the metric Name from "total_rows_count" to
"total_rows_count_total" while keeping the same Namespace, Subsystem, Help text
(adjust help to indicate a cumulative counter) and labels
([]string{getKeyspaceLabel(), "changefeed", "type"}).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The copyright year is set to 2026, which is in the future. Please correct it to the current year.