[DNM]storage: enhance the storage sink #4219
3AceShowHand wants to merge 4 commits into pingcap:master from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull request has been approved by: The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
📝 Walkthrough

This PR introduces a spool-based buffering architecture for the cloud storage sink, replacing the defragmenter pipeline with task-based encoding and writing. Key additions include a memory-aware spool manager with disk-spill support, per-dispatcher path state management with TTL-based cleanup, asynchronous DDL/block event routing through the PassBlockEvent interface, and consumer-side path key parsing utilities.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Disp as Dispatcher
    participant Sink as Cloud Storage Sink
    participant DML as DMLWriters
    participant Enc as EncodingGroup
    participant Spool as Spool Manager
    participant Writer as Writer (per-shard)
    rect rgba(100, 200, 100, 0.5)
        Note over Disp,Writer: DML Event Path
        Disp->>Sink: AddDMLEvent(event)
        Sink->>DML: AddDMLEvent(event)
        DML->>DML: newDMLTask(event)
        DML->>DML: submitTaskToEncoder(task)
        Enc->>Enc: route to input shard
        Enc->>Enc: encode task → taskFuture
        Enc->>Writer: ConsumeOutputShard(encoded task)
        Writer->>Writer: batchTask by table
        Writer->>Spool: (optional) enqueue encoded msgs
        Writer->>Writer: flush to storage
    end
    rect rgba(200, 100, 100, 0.5)
        Note over Disp,Spool: Block Event (DDL) Path
        Disp->>Sink: PassBlockEvent(blockEvent)
        Sink->>DML: PassBlockEvent(blockEvent)
        DML->>DML: newDrainTask(dispatcherID, commitTs, doneCh)
        DML->>DML: submitTaskToEncoder(drainTask)
        Enc->>Writer: ConsumeOutputShard(drain task)
        Writer->>Writer: flush pending messages
        Writer->>Writer: emit DDL marker
        Writer-->>Disp: drainMarker.done() via doneCh
    end
    rect rgba(100, 100, 200, 0.5)
        Note over Spool,Writer: Memory Pressure & Spill
        Writer->>Spool: Enqueue(msgs, onEnqueued)
        alt memory < quota
            Spool->>Spool: store in memory
        else memory >= quota
            Spool->>Spool: serialize and spill to disk
        end
        Writer->>Spool: Load(entry) for flush
        Spool-->>Writer: return msgs or read from disk
        Writer->>Spool: Release(entry)
        Spool->>Spool: decrement accounting
        alt mem drops below low watermark
            Spool-->>DML: invoke pending wake callbacks
        end
    end
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes. The PR spans diverse architectural components (spool system, task-based pipeline, path state refactoring, dispatcher/sink integration, consumer utilities) with significant logic density in encoding group, writer flush logic, spool segment management, and path state cleanup. Multiple files require cross-domain understanding, though many changes follow consistent patterns within their cohorts.

Possibly related PRs
🚥 Pre-merge checks: ❌ 4 failed checks (3 warnings, 1 inconclusive)
Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request delivers a significant architectural upgrade to the cloud storage sink, aiming to achieve General Availability status by addressing critical performance, scalability, and correctness concerns. The core problem of low throughput due to tick-limited flushing is resolved through a two-stage acknowledgment mechanism and an intelligent internal spool for buffering. DDL event ordering is meticulously preserved using a novel

Highlights
Code Review
This pull request introduces a significant and well-documented enhancement to the storage sink. The core changes include a major refactoring to improve performance and scalability by introducing a spooling mechanism with disk spillover, a two-stage ACK for DMLs to enable early wake-up, and a new DDL drain mechanism to ensure correctness. The removal of the defragmenter in favor of a sharded encoder pipeline is a key improvement to eliminate a global head-of-line blocking point. The implementation is supported by extensive design documentation and appears to be solid. I have one suggestion to improve metric consistency.
```go
		return 0, 0, errors.Trace(inErr)
	}
	// ...
	d.metricFlushDuration.Observe(time.Since(start).Seconds())
```
The metric observed here for multipart data file writes (metricFlushDuration) is different from the one used for single-part writes (metricWriteDuration on line 335). This inconsistency can make it harder to analyze data file write performance. For better clarity and consistency in monitoring, I suggest using metricWriteDuration for all data file write operations, regardless of whether they are single-part or multi-part.
```diff
- d.metricFlushDuration.Observe(time.Since(start).Seconds())
+ d.metricWriteDuration.Observe(time.Since(start).Seconds())
```
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/sink/cloudstorage/writer.go (1)

249-276: ⚠️ Potential issue | 🟠 Major

`ignoreTableTask`: spool `Load` error discards messages without propagating callbacks. When `Load` fails at line 259, the code logs a warning, releases the entry, and continues — but the `entry.callbacks` (which contain PostFlush callbacks for checkpoint advancement) are never invoked. This means a Load failure could stall table progress permanently, since the callbacks that drive `tableProgress` / checkpoint advancement are lost.

Consider invoking the entry's callbacks even when `Load` fails, or at minimum propagating the error upward rather than silently swallowing it.
🤖 Fix all issues with AI agents
In `@docs/plans/2026-02-04-cloudstorage-sink-ga-design.md`:
- Around line 252-263: The design requires three observability metrics that are
missing from downstreamadapter/sink/metrics/cloudstorage.go: add definitions,
registration, and usage for spool_items, wake_calls_total, and
ddl_affected_dispatchers; specifically declare Prometheus metrics (e.g., a Gauge
or GaugeVec named SpoolItems, a CounterVec named WakeCallsTotal, and a Gauge or
CounterVec named DDLAffectedDispatchers), register them in the same
init/registry block currently used by other metrics in cloudstorage.go, and wire
increments/sets where the spool and DDL code update counts (places that
currently update spool_bytes, wake_suppressed_total, and
ddl_drain_duration_seconds). If you prefer not to implement them now, update the
design text to mark spool_items, wake_calls_total, and ddl_affected_dispatchers
as optional/deferred instead.
In `@downstreamadapter/sink/cloudstorage/dml_writers.go`:
- Around line 179-202: PassBlockEvent pushes a drain task into d.msgCh and then
blocks waiting on doneCh, which can hang indefinitely if the encoding/dispatch
pipeline has already exited; fix by making the wait use the pipeline's
Run()-scoped context (or a derived cancel) instead of the long-lived d.ctx, or
ensure the pipeline closes/returns an error into doneCh when it shuts down
(e.g., detect shutdown in the dispatcher/Run() and send an error on doneCh or
close it). Update PassBlockEvent to select on doneCh, the Run()-scoped context
(or a context produced/owned by Run()), and only fall back to d.ctx as a last
resort; also modify the dispatcher/Run() path that consumes d.msgCh to inject an
error into the corresponding doneCh when it exits so blocked callers of
PassBlockEvent are unblocked.
In `@downstreamadapter/sink/cloudstorage/spool/codec.go`:
- Around line 52-59: In deserializeMessages validate the uint32 count read from
binary before calling make(..., count): check that count is reasonable (e.g.,
not larger than len(data) and below a configured/constant upper bound or <=
uint32(len(data)/minMessageSize)), and if it exceeds the sanity limit return an
error instead of attempting allocation; update the allocation to use the
validated/capped count and document the chosen bound; reference the
deserializeMessages function and the local variable count when making this
change.
In `@downstreamadapter/sink/cloudstorage/spool/manager.go`:
- Around line 349-351: The allocation of callbacksToRun uses len(s.pendingWake)
before acquiring the mutex, causing a data race; move the make([]func(), 0,
len(s.pendingWake)) so it occurs after s.mu.Lock() (or compute the length into a
local variable while holding the lock) to ensure len(s.pendingWake) is read only
while s.mu is held; update the code around callbacksToRun, s.pendingWake and
s.mu accordingly so the capacity hint is computed under the lock.
- Around line 174-179: The startup code calling os.RemoveAll(rootDir) in
manager.go is unsafe if the changefeed ID components (keyspace, name, ID) are
empty or malformed; update the initialization in the spool manager (the code
that builds rootDir before calling os.RemoveAll and os.MkdirAll) to validate
inputs and the computed path: ensure keyspace, name and ID are non-empty, run
filepath.Clean on rootDir, and verify that the cleaned rootDir contains the
expected defaultDirectoryName segment (or matches an allowed base prefix) before
allowing os.RemoveAll; if validation fails, return an error instead of deleting.
- Around line 310-342: Load has a race where Manager.Close can close
spoolSegment.file after s.mu is released but before file.ReadAt, causing "bad
file descriptor"; fix by holding the appropriate lock across the I/O or
switching to an RW lock: either acquire the mutex (s.mu) and defer its Unlock
until after the ReadAt so the file handle cannot be closed during the read, or
change s.mu to a sync.RWMutex and take RLock in Manager.Load (and have Close
take Lock) so readers hold the lock for the duration of file.ReadAt; ensure
checks of s.closed and lookup of s.segments/entry.pointer happen under that same
lock and reference spoolSegment.file only while the lock is held (or use
reference counting on the file handle if preferred).
In `@downstreamadapter/sink/metrics/cloudstorage.go`:
- Around line 73-79: The histogram CloudStorageFlushFileSizeHistogram currently
uses prometheus.ExponentialBuckets(1024, 2.0, 16) which only covers up to 32 MiB
and will put ~64 MiB files into +Inf; adjust the bucket configuration so it
covers at least 128 MiB (e.g., increase the count to 17 or 18, or raise the
start value) by updating the call to prometheus.ExponentialBuckets in the
CloudStorageFlushFileSizeHistogram definition to use a larger bucket count
(17/18) or a larger initial size so the histogram spans beyond the default 64
MiB file-size target.
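The bucket claim above can be checked without importing Prometheus by reimplementing the exponential-bucket formula (start × factor^i) by hand; this is a standalone sanity check, not the library's code:

```go
package main

import "fmt"

// expBuckets mirrors the shape of prometheus.ExponentialBuckets:
// count buckets, the first at start, each subsequent one multiplied by factor.
func expBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	// 16 buckets from 1 KiB top out at 32 MiB, so ~64 MiB files land in +Inf.
	fmt.Println(int(expBuckets(1024, 2.0, 16)[15])) // 33554432
	// 18 buckets reach 128 MiB, comfortably past the default file-size target.
	fmt.Println(int(expBuckets(1024, 2.0, 18)[17])) // 134217728
}
```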
🧹 Nitpick comments (17)
pkg/sink/cloudstorage/config.go (1)
264-277: Consider adding an upper-bound guard for `spool-disk-quota`. Other config fields like `file-size` and `worker-count` have upper-bound limits with warning logs and clamping. A very large spool disk quota could exhaust disk space. Consider adding a soft upper limit with a warning, similar to the pattern used for `file-size` and `worker-count`.

pkg/sink/cloudstorage/config_test.go (1)

171-193: Good test coverage for `spool-disk-quota` precedence. Covers both URI-override and replica-config-fallback scenarios. Consider adding a negative test case (e.g., `spool-disk-quota=0` or `spool-disk-quota=-1`) to verify the validation error path in `getSpoolDiskQuota`.

pkg/sink/cloudstorage/path.go (1)
381-411: Double cleanup call per `GenerateDataFilePath` invocation. `cleanupExpiredPathState` is called in `generateDataFileName` (line 385), and `ensurePathState` + `currentTime` are called in both `generateDataDirPath` (lines 358-359) and `generateDataFileName` (lines 384-386) for the same table key within a single `GenerateDataFilePath` call. The redundant map lookups are minor but could be streamlined by lifting the state lookup to `GenerateDataFilePath` and passing it down.

pkg/sink/cloudstorage/path_test.go (1)

338-338: Consider using a dedicated API method instead of directly accessing `f.pathState`. Lines 77, 90, 114, etc. consistently use the new encapsulated API (`setCurrentSchemaVersion`, `currentSchemaVersion`, `pathStateCount`), but this line directly accesses the internal `pathState` map. A method like `removePathState(table)` would maintain consistency.

downstreamadapter/sink/cloudstorage/task.go (1)
80-85: Silent drop on `doneCh` if `done()` is called more than once. The non-blocking send silently discards the error if the channel buffer is already full (e.g., `done()` called twice). If the intent is exactly-once signaling, consider adding a guard (e.g., `sync.Once`) or at minimum a log warning on the `default` branch to aid debugging.

♻️ Optional: add a warning on the default path

```diff
 func (m *drainMarker) done(err error) {
 	select {
 	case m.doneCh <- err:
 	default:
+		log.Warn("drain marker done called more than once or channel full",
+			zap.Stringer("dispatcherID", m.dispatcherID),
+			zap.Uint64("commitTs", m.commitTs))
 	}
 }
```

storage-sink-monitoring.md (1)

20-23: Minor: Fenced code block missing language specifier. The ASCII diagram code block should have a language hint (e.g., ` ```text `) to satisfy markdownlint MD040 and improve rendering in some Markdown viewers.

downstreamadapter/sink/cloudstorage/task_indexer.go (1)
29-31: `dispatcherToShard` map grows without bound — no eviction mechanism. The design doc (Section 7.1) states that per-dispatcher state "must support reclaim (TTL/LRU or writer lifecycle bound)." This map only grows as new dispatchers are seen but never shrinks. Under dispatcher churn over a long-running deployment, this will leak memory proportionally to the total number of historically-seen dispatchers rather than the active set.

Consider adding a `remove(dispatcherID)` method or a periodic purge so the caller can clean up stale entries.

downstreamadapter/sink/cloudstorage/spool/codec.go (1)
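The eviction hook could be as small as the sketch below. The type and the hash are simplified placeholders (the PR routes via `GID.Hash`); only the shape of the `remove` method is the point.

```go
package main

import "fmt"

// taskIndexer is a simplified stand-in for the PR's indexer; the hash below
// is a placeholder, not GID.Hash.
type taskIndexer struct {
	dispatcherToShard map[string]int
	shardCount        int
}

func newTaskIndexer(shards int) *taskIndexer {
	return &taskIndexer{dispatcherToShard: make(map[string]int), shardCount: shards}
}

// shardFor memoizes the shard assignment for a dispatcher.
func (t *taskIndexer) shardFor(dispatcherID string) int {
	if s, ok := t.dispatcherToShard[dispatcherID]; ok {
		return s
	}
	sum := 0
	for _, c := range dispatcherID {
		sum = (sum*31 + int(c)) & 0x7fffffff
	}
	s := sum % t.shardCount
	t.dispatcherToShard[dispatcherID] = s
	return s
}

// remove drops the cached assignment when a dispatcher is closed, bounding
// the map to the active set instead of every dispatcher ever seen.
func (t *taskIndexer) remove(dispatcherID string) {
	delete(t.dispatcherToShard, dispatcherID)
}

func main() {
	idx := newTaskIndexer(8)
	a := idx.shardFor("dispatcher-a")
	fmt.Println(a == idx.shardFor("dispatcher-a")) // true: routing is stable
	idx.remove("dispatcher-a")
	fmt.Println(len(idx.dispatcherToShard)) // 0
}
```

Binding `remove` to the dispatcher/writer lifecycle avoids the need for a separate TTL sweep.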
24-49: Consider direct byte encoding instead of `binary.Write` for hot-path performance. `binary.Write` with a `uint32` argument uses reflection internally (it calls `dataSize(data)` → `reflect.ValueOf`). On a spool serialization path that processes every DML batch, this overhead adds up. Direct little-endian encoding into a `[]byte` (or using `binary.LittleEndian.PutUint32`) is significantly faster.

♻️ Example using direct encoding (serialize side)

```diff
 func serializeMessages(msgs []*codeccommon.Message) ([]byte, error) {
-	buf := bytes.NewBuffer(make([]byte, 0, 128))
-	if err := binary.Write(buf, binary.LittleEndian, uint32(len(msgs))); err != nil {
-		return nil, errors.Trace(err)
-	}
-	for _, msg := range msgs {
-		keyLen := uint32(len(msg.Key))
-		valueLen := uint32(len(msg.Value))
-		rows := uint32(msg.GetRowsCount())
-		if err := binary.Write(buf, binary.LittleEndian, keyLen); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if err := binary.Write(buf, binary.LittleEndian, valueLen); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if err := binary.Write(buf, binary.LittleEndian, rows); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if _, err := buf.Write(msg.Key); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if _, err := buf.Write(msg.Value); err != nil {
-			return nil, errors.Trace(err)
-		}
-	}
-	return buf.Bytes(), nil
+	// Pre-calculate total size to avoid repeated buffer growth.
+	totalSize := 4 // count header
+	for _, msg := range msgs {
+		totalSize += 12 + len(msg.Key) + len(msg.Value) // 3 x uint32 headers + payload
+	}
+	buf := make([]byte, totalSize)
+	offset := 0
+	binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msgs)))
+	offset += 4
+	for _, msg := range msgs {
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msg.Key)))
+		offset += 4
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msg.Value)))
+		offset += 4
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(msg.GetRowsCount()))
+		offset += 4
+		copy(buf[offset:], msg.Key)
+		offset += len(msg.Key)
+		copy(buf[offset:], msg.Value)
+		offset += len(msg.Value)
+	}
+	return buf, nil
 }
```

downstreamadapter/sink/cloudstorage/encoding_group_test.go (2)
58-61: Unbounded retry loop when seeking distinct shards could theoretically hang. If `GID.Hash` were to map all dispatchers to the same shard (e.g., a degenerate hash), this loop would never terminate. The same pattern appears at lines 82-85 in `TestEncodingGroupRouteByDispatcher`. Adding a max iteration count (e.g., 100) with `t.Fatal` would make the tests more robust against unexpected hash behavior.

216-233: Test helper `newTestDMLEvent` directly constructs a `DMLEvent` struct literal — verify required fields. The `DMLEvent` struct (from the relevant snippet) has many fields. The helper only sets a subset. Notably, `Length`, `RowTypes`, `CommitTs`, and `StartTs` are zero-valued. If the encoder accesses these fields, the test may not exercise realistic behavior. This is acceptable for a routing/plumbing test, but consider adding a comment noting this is a minimal stub.

downstreamadapter/sink/cloudstorage/encoding_group.go (1)
135-156: Ordering dependency between `inputCh` and `outputCh` sends in `Add()` — verify no deadlock under saturation. `Add()` sends the same `taskFuture` to `inputCh` first (line 147), then to `outputCh` (line 153). If both channels are near capacity (1024 each), there's a window where the future is in `inputCh` (being processed by the encoder, which calls `finish()`) before it appears in `outputCh`. This is safe because `Ready()` handles an already-closed `done` channel correctly, but it means the encoder may finish encoding before the consumer can even see the future.

One edge case worth noting: if `Add()` blocks on the `outputCh` send (line 153) while the encoder has already called `finish()` on the future, the encoded payload sits in memory without being consumed until `outputCh` drains. Under prolonged saturation, this means up to `defaultChannelSize` encoded payloads may linger in memory beyond spool accounting. Depending on message sizes, this could be significant.

Consider documenting this memory accounting gap or bounding the combined channel depth.

docs/plans/2026-02-04-cloudstorage-sink-ga-requirements.md (1)
docs/plans/2026-02-04-cloudstorage-sink-ga-requirements.md (1)
148-155: Track the config inconsistency identified in Line 151.The document notes that
defaultEnableTableAcrossNodesinpkg/sink/cloudstorage/config.gois inconsistent with the GA default offalse. This should be tracked as a concrete work item (e.g., a follow-up issue) to avoid it being forgotten post-merge.Would you like me to open an issue to track the
defaultEnableTableAcrossNodesconfig alignment?storage-sink.md (1)
62-80: Add a language specifier to the fenced code block. The ASCII diagram code block at line 62 lacks a language specifier, flagged by markdownlint (MD040). Use ` ```text ` for plain-text diagrams.

storage-sink-slow.md (1)

62-64: Add language specifiers to fenced code blocks. Lines 62, 73, and 167 have fenced code blocks without language specifiers (MD040). Use ` ```text ` for formulae/diagrams.

downstreamadapter/sink/cloudstorage/writer.go (2)
411-419: Silent drop of interval-triggered flush when `toBeFlushedCh` is full. The `default` case at line 418 silently drops the interval flush if the channel is full. The `batchedTask` is not reset, so data isn't lost — it accumulates and will be retried on the next tick. However, this means interval flushes can be arbitrarily delayed under back-pressure, and the batch can grow well beyond `FlushInterval`-worth of data without any observability signal.

Consider logging at debug level when this happens, or tracking it as a metric, so operators can identify when flush back-pressure is occurring.

62-69: `flushReason` as `string` type — consider using integer constants. Using a string-typed enum means every metric label construction allocates (or at least compares) strings. Since these values are used on the hot flush path for Prometheus labels, `iota`-based int constants with a `String()` method would be more efficient and less error-prone.

downstreamadapter/sink/cloudstorage/spool/manager.go (1)
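The iota-based alternative could be sketched as below; the reason names mirror the set listed in this review (size, interval, ddl, close, error), though the exact set in the PR may differ.

```go
package main

import "fmt"

// flushReason as an int-backed enum instead of a string type.
type flushReason int

const (
	flushReasonSize flushReason = iota
	flushReasonInterval
	flushReasonDDL
	flushReasonClose
	flushReasonError
)

var flushReasonNames = [...]string{"size", "interval", "ddl", "close", "error"}

// String yields a stable Prometheus label value without per-call allocation,
// and the compiler catches typos that a raw string literal would not.
func (r flushReason) String() string {
	if r >= 0 && int(r) < len(flushReasonNames) {
		return flushReasonNames[r]
	}
	return "unknown"
}

func main() {
	fmt.Println(flushReasonDDL)   // ddl
	fmt.Println(flushReason(99))  // unknown
}
```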
379-384: Clamping negative bytes without logging masks accounting bugs. If `memoryBytes` or `diskBytes` go negative, the code silently clamps to 0. This is good defensive coding, but negative values indicate a double-release or accounting mismatch that should be investigated. Consider adding a warning log when this occurs.

🔍 Suggested improvement

```diff
 if s.memoryBytes < 0 {
+	log.Warn("spool memory bytes went negative, clamping to zero",
+		zap.Int64("memoryBytes", s.memoryBytes),
+		zap.String("keyspace", s.metricKeyspace),
+		zap.String("changefeed", s.metricChangefeedLabel))
 	s.memoryBytes = 0
 }
 if s.diskBytes < 0 {
+	log.Warn("spool disk bytes went negative, clamping to zero",
+		zap.Int64("diskBytes", s.diskBytes),
+		zap.String("keyspace", s.metricKeyspace),
+		zap.String("changefeed", s.metricChangefeedLabel))
 	s.diskBytes = 0
 }
```
Quoted design-doc excerpt (Section 8, translated):

## 8. Observability

Must provide (at least):
- spool: `spool_bytes`, `spool_items`, `wake_calls_total`, `wake_suppressed_total`
- flush:
  - `flush_count{reason=size|interval|ddl|close|error}`
  - `flush_duration_seconds{reason=...}`
  - `flush_file_size_bytes` (histogram; observe how close sizes are to file-size)
- DDL:
  - `ddl_drain_duration_seconds`
  - `ddl_affected_dispatchers`
  - (optional) `ddl_drain_wait_encode_seconds`, `ddl_drain_wait_remote_seconds`
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Check if ddl_affected_dispatchers metric is defined anywhere in the codebase
rg -rn 'affected_dispatchers' --type=go
```

Repository: pingcap/ticdc
Length of output: 39

🏁 Script executed:

```bash
#!/bin/bash
# Find the metrics file
fd 'cloudstorage\.go' --type f | grep -i metric
```

Repository: pingcap/ticdc
Length of output: 103

🏁 Script executed:

```bash
#!/bin/bash
# Also search for metrics definitions more broadly
fd 'metrics' --type d | head -20
```

Repository: pingcap/ticdc
Length of output: 110

🏁 Script executed:

```bash
#!/bin/bash
# Check the contents of the cloudstorage metrics file
cat -n downstreamadapter/sink/metrics/cloudstorage.go | head -200
```

Repository: pingcap/ticdc
Length of output: 7294

🏁 Script executed:

```bash
#!/bin/bash
# Search for spool_items and wake_calls metrics
rg -rn 'spool_items|wake_calls_total' --type=go
```

Repository: pingcap/ticdc
Length of output: 39
Three observability metrics specified in the design are not implemented.

Section 8 specifies three metrics as required ("must provide"), but they are not defined in downstreamadapter/sink/metrics/cloudstorage.go:

- `spool_items` (spool observability)
- `wake_calls_total` (spool observability)
- `ddl_affected_dispatchers` (DDL observability)

The other metrics listed (flush metrics, `ddl_drain_duration_seconds`, spool bytes/memory, wake suppressed) are correctly implemented. These three missing metrics should either be added to the metrics file or explicitly marked as optional/deferred in the design.
🤖 Prompt for AI Agents
In `@docs/plans/2026-02-04-cloudstorage-sink-ga-design.md` around lines 252 - 263,
The design requires three observability metrics that are missing from
downstreamadapter/sink/metrics/cloudstorage.go: add definitions, registration,
and usage for spool_items, wake_calls_total, and ddl_affected_dispatchers;
specifically declare Prometheus metrics (e.g., a Gauge or GaugeVec named
SpoolItems, a CounterVec named WakeCallsTotal, and a Gauge or CounterVec named
DDLAffectedDispatchers), register them in the same init/registry block currently
used by other metrics in cloudstorage.go, and wire increments/sets where the
spool and DDL code update counts (places that currently update spool_bytes,
wake_suppressed_total, and ddl_drain_duration_seconds). If you prefer not to
implement them now, update the design text to mark spool_items,
wake_calls_total, and ddl_affected_dispatchers as optional/deferred instead.
```go
func (d *dmlWriters) PassBlockEvent(event commonEvent.BlockEvent) error {
	if event == nil {
		return nil
	}

	start := time.Now()
	defer sinkmetrics.CloudStorageDDLDrainDurationHistogram.WithLabelValues(
		d.changefeedID.Keyspace(),
		d.changefeedID.ID().String(),
	).Observe(time.Since(start).Seconds())

	doneCh := make(chan error, 1)
	// Invariant for DDL ordering:
	// marker follows the same dispatcher route and is acked only after prior tasks
	// in that route are fully drained by writer.
	d.msgCh.Push(newDrainTask(event.GetDispatcherID(), event.GetCommitTs(), doneCh))

	select {
	case err := <-doneCh:
		return err
	case <-d.ctx.Done():
		return errors.Trace(d.ctx.Err())
	}
}
```
PassBlockEvent may block indefinitely if the pipeline exits before processing the drain task.

`PassBlockEvent` pushes a drain task into `msgCh` and waits on `doneCh`. If the encoding/dispatch pipeline has already exited (e.g., due to an error in `Run()`), the drain task will never be consumed and `doneCh` will never receive. The method then blocks until `d.ctx` is cancelled, which could be arbitrarily long if `d.ctx` is a long-lived parent context rather than the `Run()`-scoped context.

Consider either:

- Using the `Run()`-scoped context (or a derived cancel) as the fallback instead of `d.ctx`.
- Closing `doneCh` with an error when the pipeline shuts down (e.g., draining `msgCh` on exit).
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/dml_writers.go` around lines 179 - 202,
PassBlockEvent pushes a drain task into d.msgCh and then blocks waiting on
doneCh, which can hang indefinitely if the encoding/dispatch pipeline has
already exited; fix by making the wait use the pipeline's Run()-scoped context
(or a derived cancel) instead of the long-lived d.ctx, or ensure the pipeline
closes/returns an error into doneCh when it shuts down (e.g., detect shutdown in
the dispatcher/Run() and send an error on doneCh or close it). Update
PassBlockEvent to select on doneCh, the Run()-scoped context (or a context
produced/owned by Run()), and only fall back to d.ctx as a last resort; also
modify the dispatcher/Run() path that consumes d.msgCh to inject an error into
the corresponding doneCh when it exits so blocked callers of PassBlockEvent are
unblocked.
```go
func deserializeMessages(data []byte) ([]*codeccommon.Message, error) {
	reader := bytes.NewReader(data)
	var count uint32
	if err := binary.Read(reader, binary.LittleEndian, &count); err != nil {
		return nil, errors.Trace(err)
	}

	result := make([]*codeccommon.Message, 0, count)
```
Unbounded pre-allocation from untrusted count value can cause OOM on corrupted spool data.
If spool data on disk is corrupted (bit-flip, truncation, etc.), count could be an arbitrary uint32 value up to ~4 billion. The make([]*codeccommon.Message, 0, count) at Line 59 would attempt to allocate up to ~32 GB of pointer space, crashing the process before the loop even starts.
Add a sanity check on count (e.g., against len(data) or a reasonable upper bound) before allocating.
🛡️ Proposed fix
```diff
 func deserializeMessages(data []byte) ([]*codeccommon.Message, error) {
 	reader := bytes.NewReader(data)
 	var count uint32
 	if err := binary.Read(reader, binary.LittleEndian, &count); err != nil {
 		return nil, errors.Trace(err)
 	}
+	// Each message has at least 12 bytes of header (keyLen + valueLen + rows).
+	// Reject obviously invalid counts to avoid OOM on corrupted data.
+	maxPossible := (uint32(len(data)) - 4) / 12
+	if count > maxPossible {
+		return nil, errors.Errorf("message count %d exceeds maximum possible %d for data size %d", count, maxPossible, len(data))
+	}
 	result := make([]*codeccommon.Message, 0, count)
```
result := make([]*codeccommon.Message, 0, count)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func deserializeMessages(data []byte) ([]*codeccommon.Message, error) { | |
| reader := bytes.NewReader(data) | |
| var count uint32 | |
| if err := binary.Read(reader, binary.LittleEndian, &count); err != nil { | |
| return nil, errors.Trace(err) | |
| } | |
| result := make([]*codeccommon.Message, 0, count) | |
| func deserializeMessages(data []byte) ([]*codeccommon.Message, error) { | |
| reader := bytes.NewReader(data) | |
| var count uint32 | |
| if err := binary.Read(reader, binary.LittleEndian, &count); err != nil { | |
| return nil, errors.Trace(err) | |
| } | |
| // Each message has at least 12 bytes of header (keyLen + valueLen + rows). | |
| // Reject obviously invalid counts to avoid OOM on corrupted data. | |
| maxPossible := (uint32(len(data)) - 4) / 12 | |
| if count > maxPossible { | |
| return nil, errors.Errorf("message count %d exceeds maximum possible %d for data size %d", count, maxPossible, len(data)) | |
| } | |
| result := make([]*codeccommon.Message, 0, count) |
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/codec.go` around lines 52 - 59, In
deserializeMessages validate the uint32 count read from binary before calling
make(..., count): check that count is reasonable (e.g., not larger than
len(data) and below a configured/constant upper bound or <=
uint32(len(data)/minMessageSize)), and if it exceeds the sanity limit return an
error instead of attempting allocation; update the allocation to use the
validated/capped count and document the chosen bound; reference the
deserializeMessages function and the local variable count when making this
change.
```go
	if err := os.RemoveAll(rootDir); err != nil {
		return nil, errors.Trace(err)
	}
	if err := os.MkdirAll(rootDir, 0o755); err != nil {
		return nil, errors.Trace(err)
	}
```
os.RemoveAll(rootDir) on startup could be destructive if path computation is wrong.
If any component of the changefeed ID (keyspace, name, ID) is empty or unexpected, the constructed rootDir could point higher in the directory tree than intended. RemoveAll on such a path would silently delete unrelated data. Consider adding a sanity check that rootDir contains the expected defaultDirectoryName component before removing, or at least validating that the changefeed ID components are non-empty.
🛡️ Suggested defensive check
```diff
+	if changefeedID.ID().String() == "" || changefeedID.Name() == "" {
+		return nil, errors.Errorf("invalid changefeed ID for spool directory")
+	}
+
 	if err := os.RemoveAll(rootDir); err != nil {
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
if changefeedID.ID().String() == "" || changefeedID.Name() == "" {
	return nil, errors.Errorf("invalid changefeed ID for spool directory")
}
if err := os.RemoveAll(rootDir); err != nil {
	return nil, errors.Trace(err)
}
if err := os.MkdirAll(rootDir, 0o755); err != nil {
	return nil, errors.Trace(err)
}
```
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/manager.go` around lines 174 - 179,
The startup code calling os.RemoveAll(rootDir) in manager.go is unsafe if the
changefeed ID components (keyspace, name, ID) are empty or malformed; update the
initialization in the spool manager (the code that builds rootDir before calling
os.RemoveAll and os.MkdirAll) to validate inputs and the computed path: ensure
keyspace, name and ID are non-empty, run filepath.Clean on rootDir, and verify
that the cleaned rootDir contains the expected defaultDirectoryName segment (or
matches an allowed base prefix) before allowing os.RemoveAll; if validation
fails, return an error instead of deleting.
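The validation described in the prompt above can be sketched as a small guard. This is a sketch under assumptions: `defaultDirectoryName` and `validateSpoolDir` are illustrative names, and the real constant lives in the spool package:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// defaultDirectoryName is an assumed name for the spool directory component;
// the real constant is defined by the spool manager.
const defaultDirectoryName = "storage-sink-spool"

// validateSpoolDir refuses to treat rootDir as removable unless the cleaned
// path contains the expected spool component, guarding os.RemoveAll against
// a miscomputed path that points higher in the directory tree.
func validateSpoolDir(rootDir string) error {
	cleaned := filepath.Clean(rootDir)
	for _, part := range strings.Split(cleaned, string(filepath.Separator)) {
		if part == defaultDirectoryName {
			return nil
		}
	}
	return fmt.Errorf("refusing to remove %q: missing %q path component", cleaned, defaultDirectoryName)
}

func main() {
	fmt.Println(validateSpoolDir("/tmp/storage-sink-spool/ks/cf") == nil)
	fmt.Println(validateSpoolDir("/") != nil)
	// filepath.Clean collapses "..", so an escaped path also fails the check.
	fmt.Println(validateSpoolDir("/tmp/storage-sink-spool/..") != nil)
}
```

Running `filepath.Clean` first matters: it collapses `..` segments, so a path that textually contains the expected component but escapes above it is still rejected.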
```go
// Load fetches messages from memory or spilled segments.
func (s *Manager) Load(entry *Entry) ([]*codeccommon.Message, []func(), error) {
	if entry == nil {
		return nil, nil, nil
	}
	if entry.memoryMsgs != nil {
		return entry.memoryMsgs, entry.callbacks, nil
	}
	if entry.pointer == nil {
		return nil, entry.callbacks, nil
	}

	s.mu.Lock()
	spoolSegment := s.segments[entry.pointer.segmentID]
	if spoolSegment == nil {
		s.mu.Unlock()
		return nil, nil, errors.Errorf("spool segment %d not found", entry.pointer.segmentID)
	}
	file := spoolSegment.file
	offset := entry.pointer.offset
	length := entry.pointer.length
	s.mu.Unlock()

	buf := make([]byte, length)
	if _, err := file.ReadAt(buf, offset); err != nil {
		return nil, nil, errors.Trace(err)
	}
	msgs, err := deserializeMessages(buf)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	return msgs, entry.callbacks, nil
}
```
Race between Close() and Load(): file can be closed under a concurrent reader.
Load() acquires the lock to look up the segment and get the file handle (lines 322–331), then releases the lock and performs file.ReadAt() (line 334). If Close() runs between the unlock and the ReadAt, it will close the file handle under the reader, causing a "bad file descriptor" error.
Consider either:
- checking `s.closed` before the unlock and returning early, and deferring the unlock until after the `ReadAt`, or
- using a `sync.RWMutex` and holding the read lock across the I/O.
🔒 Sketch: hold lock across ReadAt

```diff
 s.mu.Lock()
+if s.closed {
+	s.mu.Unlock()
+	return nil, nil, errors.New("spool is closed")
+}
 spoolSegment := s.segments[entry.pointer.segmentID]
 if spoolSegment == nil {
 	s.mu.Unlock()
 	return nil, nil, errors.Errorf("spool segment %d not found", entry.pointer.segmentID)
 }
 file := spoolSegment.file
 offset := entry.pointer.offset
 length := entry.pointer.length
 s.mu.Unlock()
```

Even with this check, there's still a window between the `s.closed` check and the `ReadAt`. A more robust fix would be to hold the lock (or an RWMutex) across the I/O, or to use reference counting on the file handle itself.
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/manager.go` around lines 310 - 342,
Load has a race where Manager.Close can close spoolSegment.file after s.mu is
released but before file.ReadAt, causing "bad file descriptor"; fix by holding
the appropriate lock across the I/O or switching to an RW lock: either acquire
the mutex (s.mu) and defer its Unlock until after the ReadAt so the file handle
cannot be closed during the read, or change s.mu to a sync.RWMutex and take
RLock in Manager.Load (and have Close take Lock) so readers hold the lock for
the duration of file.ReadAt; ensure checks of s.closed and lookup of
s.segments/entry.pointer happen under that same lock and reference
spoolSegment.file only while the lock is held (or use reference counting on the
file handle if preferred).
```go
callbacksToRun := make([]func(), 0, len(s.pendingWake))
s.mu.Lock()
```
Data race: len(s.pendingWake) read without holding s.mu.
Line 350 reads s.pendingWake (to compute slice capacity) before acquiring s.mu at line 351. While the functional impact is negligible (just a capacity hint), this is technically a data race under Go's memory model and will be flagged by the race detector.
🐛 Fix: move allocation after lock

```diff
-callbacksToRun := make([]func(), 0, len(s.pendingWake))
 s.mu.Lock()
+callbacksToRun := make([]func(), 0, len(s.pendingWake))
 if s.closed {
 	s.mu.Unlock()
 	return
```

📝 Committable suggestion
```go
s.mu.Lock()
callbacksToRun := make([]func(), 0, len(s.pendingWake))
if s.closed {
	s.mu.Unlock()
	return
```
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/manager.go` around lines 349 - 351,
The allocation of callbacksToRun uses len(s.pendingWake) before acquiring the
mutex, causing a data race; move the make([]func(), 0, len(s.pendingWake)) so it
occurs after s.mu.Lock() (or compute the length into a local variable while
holding the lock) to ensure len(s.pendingWake) is read only while s.mu is held;
update the code around callbacksToRun, s.pendingWake and s.mu accordingly so the
capacity hint is computed under the lock.
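The fixed ordering described in the prompt above can be sketched with a small self-contained type; `notifier` is an illustrative stand-in for the spool manager's wake path, not the actual code:

```go
package main

import (
	"fmt"
	"sync"
)

// notifier mirrors the pendingWake pattern: the capacity hint for
// callbacksToRun is computed only after mu is held, so len(pendingWake)
// is never read unsynchronized.
type notifier struct {
	mu          sync.Mutex
	closed      bool
	pendingWake []func()
}

func (n *notifier) add(cb func()) {
	n.mu.Lock()
	n.pendingWake = append(n.pendingWake, cb)
	n.mu.Unlock()
}

func (n *notifier) wake() {
	n.mu.Lock()
	if n.closed {
		n.mu.Unlock()
		return
	}
	callbacksToRun := make([]func(), 0, len(n.pendingWake)) // under the lock
	callbacksToRun = append(callbacksToRun, n.pendingWake...)
	n.pendingWake = n.pendingWake[:0]
	n.mu.Unlock()
	for _, cb := range callbacksToRun {
		cb() // run outside the lock so re-entrant callers cannot deadlock
	}
}

func main() {
	var fired int
	n := &notifier{}
	n.add(func() { fired++ })
	n.add(func() { fired++ })
	n.wake()
	fmt.Println(fired)
}
```

Copying the callbacks out and running them after the unlock is the usual companion fix: it keeps the critical section short and prevents a callback that calls back into the notifier from deadlocking.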
```go
CloudStorageFlushFileSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: namespace,
	Subsystem: subsystem,
	Name:      "cloud_storage_flush_file_size_bytes",
	Help:      "Flushed payload bytes per task by reason",
	Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 16),
}, []string{"namespace", "changefeed", "reason"})
```
Histogram buckets for CloudStorageFlushFileSizeHistogram don't cover the default file-size target (64 MiB).
ExponentialBuckets(1024, 2.0, 16) produces buckets from 1 KiB to 32 MiB. Since the default file-size is 64 MiB, any file flushed near (or above) the target size will fall into the +Inf bucket, defeating the purpose of this metric — which is to observe how close files get to file-size.
Increase the bucket count (18 buckets reach 128 MiB; the proposed fix uses 20, reaching 512 MiB) or use a larger start value, so the histogram covers at least 128 MiB.
🔧 Proposed fix

```diff
 CloudStorageFlushFileSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 	Namespace: namespace,
 	Subsystem: subsystem,
 	Name:      "cloud_storage_flush_file_size_bytes",
 	Help:      "Flushed payload bytes per task by reason",
-	Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 16),
+	Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 20),
 }, []string{"namespace", "changefeed", "reason"})
```

📝 Committable suggestion
```go
CloudStorageFlushFileSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: namespace,
	Subsystem: subsystem,
	Name:      "cloud_storage_flush_file_size_bytes",
	Help:      "Flushed payload bytes per task by reason",
	Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 20),
}, []string{"namespace", "changefeed", "reason"})
```
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/metrics/cloudstorage.go` around lines 73 - 79, The
histogram CloudStorageFlushFileSizeHistogram currently uses
prometheus.ExponentialBuckets(1024, 2.0, 16) which only covers up to 32 MiB and
will put ~64 MiB files into +Inf; adjust the bucket configuration so it covers
at least 128 MiB (e.g., increase the count to 17 or 18, or raise the start
value) by updating the call to prometheus.ExponentialBuckets in the
CloudStorageFlushFileSizeHistogram definition to use a larger bucket count
(17/18) or a larger initial size so the histogram spans beyond the default 64
MiB file-size target.
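The bucket arithmetic above is easy to verify: `ExponentialBuckets(start, factor, count)` yields upper bounds `start * factor^i` for `i` in `[0, count)`. A small sketch (re-implementing the arithmetic rather than importing the Prometheus client) shows why 16 buckets top out at 32 MiB:

```go
package main

import "fmt"

// upperBounds mirrors the arithmetic behind prometheus.ExponentialBuckets:
// bucket i has upper bound start * factor^i, for i in [0, count).
func upperBounds(start, factor float64, count int) []float64 {
	bounds := make([]float64, count)
	cur := start
	for i := range bounds {
		bounds[i] = cur
		cur *= factor
	}
	return bounds
}

func main() {
	for _, n := range []int{16, 17, 18, 20} {
		b := upperBounds(1024, 2.0, n)
		fmt.Printf("%2d buckets -> largest %4.0f MiB\n", n, b[n-1]/(1<<20))
	}
}
```

This prints 32, 64, 128, and 512 MiB for 16, 17, 18, and 20 buckets respectively, confirming that 16 buckets cannot observe a 64 MiB flush.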
@3AceShowHand: The following test failed, say
Full PR test history. Your PR dashboard.
Details
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/sink/cloudstorage/writer.go (1)
249-276: ⚠️ Potential issue | 🟡 Minor
Spool entries silently skipped when `d.spool == nil`.
Lines 256-258 skip spool entries without invoking their callbacks when `d.spool` is nil. If a task somehow contains spool entries while the manager is nil, callbacks for those entries (which drive checkpoint progress) would be permanently lost.
If `d.spool == nil` is truly impossible when entries are present, an assertion or panic would be more appropriate than a silent `continue`. If it is possible, the callbacks from those entries still need to be invoked.
🤖 Fix all issues with AI agents
In `@cmd/storage-consumer/discovery.go`:
- Around line 107-141: Rename the local variable dmlkey to lowerCamelCase dmlKey
everywhere in parseDMLFilePath (including its declaration, the
ParseIndexFilePath call result assignment, and usages as the map key in
c.tableDMLIdxMap) to follow Go naming conventions; also change the unwrapped
error return after FetchIndexFromFileName to return errors.Trace(err) so it
matches the other error wrapping in this function (affecting the
FetchIndexFromFileName error branch that currently does "return err").
In `@cmd/storage-consumer/path_key.go`:
- Around line 271-307: The IsSchemaFile and FetchIndexFromFileName
implementations in the consumer have diverged from the pkg/sink/cloudstorage
version and use plain fmt.Errorf instead of the wrapped error type; remove the
duplicated implementations from the consumer and import the single canonical
implementation from pkg/sink/cloudstorage (export it if necessary), updating
callers to use cloudstorage.IsSchemaFile and
cloudstorage.FetchIndexFromFileName; ensure the canonical FetchIndexFromFileName
wraps errors using errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...)
to match the existing pkg behavior and keep signature and semantics the same.
In `@downstreamadapter/sink/cloudstorage/task.go`:
- Around line 9-12: The Apache 2.0 license header at the top of the file is
missing the standard "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied." line; update the header comment block immediately above the
package declaration (top-of-file comment in
downstreamadapter/sink/cloudstorage/task.go) by inserting that missing line
between the existing `"AS IS" BASIS,` line and the `See the License for the
specific language governing permissions and` line so the full standard Apache
2.0 boilerplate appears.
In `@pkg/sink/cloudstorage/path.go`:
- Line 539: The call to strconv.ParseUint(indexPart, 10, 64) returns a raw error
but needs to be wrapped; change the single return to parse into v, err :=
strconv.ParseUint(indexPart, 10, 64) and on err return a wrapped error using
errors.WrapError(err, "parse indexPart") (or errors.Trace(err)), otherwise
return v, nil; reference symbols: indexPart, strconv.ParseUint,
errors.WrapError.
🧹 Nitpick comments (5)
docs/plans/2026-02-04-cloudstorage-sink-ga-task-breakdown.md (1)
3-7: Optional: Minor markdown formatting.
Static analysis detected a blank line inside the blockquote (line 4 is blank within the `>` block), which triggers markdownlint rule MD028. This doesn't affect readability but could be cleaned up for strict markdown compliance.
📝 Optional formatting fix

```diff
 > 文档定位(已整合)
+>
 > - 类型:任务清单与执行进度(唯一打勾清单)
 > - 总览文档:`storage-sink.md`
 > - 说明:任务状态仅在本文维护;`storage-sink.md` 负责总体架构与语义说明。
```

Alternatively, remove the blank line entirely if it serves no purpose:

```diff
 > 文档定位(已整合)
->
 > - 类型:任务清单与执行进度(唯一打勾清单)
```

cmd/storage-consumer/discovery.go (1)
79-101: Silent error swallowing during storage walk may mask data loss.
Both `parseSchemaFilePath` and `parseDMLFilePath` errors are logged but silently skipped (lines 83-85, 89-92). While this is intentional for resilience, a corrupted or partially written file will be permanently invisible to the consumer: it won't be retried on subsequent walks either, since `getNewFiles` doesn't distinguish "failed to parse" from "not yet present".
Consider whether a metric counter for skipped/failed files would help operators detect persistent parse failures in production.
cmd/storage-consumer/path_key.go (2)
71-79: Missing doc comments on exported types.
`FileIndexKey` and `FileIndex` are exported types but lack doc comments. Go convention and `golint` expect doc comments on all exported declarations.
Proposed fix

```diff
+// FileIndexKey identifies a file-index partition within a DML path, optionally scoped to a dispatcher.
 type FileIndexKey struct {
 	DispatcherID           string
 	EnableTableAcrossNodes bool
 }

+// FileIndex combines a FileIndexKey with a numeric file index.
 type FileIndex struct {
 	FileIndexKey
 	Idx uint64
 }
```
88-107: `GenerateDMLFilePath` unconditionally appends `d.Table` even when it could be empty.
If `DmlPathKey.Table` were empty, this would produce a path with a double slash (e.g., `schema1//123456/...`). While current callers likely always set `Table`, there's no guard here, making this fragile.
downstreamadapter/sink/cloudstorage/writer.go (1)
411-419: Interval-triggered flush silently dropped when `toBeFlushedCh` is full.
The `default` branch at line 418 means interval flushes are non-blocking: if the 64-slot `toBeFlushedCh` is full, the flush is silently skipped and `batchedTask` continues accumulating. In sustained-throughput scenarios where the flush goroutine can't keep up, this could lead to unbounded batch growth in memory until the next successful send.
This is acceptable if the batch is expected to eventually be flushed by a size trigger or DDL marker (both of which block). However, consider logging or incrementing a metric when an interval flush is dropped so operators can detect sustained back-pressure.
```go
func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
	var dmlkey DmlPathKey
	dispatcherID, err := dmlkey.ParseIndexFilePath(
		putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
		path,
	)
	if err != nil {
		return errors.Trace(err)
	}
	data, err := c.externalStorage.ReadFile(ctx, path)
	if err != nil {
		return errors.Trace(err)
	}
	fileName := strings.TrimSuffix(string(data), "\n")
	fileIdx, err := FetchIndexFromFileName(fileName, c.fileExtension)
	if err != nil {
		return err
	}
	fileIndex := &FileIndex{
		FileIndexKey: FileIndexKey{
			DispatcherID:           dispatcherID,
			EnableTableAcrossNodes: dispatcherID != "",
		},
		Idx: fileIdx,
	}

	m, ok := c.tableDMLIdxMap[dmlkey]
	if !ok {
		c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{
			fileIndex.FileIndexKey: fileIndex.Idx,
		}
	} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
		c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx
	}
	return nil
```
Inconsistent error wrapping and variable naming.
Two issues:
-
Line 108:
`dmlkey` should be `dmlKey` (lowerCamelCase) to match Go naming conventions and the rest of this file (e.g., `schemaKey` on line 145).
Line 123:
return erris not wrapped witherrors.Trace(err), unlike the other error returns in this function (lines 114, 118). This is inconsistent.
Proposed fix
func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
- var dmlkey DmlPathKey
- dispatcherID, err := dmlkey.ParseIndexFilePath(
+ var dmlKey DmlPathKey
+ dispatcherID, err := dmlKey.ParseIndexFilePath(
putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
path,
)
if err != nil {
return errors.Trace(err)
}
data, err := c.externalStorage.ReadFile(ctx, path)
if err != nil {
return errors.Trace(err)
}
fileName := strings.TrimSuffix(string(data), "\n")
fileIdx, err := FetchIndexFromFileName(fileName, c.fileExtension)
if err != nil {
- return err
+ return errors.Trace(err)
}
fileIndex := &FileIndex{
FileIndexKey: FileIndexKey{
DispatcherID: dispatcherID,
EnableTableAcrossNodes: dispatcherID != "",
},
Idx: fileIdx,
}
- m, ok := c.tableDMLIdxMap[dmlkey]
+ m, ok := c.tableDMLIdxMap[dmlKey]
if !ok {
- c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{
+ c.tableDMLIdxMap[dmlKey] = fileIndexKeyMap{
fileIndex.FileIndexKey: fileIndex.Idx,
}
} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
- c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx
+ c.tableDMLIdxMap[dmlKey][fileIndex.FileIndexKey] = fileIndex.Idx
}
return nil
}

As per coding guidelines: "Use lowerCamelCase for variable names in Go" and "wrap it immediately with errors.Trace(err)".
📝 Committable suggestion
```go
func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
	var dmlKey DmlPathKey
	dispatcherID, err := dmlKey.ParseIndexFilePath(
		putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
		path,
	)
	if err != nil {
		return errors.Trace(err)
	}
	data, err := c.externalStorage.ReadFile(ctx, path)
	if err != nil {
		return errors.Trace(err)
	}
	fileName := strings.TrimSuffix(string(data), "\n")
	fileIdx, err := FetchIndexFromFileName(fileName, c.fileExtension)
	if err != nil {
		return errors.Trace(err)
	}
	fileIndex := &FileIndex{
		FileIndexKey: FileIndexKey{
			DispatcherID:           dispatcherID,
			EnableTableAcrossNodes: dispatcherID != "",
		},
		Idx: fileIdx,
	}
	m, ok := c.tableDMLIdxMap[dmlKey]
	if !ok {
		c.tableDMLIdxMap[dmlKey] = fileIndexKeyMap{
			fileIndex.FileIndexKey: fileIndex.Idx,
		}
	} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
		c.tableDMLIdxMap[dmlKey][fileIndex.FileIndexKey] = fileIndex.Idx
	}
	return nil
}
```
🤖 Prompt for AI Agents
In `@cmd/storage-consumer/discovery.go` around lines 107 - 141, Rename the local
variable dmlkey to lowerCamelCase dmlKey everywhere in parseDMLFilePath
(including its declaration, the ParseIndexFilePath call result assignment, and
usages as the map key in c.tableDMLIdxMap) to follow Go naming conventions; also
change the unwrapped error return after FetchIndexFromFileName to return
errors.Trace(err) so it matches the other error wrapping in this function
(affecting the FetchIndexFromFileName error branch that currently does "return
err").
```go
// IsSchemaFile checks whether the file is a schema file.
func IsSchemaFile(filePath string) bool {
	return schemaRE.MatchString(filePath)
}

func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string {
	indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d"
	if enableTableAcrossNodes {
		return fmt.Sprintf("CDC_%s_"+indexFmt+"%s", dispatcherID, index, extension)
	}
	return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension)
}

func FetchIndexFromFileName(fileName string, extension string) (uint64, error) {
	if len(fileName) < minFileNamePrefixLen+len(extension) ||
		!strings.HasPrefix(fileName, "CDC") ||
		!strings.HasSuffix(fileName, extension) {
		return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
	}

	fileName = strings.TrimSuffix(fileName, extension)
	indexPart := strings.TrimPrefix(fileName, "CDC")

	if strings.HasPrefix(indexPart, "_") {
		trimmed := strings.TrimPrefix(indexPart, "_")
		separatorIndex := strings.LastIndex(trimmed, "_")
		if separatorIndex <= 0 || separatorIndex >= len(trimmed)-1 {
			return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
		}
		indexPart = trimmed[separatorIndex+1:]
	}

	if len(indexPart) < config.MinFileIndexWidth || !isNumberString(indexPart) {
		return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
	}

	return strconv.ParseUint(indexPart, 10, 64)
```
🧩 Analysis chain
🏁 Script executed:
# First, locate the files mentioned in the review
find . -type f -name "path_key.go" -o -name "path.go" | grep -E "(cmd/storage-consumer|pkg/sink/cloudstorage)"Repository: pingcap/ticdc
Length of output: 123
🏁 Script executed:
# Get the implementations of FetchIndexFromFileName from both files
echo "=== cmd/storage-consumer/path_key.go: FetchIndexFromFileName ==="
rg -A 30 'func FetchIndexFromFileName' cmd/storage-consumer/path_key.go
echo ""
echo "=== pkg/sink/cloudstorage/path.go: FetchIndexFromFileName ==="
rg -A 30 'func FetchIndexFromFileName' pkg/sink/cloudstorage/path.goRepository: pingcap/ticdc
Length of output: 2262
🏁 Script executed:
# Get the implementations of IsSchemaFile from both files
echo "=== cmd/storage-consumer/path_key.go: IsSchemaFile ==="
rg -B 2 -A 2 'func IsSchemaFile' cmd/storage-consumer/path_key.go
echo ""
echo "=== pkg/sink/cloudstorage/path.go: IsSchemaFile ==="
rg -B 2 -A 2 'func IsSchemaFile' pkg/sink/cloudstorage/path.goRepository: pingcap/ticdc
Length of output: 440
🏁 Script executed:
# Search for callers of these functions to understand usage
echo "=== Callers of FetchIndexFromFileName ==="
rg 'FetchIndexFromFileName' --type=go -B 2 -A 2 | head -50
echo ""
echo "=== Callers of IsSchemaFile ==="
rg 'IsSchemaFile' --type=go -B 2 -A 2 | head -50Repository: pingcap/ticdc
Length of output: 4713
FetchIndexFromFileName and IsSchemaFile are duplicated between consumer and sink packages with divergent error handling.
Both functions exist in cmd/storage-consumer/path_key.go and pkg/sink/cloudstorage/path.go with identical core logic. However, the pkg version properly wraps errors with errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...), while the consumer version uses plain fmt.Errorf, violating the error wrapping guideline. This divergence indicates they are already drifting apart.
Consolidate to a single implementation in pkg/sink/cloudstorage and import from the consumer to ensure consistent error handling across both modules.
🤖 Prompt for AI Agents
In `@cmd/storage-consumer/path_key.go` around lines 271 - 307, The IsSchemaFile
and FetchIndexFromFileName implementations in the consumer have diverged from
the pkg/sink/cloudstorage version and use plain fmt.Errorf instead of the
wrapped error type; remove the duplicated implementations from the consumer and
import the single canonical implementation from pkg/sink/cloudstorage (export it
if necessary), updating callers to use cloudstorage.IsSchemaFile and
cloudstorage.FetchIndexFromFileName; ensure the canonical FetchIndexFromFileName
wraps errors using errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...)
to match the existing pkg behavior and keep signature and semantics the same.
```go
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
```
Incomplete Apache 2.0 license header.
The standard WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. line is missing between the "AS IS" BASIS, line and the See the License... line. This is a common copy-paste issue with the boilerplate.
Proposed fix
// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

📝 Committable suggestion
```go
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
```
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/task.go` around lines 9 - 12, The Apache
2.0 license header at the top of the file is missing the standard "WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." line; update
the header comment block immediately above the package declaration (top-of-file
comment in downstreamadapter/sink/cloudstorage/task.go) by inserting that
missing line between the existing `"AS IS" BASIS,` line and the `See the License
for the specific language governing permissions and` line so the full standard
Apache 2.0 boilerplate appears.
```go
		fmt.Errorf("'%s' is a invalid file name", fileName))
}

return strconv.ParseUint(indexPart, 10, 64)
```
Unwrapped error from strconv.ParseUint.
The raw error from strconv.ParseUint is returned without wrapping. Other error paths in this function use errors.WrapError, but this one is missed.
Proposed fix
- return strconv.ParseUint(indexPart, 10, 64)
+ idx, err := strconv.ParseUint(indexPart, 10, 64)
+ if err != nil {
+ return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName,
+ fmt.Errorf("'%s' is a invalid file name: %w", indexPart, err))
+ }
+ return idx, nil

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".
📝 Committable suggestion
```go
idx, err := strconv.ParseUint(indexPart, 10, 64)
if err != nil {
	return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName,
		fmt.Errorf("'%s' is a invalid file name: %w", indexPart, err))
}
return idx, nil
```
🤖 Prompt for AI Agents
In `@pkg/sink/cloudstorage/path.go` at line 539, The call to
strconv.ParseUint(indexPart, 10, 64) returns a raw error but needs to be
wrapped; change the single return to parse into v, err :=
strconv.ParseUint(indexPart, 10, 64) and on err return a wrapped error using
errors.WrapError(err, "parse indexPart") (or errors.Trace(err)), otherwise
return v, nil; reference symbols: indexPart, strconv.ParseUint,
errors.WrapError.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Chores
Documentation