
[DNM]storage: enhance the storage sink #4219

Open
3AceShowHand wants to merge 4 commits into pingcap:master from 3AceShowHand:enhance-storage-sink

Conversation


@3AceShowHand 3AceShowHand commented Feb 14, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Spool-based buffering for Cloud Storage Sink with memory quota and disk spill support
    • Improved DDL drain handling with new PassBlockEvent interface
    • Dispatcher-based routing and path state management for better scalability
    • New spool disk quota configuration option
  • Chores

    • Removed defragmenter-based pipeline; replaced with task-based encoding and dispatch flow
    • Updated metrics for per-reason flush tracking and spool memory/disk monitoring
  • Documentation

    • Added Cloud Storage Sink GA task breakdown and implementation roadmap
    • Added Cloud Storage Sink spool review guidance
    • Added storage sink architecture overview

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Feb 14, 2026

ti-chi-bot bot commented Feb 14, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Feb 14, 2026

coderabbitai bot commented Feb 14, 2026

📝 Walkthrough

Walkthrough

This PR introduces a spool-based buffering architecture for cloud storage sink, replacing the defragmenter pipeline with task-based encoding and writing. Key additions include a memory-aware spool manager with disk-spill support, per-dispatcher path state management with TTL-based cleanup, asynchronous DDL/block event routing through PassBlockEvent interface, and consumer-side path key parsing utilities.

Changes

Cohort / File(s) Summary
Documentation & Planning
docs/plans/*, storage-sink.md
New task breakdown and review guidelines for Cloud Storage Sink GA, plus comprehensive end-to-end architecture documentation covering spool, DDL ordering, and memory semantics.
Spool System
downstreamadapter/sink/cloudstorage/spool/*
New spool manager with memory quota, disk-spill segmentation, per-entry accounting, wake-callback suppression, and serialization/deserialization codecs; includes tests for memory suppression and spill-read-back behavior.
Cloud Storage Writer Pipeline
downstreamadapter/sink/cloudstorage/defragmenter* (removed), dml_writers.go, encoding_group*, writer*, task*.go
Removed defragmenter-based ordering; introduced task-based pipeline with task indexer routing, spool integration in dml_writers and writer, per-shard encoding with taskFuture orchestration, and per-reason flush metrics tracking; includes tests validating routing and drain-marker behavior.
Dispatcher & Block Event Interface
downstreamadapter/dispatcher/basic_dispatcher.go, downstreamadapter/sink/*.go (multiple sinks), downstreamadapter/sink/sink.go
Added PassBlockEvent method to Sink interface and implemented across all sink implementations (cloud storage, blackhole, Kafka, MySQL, Pulsar, Redo); updated dispatcher to route block events asynchronously via sink and conditionally wake on enqueue instead of post-flush.
Path Management & State
pkg/sink/cloudstorage/path.go, pkg/sink/cloudstorage/path_key.go (deleted), cmd/storage-consumer/path_key*.go
Refactored path state from per-table to per-(table, dispatcher) with TTL-based cleanup; moved path key utilities from sink pkg to consumer; added schema/DML path parsing and generation utilities on consumer side for independent file discovery.
Configuration
pkg/config/sink.go, pkg/sink/cloudstorage/config.go, pkg/sink/cloudstorage/config_test.go
Added SpoolDiskQuota configuration option with validation and merging logic; changed defaultEnableTableAcrossNodes from true to false; includes tests validating quota inheritance from replica config.
Metrics & Event Model
downstreamadapter/sink/metrics/cloudstorage.go, pkg/common/event/dml_event*.go
Added per-reason flush task metrics, spool gauges (memory/disk/total bytes), wake suppression counter, and DDL drain duration histogram; introduced PostTxnEnqueued callbacks on DMLEvent with PostEnqueue() lifecycle method.
Consumer/Reader Refactoring
cmd/storage-consumer/consumer.go, cmd/storage-consumer/discovery.go, cmd/storage-consumer/main.go
Migrated from hard-coded cloud storage types to local path key types; added file discovery logic for schema and DML index files; introduced helper to skip initialization during test execution.
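The `PassBlockEvent` addition to the `Sink` interface described in the table above might look roughly like the sketch below. The event fields and the `AddDMLEvent` signature are illustrative assumptions, not the actual diff; the point is that every sink must implement the method, with non-storage sinks getting a no-op.

```go
package main

import "fmt"

// blockEvent stands in for the real DDL/block event type (hypothetical fields).
type blockEvent struct {
	dispatcherID string
	commitTs     uint64
}

// Sink sketches the extended interface: PassBlockEvent lets a dispatcher ask
// the sink to drain all DMLs preceding the block event before it proceeds.
type Sink interface {
	AddDMLEvent(e string)
	PassBlockEvent(e blockEvent) error
}

// blackholeSink shows the no-op conformance added to non-storage sinks.
type blackholeSink struct{}

func (s *blackholeSink) AddDMLEvent(e string)              {}
func (s *blackholeSink) PassBlockEvent(e blockEvent) error { return nil }

func main() {
	var s Sink = &blackholeSink{}
	err := s.PassBlockEvent(blockEvent{dispatcherID: "d1", commitTs: 42})
	fmt.Println(err == nil)
}
```

Only the cloud storage sink gives the method real behavior (routing a drain task through the writer pipeline); Kafka, MySQL, Pulsar, Redo, and blackhole all return `nil` immediately.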

Sequence Diagram

sequenceDiagram
    participant Disp as Dispatcher
    participant Sink as Cloud Storage Sink
    participant DML as DMLWriters
    participant Enc as EncodingGroup
    participant Spool as Spool Manager
    participant Writer as Writer (per-shard)
    
    rect rgba(100, 200, 100, 0.5)
    Note over Disp,Writer: DML Event Path
    Disp->>Sink: AddDMLEvent(event)
    Sink->>DML: AddDMLEvent(event)
    DML->>DML: newDMLTask(event)
    DML->>DML: submitTaskToEncoder(task)
    Enc->>Enc: route to input shard
    Enc->>Enc: encode task → taskFuture
    Enc->>Writer: ConsumeOutputShard(encoded task)
    Writer->>Writer: batchTask by table
    Writer->>Spool: (optional) enqueue encoded msgs
    Writer->>Writer: flush to storage
    end
    
    rect rgba(200, 100, 100, 0.5)
    Note over Disp,Spool: Block Event (DDL) Path
    Disp->>Sink: PassBlockEvent(blockEvent)
    Sink->>DML: PassBlockEvent(blockEvent)
    DML->>DML: newDrainTask(dispatcherID, commitTs, doneCh)
    DML->>DML: submitTaskToEncoder(drainTask)
    Enc->>Writer: ConsumeOutputShard(drain task)
    Writer->>Writer: flush pending messages
    Writer->>Writer: emit DDL marker
    Writer-->>Disp: drainMarker.done() via doneCh
    end
    
    rect rgba(100, 100, 200, 0.5)
    Note over Spool,Writer: Memory Pressure & Spill
    Writer->>Spool: Enqueue(msgs, onEnqueued)
    alt memory < quota
        Spool->>Spool: store in memory
    else memory >= quota
        Spool->>Spool: serialize and spill to disk
    end
    Writer->>Spool: Load(entry) for flush
    Spool-->>Writer: return msgs or read from disk
    Writer->>Spool: Release(entry)
    Spool->>Spool: decrement accounting
    alt mem drops below low watermark
        Spool-->>DML: invoke pending wake callbacks
    end
    end
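The "Memory Pressure & Spill" branch of the diagram can be sketched as the accounting decision below. All names (`spool`, `enqueue`, `release`, `lowWatermark`) are illustrative assumptions; the real manager also serializes entries and tracks wake-callback suppression per caller.

```go
package main

import "fmt"

// spool is a minimal sketch of the memory-quota / disk-spill decision:
// entries stay in memory while under quota, otherwise they spill to disk,
// and wake callbacks resume once memory falls below a low watermark.
type spool struct {
	memoryQuota  int64 // bytes allowed in memory
	memoryInUse  int64
	diskInUse    int64
	lowWatermark int64
	pendingWakes []func()
}

// enqueue returns true if the payload stayed in memory, false if it was
// accounted as spilled to disk.
func (s *spool) enqueue(size int64) bool {
	if s.memoryInUse+size <= s.memoryQuota {
		s.memoryInUse += size
		return true
	}
	s.diskInUse += size
	return false
}

// release decrements accounting and fires suppressed wake callbacks once
// memory usage drops below the low watermark.
func (s *spool) release(size int64, fromMemory bool) {
	if fromMemory {
		s.memoryInUse -= size
	} else {
		s.diskInUse -= size
	}
	if s.memoryInUse < s.lowWatermark {
		for _, wake := range s.pendingWakes {
			wake()
		}
		s.pendingWakes = nil
	}
}

func main() {
	sp := &spool{memoryQuota: 100, lowWatermark: 50}
	fmt.Println(sp.enqueue(80)) // true: fits in memory
	fmt.Println(sp.enqueue(40)) // false: would exceed quota, spills to disk
}
```

The watermark gap between quota and `lowWatermark` is what prevents wake/suppress thrashing when memory hovers near the limit.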

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

The PR spans diverse architectural components (spool system, task-based pipeline, path state refactoring, dispatcher/sink integration, consumer utilities) with significant logic density in encoding group, writer flush logic, spool segment management, and path state cleanup. Multiple files require cross-domain understanding, though many changes follow consistent patterns within their cohorts.

Possibly related PRs

Suggested labels

lgtm, area/sink/cloud-storage, type/enhancement

Suggested reviewers

  • wk989898
  • hongyunyan
  • flowbehappy

🐰 Hops excitedly

A spool is born, with memory and grace,
Tasks flow through encoders at steady pace,
Dispatchers wake when buffers align,
DDL drains in order divine!
Storage spins with per-dispatcher care,
GA awaits—let's go there! 🚀

🚥 Pre-merge checks | ❌ 4
❌ Failed checks (3 warnings, 1 inconclusive)
Check name Status Explanation Resolution
Description check ⚠️ Warning The description is entirely a blank template with no actual content, missing issue reference, change details, test information, compatibility notes, and release notes. Fill in all required sections: add issue number, describe the changes and how they work, specify tests included, address compatibility/performance concerns, and provide a release note or state 'None'.
Docstring Coverage ⚠️ Warning Docstring coverage is 12.70% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Merge Conflict Detection ⚠️ Warning ❌ Merge conflicts detected (146 files):

⚔️ api/v2/changefeed.go (content)
⚔️ api/v2/model.go (content)
⚔️ cmd/cdc/cli/cli_changefeed_helper.go (content)
⚔️ cmd/cdc/cli/cli_changefeed_helper_test.go (content)
⚔️ cmd/cdc/cli/cli_changefeed_move_table.go (content)
⚔️ cmd/cdc/cli/cli_changefeed_resume.go (content)
⚔️ cmd/cdc/cli/cli_changefeed_update.go (content)
⚔️ cmd/kafka-consumer/writer_test.go (content)
⚔️ cmd/storage-consumer/consumer.go (content)
⚔️ cmd/storage-consumer/main.go (content)
⚔️ cmd/util/event_group.go (content)
⚔️ cmd/util/event_group_test.go (content)
⚔️ coordinator/changefeed/changefeed.go (content)
⚔️ coordinator/changefeed/changefeed_test.go (content)
⚔️ coordinator/coordinator.go (content)
⚔️ deployments/kafka-consumer.Dockerfile (content)
⚔️ downstreamadapter/dispatcher/basic_dispatcher.go (content)
⚔️ downstreamadapter/dispatcher/basic_dispatcher_info.go (content)
⚔️ downstreamadapter/dispatcher/block_event_executor.go (content)
⚔️ downstreamadapter/dispatcher/event_dispatcher_test.go (content)
⚔️ downstreamadapter/dispatcher/redo_dispatcher_test.go (content)
⚔️ downstreamadapter/dispatcher/table_progress.go (content)
⚔️ downstreamadapter/dispatchermanager/dispatcher_manager.go (content)
⚔️ downstreamadapter/dispatchermanager/dispatcher_manager_helper.go (content)
⚔️ downstreamadapter/dispatchermanager/dispatcher_manager_info.go (content)
⚔️ downstreamadapter/dispatchermanager/dispatcher_manager_redo.go (content)
⚔️ downstreamadapter/dispatchermanager/dispatcher_manager_test.go (content)
⚔️ downstreamadapter/dispatchermanager/helper.go (content)
⚔️ downstreamadapter/dispatchermanager/helper_test.go (content)
⚔️ downstreamadapter/dispatchermanager/task.go (content)
⚔️ downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (content)
⚔️ downstreamadapter/sink/blackhole/sink.go (content)
⚔️ downstreamadapter/sink/cloudstorage/dml_writers.go (content)
⚔️ downstreamadapter/sink/cloudstorage/encoding_group.go (content)
⚔️ downstreamadapter/sink/cloudstorage/sink.go (content)
⚔️ downstreamadapter/sink/cloudstorage/writer.go (content)
⚔️ downstreamadapter/sink/cloudstorage/writer_test.go (content)
⚔️ downstreamadapter/sink/eventrouter/partition/index_value.go (content)
⚔️ downstreamadapter/sink/eventrouter/topic/expression_test.go (content)
⚔️ downstreamadapter/sink/kafka/sink.go (content)
⚔️ downstreamadapter/sink/metrics/cloudstorage.go (content)
⚔️ downstreamadapter/sink/mock_sink.go (content)
⚔️ downstreamadapter/sink/mysql/sink.go (content)
⚔️ downstreamadapter/sink/mysql/sink_test.go (content)
⚔️ downstreamadapter/sink/pulsar/sink.go (content)
⚔️ downstreamadapter/sink/redo/sink.go (content)
⚔️ downstreamadapter/sink/sink.go (content)
⚔️ go.mod (content)
⚔️ go.sum (content)
⚔️ heartbeatpb/heartbeat.pb.go (content)
⚔️ heartbeatpb/heartbeat.proto (content)
⚔️ logservice/eventstore/event_store.go (content)
⚔️ logservice/logpuller/subscription_client.go (content)
⚔️ logservice/schemastore/multi_version.go (content)
⚔️ logservice/schemastore/persist_storage_ddl_handlers.go (content)
⚔️ logservice/txnutil/lock_resolver.go (content)
⚔️ maintainer/barrier_event.go (content)
⚔️ maintainer/barrier_test.go (content)
⚔️ maintainer/maintainer_controller.go (content)
⚔️ maintainer/maintainer_controller_bootstrap.go (content)
⚔️ maintainer/maintainer_controller_helper.go (content)
⚔️ maintainer/maintainer_helper.go (content)
⚔️ maintainer/operator/operator_add.go (content)
⚔️ maintainer/operator/operator_add_test.go (content)
⚔️ maintainer/operator/operator_controller.go (content)
⚔️ maintainer/operator/operator_controller_test.go (content)
⚔️ maintainer/operator/operator_move.go (content)
⚔️ maintainer/operator/operator_remove.go (content)
⚔️ maintainer/operator/operator_remove_test.go (content)
⚔️ maintainer/operator/operator_split.go (content)
⚔️ maintainer/replica/replication_span.go (content)
⚔️ maintainer/replica/replication_span_test.go (content)
⚔️ maintainer/scheduler/balance_splits.go (content)
⚔️ maintainer/scheduler/basic.go (content)
⚔️ maintainer/span/span_controller.go (content)
⚔️ metrics/grafana/ticdc_new_arch.json (content)
⚔️ metrics/nextgengrafana/ticdc_new_arch_next_gen.json (content)
⚔️ metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json (content)
⚔️ pkg/api/util.go (content)
⚔️ pkg/api/v2/changefeed.go (content)
⚔️ pkg/api/v2/mock/changefeed_mock.go (content)
⚔️ pkg/applier/redo_test.go (content)
⚔️ pkg/applier/storage.go (content)
⚔️ pkg/common/column.go (content)
⚔️ pkg/common/event/dml_event.go (content)
⚔️ pkg/common/event/dml_event_test.go (content)
⚔️ pkg/common/event/table_schema_store.go (content)
⚔️ pkg/common/table_info.go (content)
⚔️ pkg/common/table_info_helper.go (content)
⚔️ pkg/common/types.go (content)
⚔️ pkg/config/changefeed.go (content)
⚔️ pkg/config/replica_config.go (content)
⚔️ pkg/config/sink.go (content)
⚔️ pkg/diff/util.go (content)
⚔️ pkg/errors/error.go (content)
⚔️ pkg/eventservice/main_test.go (content)
⚔️ pkg/leakutil/leak_helper_test.go (content)
⚔️ pkg/metrics/sink.go (content)
⚔️ pkg/notify/notify.go (content)
⚔️ pkg/orchestrator/etcd_worker.go (content)
⚔️ pkg/pdutil/api_client.go (content)
⚔️ pkg/scheduler/balance.go (content)
⚔️ pkg/sink/cloudstorage/config.go (content)
⚔️ pkg/sink/cloudstorage/config_test.go (content)
⚔️ pkg/sink/cloudstorage/path.go (content)
⚔️ pkg/sink/cloudstorage/path_test.go (content)
⚔️ pkg/sink/mysql/config.go (content)
⚔️ pkg/sink/mysql/config_test.go (content)
⚔️ pkg/sink/mysql/helper.go (content)
⚔️ pkg/sink/mysql/mysql_writer.go (content)
⚔️ pkg/sink/mysql/mysql_writer_ddl_ts_test.go (content)
⚔️ pkg/sink/mysql/mysql_writer_dml.go (content)
⚔️ pkg/sink/mysql/mysql_writer_dml_test.go (content)
⚔️ pkg/sink/mysql/mysql_writer_for_ddl_ts.go (content)
⚔️ pkg/sink/mysql/mysql_writer_test.go (content)
⚔️ pkg/sink/mysql/sql_builder.go (content)
⚔️ pkg/sink/sqlmodel/multi_row.go (content)
⚔️ pkg/sink/sqlmodel/row_change.go (content)
⚔️ pkg/sink/sqlmodel/utils.go (content)
⚔️ pkg/sink/sqlmodel/where_handle.go (content)
⚔️ pkg/txnutil/gc/gc_manager.go (content)
⚔️ pkg/txnutil/gc/gc_service.go (content)
⚔️ pkg/txnutil/gc/testing.go (content)
⚔️ pkg/upstream/upstream.go (content)
⚔️ scripts/generate-mock.sh (content)
⚔️ tests/integration_tests/_utils/move_table_with_retry (content)
⚔️ tests/integration_tests/_utils/run_pulsar_consumer (content)
⚔️ tests/integration_tests/_utils/split_table_with_retry (content)
⚔️ tests/integration_tests/api_v2/model.go (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/conf/diff_config.toml (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/downstream_prepare.sql (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/upstream_dml.sql (content)
⚔️ tests/integration_tests/correctness_for_shared_column_schema/data/upstream_prepare.sql (content)
⚔️ tests/integration_tests/ddl_for_split_tables_with_merge_and_split/run.sh (content)
⚔️ tests/integration_tests/ddl_wait/run.sh (content)
⚔️ tests/integration_tests/gc_safepoint/run.sh (content)
⚔️ tests/integration_tests/generate_column/data/stored.sql (content)
⚔️ tests/integration_tests/generate_column/data/virtual.sql (content)
⚔️ tests/integration_tests/generate_column/run.sh (content)
⚔️ tests/integration_tests/http_api/util/test_case.py (content)
⚔️ tests/integration_tests/http_api_tls/util/test_case.py (content)
⚔️ tests/integration_tests/http_api_tls_with_user_auth/util/test_case.py (content)
⚔️ tests/integration_tests/in_flight_ddl_during_scheduling/run.sh (content)
⚔️ tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh (content)
⚔️ tests/integration_tests/move_table/main.go (content)
⚔️ tests/integration_tests/run_light_it_in_ci.sh (content)

These conflicts must be resolved before merging into master.
Resolve conflicts locally and push changes to this branch.
Title check ❓ Inconclusive The title is extremely vague and generic, using 'enhance' without specifying which aspects or features are being enhanced. Replace with a more specific title that describes the main enhancement, e.g., 'storage: implement spool-based buffering for cloud storage sink' or 'storage: refactor cloud storage sink with task-based pipeline'.


@gemini-code-assist

Summary of Changes

Hello @3AceShowHand, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers a significant architectural upgrade to the cloud storage sink, aiming to achieve General Availability status by addressing critical performance, scalability, and correctness concerns. The core problem of low throughput due to tick-limited flushing is resolved through a two-stage acknowledgment mechanism and an intelligent internal spool for buffering. DDL event ordering is meticulously preserved using a novel DrainMarker approach, ensuring data consistency. The internal data processing pipeline has been streamlined for better parallelism and efficiency, particularly for scenarios involving numerous dispatchers. These changes are thoroughly documented with detailed design, implementation, and review plans, alongside new monitoring capabilities.

Highlights

  • Two-Stage Acknowledgment for DML: Implemented a two-stage acknowledgment model for DML events, decoupling the event wake-up from checkpoint advancement. This allows for earlier wake-up of dynamic streams, improving throughput by enabling the sink to continuously receive and aggregate data more effectively before flushing.
  • Spool Manager for Buffering and Backpressure: Introduced a new internal spool manager with both memory and disk tiers. This manager provides robust buffering for encoded messages and uses internal watermarks to control backpressure, preventing unbounded memory growth while absorbing bursts of data.
  • DDL Ordering with DrainMarker: Enhanced DDL event processing with a PassBlockEvent mechanism and DrainMarkers. This ensures that all DML events preceding a DDL are fully flushed to remote storage before the DDL's schema file is written, guaranteeing correctness in the new early-wake model.
  • Refactored Encoding and Writing Pipeline: The global defragmenter component has been removed to eliminate head-of-line blocking. The encoding and writing pipeline is now sharded and uses a taskIndexer for efficient, ordered processing per dispatcher, improving scalability and parallelism.
  • Enhanced Scalability for Dispatchers: Improved per-dispatcher state management within the FilePathGenerator by introducing TTL-based cleanup and active set processing. This supports a large number of dispatchers (up to 1000) with controlled memory usage and reduced CPU overhead.
  • Comprehensive Observability Metrics: Added new metrics for spool usage, flush reasons (e.g., interval, size, DDL), DDL drain duration, and shard busy time. These metrics provide deeper insights into the sink's performance and internal state, aiding in troubleshooting and monitoring.
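The two-stage acknowledgment from the first highlight can be sketched as the lifecycle below. The method names (`PostEnqueue`, `AddPostEnqueueFunc`, `PostTxnEnqueued`) follow the summary, but the bodies are assumptions; the key invariant is that enqueue callbacks fire exactly once, whether triggered by enqueue itself or as a fallback from flush.

```go
package main

import "fmt"

// dmlEvent sketches the two-stage ACK: PostEnqueue fires when the sink
// accepts the event (early wake of the dynamic stream), PostFlush fires
// when it is durably flushed and also guarantees the enqueue callbacks
// have run.
type dmlEvent struct {
	PostTxnEnqueued   []func()
	postTxnFlushed    []func()
	postEnqueueCalled bool
}

func (e *dmlEvent) AddPostEnqueueFunc(f func()) { e.PostTxnEnqueued = append(e.PostTxnEnqueued, f) }
func (e *dmlEvent) AddPostFlushFunc(f func())   { e.postTxnFlushed = append(e.postTxnFlushed, f) }

// PostEnqueue runs the enqueue callbacks exactly once.
func (e *dmlEvent) PostEnqueue() {
	if e.postEnqueueCalled {
		return
	}
	e.postEnqueueCalled = true
	for _, f := range e.PostTxnEnqueued {
		f()
	}
}

// PostFlush first guarantees the enqueue stage ran, then runs flush callbacks.
func (e *dmlEvent) PostFlush() {
	e.PostEnqueue()
	for _, f := range e.postTxnFlushed {
		f()
	}
}

func main() {
	calls := 0
	e := &dmlEvent{}
	e.AddPostEnqueueFunc(func() { calls++ })
	e.PostEnqueue()
	e.PostFlush() // does not re-run the enqueue callback
	fmt.Println(calls)
}
```

Decoupling the wake (enqueue) from checkpoint advancement (flush) is what lets the sink keep aggregating data while flushes are in flight, without risking a double wake.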


Changelog
  • docs/plans/2026-02-03-storage-sink-ga.md
    • Added a superseded GA plan document for storage sink.
  • docs/plans/2026-02-04-cloudstorage-sink-ga-ddl-behavior.md
    • Added a detailed document analyzing DDL behavior in the cloud storage sink, especially with table splitting and barrier mechanisms.
  • docs/plans/2026-02-04-cloudstorage-sink-ga-design.md
    • Added a comprehensive design document outlining the architecture, two-stage ACK, spool, DDL drain, and scalability for the cloud storage sink GA.
  • docs/plans/2026-02-04-cloudstorage-sink-ga-implementation.md
    • Added an implementation plan detailing milestones, code changes, testing, and risk mitigation for the cloud storage sink GA.
  • docs/plans/2026-02-04-cloudstorage-sink-ga-requirements.md
    • Added a requirements document specifying goals, non-goals, scope, constraints, and acceptance criteria for the cloud storage sink GA.
  • docs/plans/2026-02-04-cloudstorage-sink-ga-spool.md
    • Added a detailed document explaining the design and functionality of the spool manager, including memory/disk tiers and watermarks.
  • docs/plans/2026-02-14-cloudstorage-sink-spool-review-recommendations.md
    • Added a document providing review recommendations and risk points for the cloud storage sink spool and task pipeline changes.
  • downstreamadapter/dispatcher/basic_dispatcher.go
    • Updated DML event handling to use AddPostEnqueueFunc for early-wake.
    • Integrated PassBlockEvent call for DDLs before AddBlockEventToSink to ensure per-dispatcher drain.
  • downstreamadapter/sink/blackhole/sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/cloudstorage/defragmenter.go
    • Removed the defragmenter component.
  • downstreamadapter/sink/cloudstorage/defragmenter_test.go
    • Removed the test file for the defragmenter.
  • downstreamadapter/sink/cloudstorage/dml_writers.go
    • Refactored dmlWriters to remove the defragmenter.
    • Integrated the new spoolpkg.Manager.
    • Updated Run method to reflect the new task processing pipeline.
    • Implemented PassBlockEvent to handle DDL drain tasks.
    • Modified AddDMLEvent to push newDMLTask.
  • downstreamadapter/sink/cloudstorage/encoding_group.go
    • Refactored the encodingGroup to use taskFuture and taskIndexer for sharded processing.
    • Updated runEncoder to process taskFutures and encode DML tasks.
    • Added Add method for routing tasks to input/output shards.
    • Added ConsumeOutputShard for writers to consume encoded tasks.
  • downstreamadapter/sink/cloudstorage/encoding_group_test.go
    • Added new tests for taskIndexer routing and encodingGroup DML task encoding.
  • downstreamadapter/sink/cloudstorage/sink.go
    • Updated comments to reflect the new data flow without a defragmenter.
    • Modified New function to initialize dmlWriters with context and handle errors.
    • Implemented PassBlockEvent by delegating to s.dmlWriters.PassBlockEvent.
  • downstreamadapter/sink/cloudstorage/spool/codec.go
    • Added utility functions for serializing and deserializing messages for the spool.
  • downstreamadapter/sink/cloudstorage/spool/manager.go
    • Added the implementation of the spool.Manager for buffering encoded messages with memory and disk tiers.
    • Implemented Enqueue, Load, Release, and Close methods for spool management.
    • Incorporated watermark logic for wake suppression and resumption.
  • downstreamadapter/sink/cloudstorage/spool/manager_test.go
    • Added tests for the spool manager's wake suppression, resumption, and spill/read back functionality.
  • downstreamadapter/sink/cloudstorage/task.go
    • Added definitions for task (DML or Drain) and drainMarker structs to represent processing units in the new pipeline.
  • downstreamadapter/sink/cloudstorage/task_indexer.go
    • Added taskIndexer to manage routing logic for tasks to encoder input shards and writer output shards.
  • downstreamadapter/sink/cloudstorage/writer.go
    • Refactored newWriter to integrate the spoolpkg.Manager and use a task-based input channel.
    • Updated flushMessages to handle writerTasks, including drain markers, and observe flush tasks with explicit reasons.
    • Modified writeDataFile to load messages from spool entries if spilled.
    • Refactored genAndDispatchTask to build table batches and emit writerTasks with specific flushReasons.
    • Added enqueueTask and closeInput methods for task management.
  • downstreamadapter/sink/cloudstorage/writer_test.go
    • Updated TestWriterRun to use the new task types and enqueue mechanism.
    • Added TestWriterDrainMarker to verify DDL drain functionality.
  • downstreamadapter/sink/kafka/sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/metrics/cloudstorage.go
    • Added new Prometheus metrics: CloudStorageFlushTaskCounter, CloudStorageFlushDurationByReasonHistogram, CloudStorageFlushFileSizeHistogram, CloudStorageDDLDrainDurationHistogram, CloudStorageSpoolMemoryBytesGauge, CloudStorageSpoolDiskBytesGauge, CloudStorageSpoolTotalBytesGauge, CloudStorageWakeSuppressedCounter.
    • Renamed CloudStorageWorkerBusyRatio to CloudStorageShardBusySeconds and updated its help text.
    • Updated InitCloudStorageMetrics to register all new and modified metrics.
  • downstreamadapter/sink/mock_sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/mysql/sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/pulsar/sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/redo/sink.go
    • Added a no-op PassBlockEvent method to conform to the updated Sink interface.
  • downstreamadapter/sink/sink.go
    • Added PassBlockEvent to the Sink interface.
  • pkg/common/event/dml_event.go
    • Added PostTxnEnqueued slice and postEnqueueCalled flag for early-wake functionality.
    • Modified PostFlush() to call PostEnqueue().
    • Added PostEnqueue(), ClearPostEnqueueFunc(), and AddPostEnqueueFunc() methods.
  • pkg/common/event/dml_event_test.go
    • Added tests for PostEnqueue functionality and its interaction with PostFlush.
  • pkg/config/sink.go
    • Added SpoolDiskQuota field to CloudStorageConfig.
  • pkg/sink/cloudstorage/config.go
    • Changed defaultEnableTableAcrossNodes to false.
    • Added defaultSpoolDiskQuota constant.
    • Updated urlConfig and Config structs to include SpoolDiskQuota.
    • Modified Apply and mergeConfig functions to handle SpoolDiskQuota.
    • Added getSpoolDiskQuota function for parsing spool disk quota from URL parameters.
  • pkg/sink/cloudstorage/config_test.go
    • Updated TestConfigApply to reflect new default values for SpoolDiskQuota and EnableTableAcrossNodes.
    • Added TestSpoolDiskQuotaConfig to verify spool disk quota configuration parsing.
  • pkg/sink/cloudstorage/path.go
    • Removed hasher and versionMap fields.
    • Introduced tablePathStateKey and tablePathState structs for improved path state management.
    • Refactored FilePathGenerator to use a pathState map with TTL-based cleanup.
    • Updated CheckOrWriteSchema, generateDataDirPath, and generateDataFileName to utilize the new path state management.
    • Added helper methods like setPathStateCleanupConfig, pathStateCount, currentSchemaVersion, ensurePathState, updateSchemaVersion, and cleanupExpiredPathState.
  • pkg/sink/cloudstorage/path_test.go
    • Updated tests to align with the new path state management logic.
    • Added TestPathStateCleanup to verify the TTL-based cleanup of path states.
  • storage-sink-encoder-group.md
    • Added a design document discussing options for the storage sink encoder group, focusing on removing the defragmenter and ensuring ordering.
  • storage-sink-monitoring.md
    • Added monitoring and performance notes for the storage sink, identifying bottlenecks and suggesting new metrics.
  • storage-sink-slow.md
    • Added an analysis document detailing the root cause of low write throughput in the storage sink and proposing solutions.
  • storage-sink.md
    • Added a detailed analysis of the cloud storage sink's implementation principles, performance bottlenecks, and optimization strategies.
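The `taskIndexer` routing mentioned in the changelog can be sketched as below. The hashing scheme is an assumption; what matters is that every task from the same dispatcher deterministically lands on the same encoder input shard and writer output shard, preserving per-dispatcher ordering without the removed global defragmenter.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// taskIndexer routes tasks to encoder and writer shards by hashing the
// dispatcher ID, so each dispatcher's tasks are processed in order on a
// single shard while different dispatchers run in parallel.
type taskIndexer struct {
	encoderShards int
	writerShards  int
}

func hashID(id string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(id))
	return h.Sum32()
}

func (t *taskIndexer) inputShard(dispatcherID string) int {
	return int(hashID(dispatcherID) % uint32(t.encoderShards))
}

func (t *taskIndexer) outputShard(dispatcherID string) int {
	return int(hashID(dispatcherID) % uint32(t.writerShards))
}

func main() {
	idx := &taskIndexer{encoderShards: 8, writerShards: 4}
	// Same dispatcher always routes to the same shards.
	fmt.Println(idx.inputShard("dispatcher-1") == idx.inputShard("dispatcher-1"))
	fmt.Println(idx.outputShard("dispatcher-1") == idx.outputShard("dispatcher-1"))
}
```

Drain tasks for a dispatcher follow the same routing, which is why a drain marker arriving at a writer shard can guarantee all prior DMLs from that dispatcher have already been consumed there.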
Activity
  • The pull request introduces a comprehensive set of new documentation files, detailing the requirements, design, implementation plan, DDL behavior, and spool mechanism for the cloud storage sink's GA release.
  • Review recommendations, performance analysis, and monitoring guidelines are provided in dedicated markdown files.
  • The core DML processing pipeline has been significantly refactored, removing the global defragmenter and introducing a sharded approach for encoding and writing.
  • A new spool.Manager has been implemented to provide robust buffering with memory and disk tiers, along with watermark-based backpressure control.
  • The DDL handling has been enhanced with PassBlockEvent and DrainMarker to ensure strict ordering guarantees in the new early-wake environment.
  • The Sink interface has been extended with a PassBlockEvent method, and all other sink implementations have been updated with no-op versions.
  • The DMLEvent structure now includes PostTxnEnqueued callbacks to support the two-stage acknowledgment model.
  • Configuration options for the cloud storage sink have been updated to include SpoolDiskQuota and adjust defaultEnableTableAcrossNodes.
  • Path state management in FilePathGenerator has been refactored to be more scalable and efficient, incorporating TTL-based cleanup.
  • Extensive new Prometheus metrics have been added to provide detailed observability into the sink's internal operations, including spool usage, flush reasons, and DDL drain durations.

@3AceShowHand 3AceShowHand changed the title storage: enhance the storage sink [DNM]storage: enhance the storage sink Feb 14, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and well-documented enhancement to the storage sink. The core changes include a major refactoring to improve performance and scalability by introducing a spooling mechanism with disk spillover, a two-stage ACK for DMLs to enable early wake-up, and a new DDL drain mechanism to ensure correctness. The removal of the defragmenter in favor of a sharded encoder pipeline is a key improvement to eliminate a global head-of-line blocking point. The implementation is supported by extensive design documentation and appears to be solid. I have one suggestion to improve metric consistency.

return 0, 0, errors.Trace(inErr)
}

d.metricFlushDuration.Observe(time.Since(start).Seconds())


medium

The metric observed here for multipart data file writes (metricFlushDuration) is different from the one used for single-part writes (metricWriteDuration on line 335). This inconsistency can make it harder to analyze data file write performance. For better clarity and consistency in monitoring, I suggest using metricWriteDuration for all data file write operations, regardless of whether they are single-part or multi-part.

Suggested change
d.metricFlushDuration.Observe(time.Since(start).Seconds())
d.metricWriteDuration.Observe(time.Since(start).Seconds())

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/sink/cloudstorage/writer.go (1)

249-276: ⚠️ Potential issue | 🟠 Major

ignoreTableTask: spool Load error discards messages without propagating callbacks.

When Load fails at line 259, the code logs a warning, releases the entry, and continues — but the entry.callbacks (which contain PostFlush callbacks for checkpoint advancement) are never invoked. This means a Load failure could stall table progress permanently since the callbacks that drive tableProgress / checkpoint advancement are lost.

Consider invoking entry's callbacks even when Load fails, or at minimum propagating the error upward rather than silently swallowing it.

🤖 Fix all issues with AI agents
In `@docs/plans/2026-02-04-cloudstorage-sink-ga-design.md`:
- Around line 252-263: The design requires three observability metrics that are
missing from downstreamadapter/sink/metrics/cloudstorage.go: add definitions,
registration, and usage for spool_items, wake_calls_total, and
ddl_affected_dispatchers; specifically declare Prometheus metrics (e.g., a Gauge
or GaugeVec named SpoolItems, a CounterVec named WakeCallsTotal, and a Gauge or
CounterVec named DDLAffectedDispatchers), register them in the same
init/registry block currently used by other metrics in cloudstorage.go, and wire
increments/sets where the spool and DDL code update counts (places that
currently update spool_bytes, wake_suppressed_total, and
ddl_drain_duration_seconds). If you prefer not to implement them now, update the
design text to mark spool_items, wake_calls_total, and ddl_affected_dispatchers
as optional/deferred instead.

In `@downstreamadapter/sink/cloudstorage/dml_writers.go`:
- Around line 179-202: PassBlockEvent pushes a drain task into d.msgCh and then
blocks waiting on doneCh, which can hang indefinitely if the encoding/dispatch
pipeline has already exited; fix by making the wait use the pipeline's
Run()-scoped context (or a derived cancel) instead of the long-lived d.ctx, or
ensure the pipeline closes/returns an error into doneCh when it shuts down
(e.g., detect shutdown in the dispatcher/Run() and send an error on doneCh or
close it). Update PassBlockEvent to select on doneCh, the Run()-scoped context
(or a context produced/owned by Run()), and only fall back to d.ctx as a last
resort; also modify the dispatcher/Run() path that consumes d.msgCh to inject an
error into the corresponding doneCh when it exits so blocked callers of
PassBlockEvent are unblocked.

In `@downstreamadapter/sink/cloudstorage/spool/codec.go`:
- Around line 52-59: In deserializeMessages validate the uint32 count read from
binary before calling make(..., count): check that count is reasonable (e.g.,
not larger than len(data) and below a configured/constant upper bound or <=
uint32(len(data)/minMessageSize)), and if it exceeds the sanity limit return an
error instead of attempting allocation; update the allocation to use the
validated/capped count and document the chosen bound; reference the
deserializeMessages function and the local variable count when making this
change.

In `@downstreamadapter/sink/cloudstorage/spool/manager.go`:
- Around line 349-351: The allocation of callbacksToRun uses len(s.pendingWake)
before acquiring the mutex, causing a data race; move the make([]func(), 0,
len(s.pendingWake)) so it occurs after s.mu.Lock() (or compute the length into a
local variable while holding the lock) to ensure len(s.pendingWake) is read only
while s.mu is held; update the code around callbacksToRun, s.pendingWake and
s.mu accordingly so the capacity hint is computed under the lock.
- Around line 174-179: The startup code calling os.RemoveAll(rootDir) in
manager.go is unsafe if the changefeed ID components (keyspace, name, ID) are
empty or malformed; update the initialization in the spool manager (the code
that builds rootDir before calling os.RemoveAll and os.MkdirAll) to validate
inputs and the computed path: ensure keyspace, name and ID are non-empty, run
filepath.Clean on rootDir, and verify that the cleaned rootDir contains the
expected defaultDirectoryName segment (or matches an allowed base prefix) before
allowing os.RemoveAll; if validation fails, return an error instead of deleting.
- Around line 310-342: Load has a race where Manager.Close can close
spoolSegment.file after s.mu is released but before file.ReadAt, causing "bad
file descriptor"; fix by holding the appropriate lock across the I/O or
switching to an RW lock: either acquire the mutex (s.mu) and defer its Unlock
until after the ReadAt so the file handle cannot be closed during the read, or
change s.mu to a sync.RWMutex and take RLock in Manager.Load (and have Close
take Lock) so readers hold the lock for the duration of file.ReadAt; ensure
checks of s.closed and lookup of s.segments/entry.pointer happen under that same
lock and reference spoolSegment.file only while the lock is held (or use
reference counting on the file handle if preferred).

In `@downstreamadapter/sink/metrics/cloudstorage.go`:
- Around line 73-79: The histogram CloudStorageFlushFileSizeHistogram currently
uses prometheus.ExponentialBuckets(1024, 2.0, 16) which only covers up to 32 MiB
and will put ~64 MiB files into +Inf; adjust the bucket configuration so it
covers at least 128 MiB (e.g., increase the count to 17 or 18, or raise the
start value) by updating the call to prometheus.ExponentialBuckets in the
CloudStorageFlushFileSizeHistogram definition to use a larger bucket count
(17/18) or a larger initial size so the histogram spans beyond the default 64
MiB file-size target.
🧹 Nitpick comments (17)
pkg/sink/cloudstorage/config.go (1)

264-277: Consider adding an upper-bound guard for spool-disk-quota.

Other config fields like file-size and worker-count have upper-bound limits with warning logs and clamping. A very large spool disk quota could exhaust disk space. Consider adding a soft upper limit with a warning, similar to the pattern used for file-size and worker-count.
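A sketch of the warn-and-clamp pattern referenced above; the bounds and function name here are assumptions, not values from the actual config.go.

```go
package main

import "fmt"

const (
	// Illustrative bounds only; the real limits would need tuning.
	maxSpoolDiskQuota = int64(1) << 40 // 1 TiB soft cap
)

// clampSpoolDiskQuota mirrors the file-size/worker-count pattern:
// warn and clamp oversized values instead of failing validation.
func clampSpoolDiskQuota(quota int64) int64 {
	if quota > maxSpoolDiskQuota {
		fmt.Printf("spool-disk-quota %d exceeds cap, clamping to %d\n",
			quota, maxSpoolDiskQuota)
		return maxSpoolDiskQuota
	}
	return quota
}

func main() {
	fmt.Println(clampSpoolDiskQuota(int64(2) << 40))
	fmt.Println(clampSpoolDiskQuota(100))
}
```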

pkg/sink/cloudstorage/config_test.go (1)

171-193: Good test coverage for spool-disk-quota precedence.

Covers both URI-override and replica-config-fallback scenarios. Consider adding a negative test case (e.g., spool-disk-quota=0 or spool-disk-quota=-1) to verify the validation error path in getSpoolDiskQuota.

pkg/sink/cloudstorage/path.go (1)

381-411: Double cleanup call per GenerateDataFilePath invocation.

cleanupExpiredPathState is called in generateDataFileName (line 385), and ensurePathState + currentTime are called in both generateDataDirPath (lines 358-359) and generateDataFileName (lines 384-386) for the same table key within a single GenerateDataFilePath call. The redundant map lookups are minor but could be streamlined by lifting the state lookup to GenerateDataFilePath and passing it down.

pkg/sink/cloudstorage/path_test.go (1)

338-338: Consider using a dedicated API method instead of directly accessing f.pathState.

Lines 77, 90, 114, etc. consistently use the new encapsulated API (setCurrentSchemaVersion, currentSchemaVersion, pathStateCount), but this line directly accesses the internal pathState map. A method like removePathState(table) would maintain consistency.
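A sketch of the proposed accessor, using illustrative stand-ins for the FilePathGenerator internals (`pathGen`, `tableKey`, and `removePathState` are assumed names, not the real API):

```go
package main

import "fmt"

// tableKey and pathGen are placeholders for the real generator types.
type tableKey struct{ schema, table string }

type pathGen struct {
	pathState map[tableKey]int
}

// removePathState keeps tests from reaching into the map directly,
// matching the encapsulated accessors the file already uses
// (setCurrentSchemaVersion, currentSchemaVersion, pathStateCount).
func (f *pathGen) removePathState(k tableKey) {
	delete(f.pathState, k)
}

func (f *pathGen) pathStateCount() int { return len(f.pathState) }

func main() {
	f := &pathGen{pathState: map[tableKey]int{{"db", "t"}: 1}}
	f.removePathState(tableKey{"db", "t"})
	fmt.Println(f.pathStateCount())
}
```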

downstreamadapter/sink/cloudstorage/task.go (1)

80-85: Silent drop on doneCh if done() is called more than once.

The non-blocking send silently discards the error if the channel buffer is already full (e.g., done() called twice). If the intent is exactly-once signaling, consider adding a guard (e.g., sync.Once) or at minimum a log warning on the default branch to aid debugging.

♻️ Optional: add a warning on the default path
 func (m *drainMarker) done(err error) {
 	select {
 	case m.doneCh <- err:
 	default:
+		log.Warn("drain marker done called more than once or channel full",
+			zap.Stringer("dispatcherID", m.dispatcherID),
+			zap.Uint64("commitTs", m.commitTs))
 	}
 }
storage-sink-monitoring.md (1)

20-23: Minor: Fenced code block missing language specifier.

The ASCII diagram code block should have a language hint (e.g., ```text) to satisfy markdownlint MD040 and improve rendering in some Markdown viewers.

downstreamadapter/sink/cloudstorage/task_indexer.go (1)

29-31: dispatcherToShard map grows without bound — no eviction mechanism.

The design doc (Section 7.1) states that per-dispatcher state "must support reclaim (TTL/LRU or writer lifecycle bound)." This map only grows as new dispatchers are seen but never shrinks. Under dispatcher churn over a long-running deployment, this will leak memory proportionally to the total number of historically-seen dispatchers rather than the active set.

Consider adding a remove(dispatcherID) method or a periodic purge so the caller can clean up stale entries.
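A minimal sketch of a removable dispatcher-to-shard index; the type and the shard-assignment policy below are illustrative assumptions, not the actual task_indexer implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// taskIndexer is a stand-in showing how a remove method bounds the map
// to the active dispatcher set instead of all historically-seen IDs.
type taskIndexer struct {
	mu                sync.Mutex
	dispatcherToShard map[string]int
	shards            int
	next              int
}

// shardOf returns a stable shard for a dispatcher, assigning one
// round-robin on first sight (a simplification of the real hashing).
func (t *taskIndexer) shardOf(id string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	if s, ok := t.dispatcherToShard[id]; ok {
		return s
	}
	s := t.next % t.shards
	t.next++
	t.dispatcherToShard[id] = s
	return s
}

// remove lets the caller reclaim state when a dispatcher is closed.
func (t *taskIndexer) remove(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.dispatcherToShard, id)
}

func main() {
	idx := &taskIndexer{dispatcherToShard: map[string]int{}, shards: 4}
	s := idx.shardOf("d1")
	idx.remove("d1")
	fmt.Println(s, len(idx.dispatcherToShard))
}
```

The writer lifecycle (or a periodic TTL sweep) would be the natural place to call `remove`.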

downstreamadapter/sink/cloudstorage/spool/codec.go (1)

24-49: Consider direct byte encoding instead of binary.Write for hot-path performance.

binary.Write with a uint32 argument can fall back to reflection internally (its dataSize(data) path uses reflect.ValueOf). On a spool serialization path that processes every DML batch, this overhead adds up. Direct little-endian encoding into a []byte (for example via binary.LittleEndian.PutUint32) is significantly faster.

♻️ Example using direct encoding (serialize side)
 func serializeMessages(msgs []*codeccommon.Message) ([]byte, error) {
-	buf := bytes.NewBuffer(make([]byte, 0, 128))
-	if err := binary.Write(buf, binary.LittleEndian, uint32(len(msgs))); err != nil {
-		return nil, errors.Trace(err)
-	}
-	for _, msg := range msgs {
-		keyLen := uint32(len(msg.Key))
-		valueLen := uint32(len(msg.Value))
-		rows := uint32(msg.GetRowsCount())
-		if err := binary.Write(buf, binary.LittleEndian, keyLen); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if err := binary.Write(buf, binary.LittleEndian, valueLen); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if err := binary.Write(buf, binary.LittleEndian, rows); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if _, err := buf.Write(msg.Key); err != nil {
-			return nil, errors.Trace(err)
-		}
-		if _, err := buf.Write(msg.Value); err != nil {
-			return nil, errors.Trace(err)
-		}
-	}
-	return buf.Bytes(), nil
+	// Pre-calculate total size to avoid repeated buffer growth.
+	totalSize := 4 // count header
+	for _, msg := range msgs {
+		totalSize += 12 + len(msg.Key) + len(msg.Value) // 3 x uint32 headers + payload
+	}
+	buf := make([]byte, totalSize)
+	offset := 0
+	binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msgs)))
+	offset += 4
+	for _, msg := range msgs {
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msg.Key)))
+		offset += 4
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(len(msg.Value)))
+		offset += 4
+		binary.LittleEndian.PutUint32(buf[offset:], uint32(msg.GetRowsCount()))
+		offset += 4
+		copy(buf[offset:], msg.Key)
+		offset += len(msg.Key)
+		copy(buf[offset:], msg.Value)
+		offset += len(msg.Value)
+	}
+	return buf, nil
 }
downstreamadapter/sink/cloudstorage/encoding_group_test.go (2)

58-61: Unbounded retry loop when seeking distinct shards could theoretically hang.

If GID.Hash were to map all dispatchers to the same shard (e.g., a degenerate hash), this loop would never terminate. The same pattern appears at lines 82-85 in TestEncodingGroupRouteByDispatcher. Adding a max iteration count (e.g., 100) with t.Fatal would make the tests more robust against unexpected hash behavior.
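A sketch of the bounded search, with `hash` standing in for `GID.Hash`; the cap turns a potential hang into a deterministic failure the test can report via `t.Fatal`.

```go
package main

import "fmt"

// findDistinctShard searches for an index whose shard differs from
// base, giving up after maxTries instead of looping forever.
func findDistinctShard(base int, hash func(i int) int, maxTries int) (int, bool) {
	for i := 0; i < maxTries; i++ {
		if hash(i) != base {
			return i, true
		}
	}
	return 0, false // caller should t.Fatal here
}

func main() {
	hash := func(i int) int { return i % 4 }
	i, ok := findDistinctShard(hash(0), hash, 100)
	fmt.Println(i, ok)
}
```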


216-233: Test helper newTestDMLEvent directly constructs a DMLEvent struct literal — verify required fields.

The DMLEvent struct (from the relevant snippet) has many fields. The helper only sets a subset. Notably, Length, RowTypes, CommitTs, and StartTs are zero-valued. If the encoder accesses these fields, the test may not exercise realistic behavior. This is acceptable for a routing/plumbing test, but consider adding a comment noting this is a minimal stub.

downstreamadapter/sink/cloudstorage/encoding_group.go (1)

135-156: Ordering dependency between inputCh and outputCh sends in Add() — verify no deadlock under saturation.

Add() sends the same taskFuture to inputCh first (Line 147), then to outputCh (Line 153). If both channels are near capacity (1024 each), there's a window where the future is in inputCh (being processed by the encoder, which calls finish()) before it appears in outputCh. This is safe because Ready() handles an already-closed done channel correctly, but it means the encoder may finish encoding before the consumer can even see the future.

One edge case worth noting: if Add() blocks on the outputCh send (Line 153) while the encoder has already called finish() on the future, the encoded payload sits in memory without being consumed until outputCh drains. Under prolonged saturation, this means up to defaultChannelSize encoded payloads may linger in memory beyond spool accounting. Depending on message sizes, this could be significant.

Consider documenting this memory accounting gap or bounding the combined channel depth.

docs/plans/2026-02-04-cloudstorage-sink-ga-requirements.md (1)

148-155: Track the config inconsistency identified in Line 151.

The document notes that defaultEnableTableAcrossNodes in pkg/sink/cloudstorage/config.go is inconsistent with the GA default of false. This should be tracked as a concrete work item (e.g., a follow-up issue) to avoid it being forgotten post-merge.

Would you like me to open an issue to track the defaultEnableTableAcrossNodes config alignment?

storage-sink.md (1)

62-80: Add a language specifier to the fenced code block.

The ASCII diagram code block at line 62 lacks a language specifier, flagged by markdownlint (MD040). Use ```text for plain-text diagrams.

storage-sink-slow.md (1)

62-64: Add language specifiers to fenced code blocks.

Lines 62, 73, and 167 have fenced code blocks without language specifiers (MD040). Use ```text for formulae/diagrams.

downstreamadapter/sink/cloudstorage/writer.go (2)

411-419: Silent drop of interval-triggered flush when toBeFlushedCh is full.

The default case at line 418 silently drops the interval flush if the channel is full. The batchedTask is not reset, so data isn't lost — it accumulates and will be retried on the next tick. However, this means interval flushes can be arbitrarily delayed under back-pressure, and the batch can grow well beyond FlushInterval-worth of data without any observability signal.

Consider logging at debug level when this happens, or tracking it as a metric, so operators can identify when flush back-pressure is occurring.
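A sketch of the observable non-blocking send; the counter here stands in for the debug log and/or Prometheus metric the comment suggests (names are illustrative).

```go
package main

import "fmt"

// trySendFlush attempts a non-blocking send of a flush trigger and
// records the drop instead of failing silently. In the sink this
// would be a log.Debug call plus a Prometheus counter increment.
func trySendFlush(ch chan struct{}, dropped *int) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		*dropped++
		return false
	}
}

func main() {
	ch := make(chan struct{}, 1)
	var dropped int
	fmt.Println(trySendFlush(ch, &dropped)) // buffered slot free
	fmt.Println(trySendFlush(ch, &dropped)) // full: dropped, but counted
	fmt.Println(dropped)
}
```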


62-69: flushReason as string type — consider using integer constants.

Using a string-typed enum means every metric label construction allocates (or at least compares) strings. Since these values are used on the hot flush path for Prometheus labels, iota-based int constants with a String() method would be more efficient and less error-prone.

downstreamadapter/sink/cloudstorage/spool/manager.go (1)

379-384: Clamping negative bytes without logging masks accounting bugs.

If memoryBytes or diskBytes go negative, the code silently clamps to 0. This is good defensive coding, but negative values indicate a double-release or accounting mismatch that should be investigated. Consider adding a warning log when this occurs.

🔍 Suggested improvement
 	if s.memoryBytes < 0 {
+		log.Warn("spool memory bytes went negative, clamping to zero",
+			zap.Int64("memoryBytes", s.memoryBytes),
+			zap.String("keyspace", s.metricKeyspace),
+			zap.String("changefeed", s.metricChangefeedLabel))
 		s.memoryBytes = 0
 	}
 	if s.diskBytes < 0 {
+		log.Warn("spool disk bytes went negative, clamping to zero",
+			zap.Int64("diskBytes", s.diskBytes),
+			zap.String("keyspace", s.metricKeyspace),
+			zap.String("changefeed", s.metricChangefeedLabel))
 		s.diskBytes = 0
 	}

Comment on lines 252 to 263
## 8. 可观测性

必须提供(至少):
- spool:`spool_bytes`, `spool_items`, `wake_calls_total`, `wake_suppressed_total`
- flush:
- `flush_count{reason=size|interval|ddl|close|error}`
- `flush_duration_seconds{reason=...}`
- `flush_file_size_bytes`(直方图,观测贴近 file-size)
- DDL:
- `ddl_drain_duration_seconds`
- `ddl_affected_dispatchers`
- (可选)`ddl_drain_wait_encode_seconds`, `ddl_drain_wait_remote_seconds`

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if ddl_affected_dispatchers metric is defined anywhere in the codebase
rg -rn 'affected_dispatchers' --type=go

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

#!/bin/bash
# Find the metrics file
fd 'cloudstorage\.go' --type f | grep -i metric

Repository: pingcap/ticdc

Length of output: 103


🏁 Script executed:

#!/bin/bash
# Also search for metrics definitions more broadly
fd 'metrics' --type d | head -20

Repository: pingcap/ticdc

Length of output: 110


🏁 Script executed:

#!/bin/bash
# Check the contents of the cloudstorage metrics file
cat -n downstreamadapter/sink/metrics/cloudstorage.go | head -200

Repository: pingcap/ticdc

Length of output: 7294


🏁 Script executed:

#!/bin/bash
# Search for spool_items and wake_calls metrics
rg -rn 'spool_items|wake_calls_total' --type=go

Repository: pingcap/ticdc

Length of output: 39


Three observability metrics specified in the design are not implemented.

Section 8 specifies three metrics as required ("必须提供"), but they are not defined in downstreamadapter/sink/metrics/cloudstorage.go:

  • spool_items (spool observability)
  • wake_calls_total (spool observability)
  • ddl_affected_dispatchers (DDL observability)

The other metrics listed (flush metrics, ddl_drain_duration_seconds, spool bytes/memory, wake suppressed) are correctly implemented. These three missing metrics should either be added to the metrics file or explicitly marked as optional/deferred in the design.


Comment on lines +179 to +202
func (d *dmlWriters) PassBlockEvent(event commonEvent.BlockEvent) error {
if event == nil {
return nil
}

start := time.Now()
defer sinkmetrics.CloudStorageDDLDrainDurationHistogram.WithLabelValues(
d.changefeedID.Keyspace(),
d.changefeedID.ID().String(),
).Observe(time.Since(start).Seconds())

doneCh := make(chan error, 1)
// Invariant for DDL ordering:
// marker follows the same dispatcher route and is acked only after prior tasks
// in that route are fully drained by writer.
d.msgCh.Push(newDrainTask(event.GetDispatcherID(), event.GetCommitTs(), doneCh))

select {
case err := <-doneCh:
return err
case <-d.ctx.Done():
return errors.Trace(d.ctx.Err())
}
}

⚠️ Potential issue | 🟠 Major

PassBlockEvent may block indefinitely if the pipeline exits before processing the drain task.

PassBlockEvent pushes a drain task into msgCh and waits on doneCh. If the encoding/dispatch pipeline has already exited (e.g., due to an error in Run()), the drain task will never be consumed and doneCh will never receive. The method then blocks until d.ctx is cancelled, which could be arbitrarily long if d.ctx is a long-lived parent context rather than the Run()-scoped context.

Consider either:

  1. Using the Run()-scoped context (or a derived cancel) as the fallback instead of d.ctx.
  2. Closing doneCh with an error when the pipeline shuts down (e.g., draining msgCh on exit).

Comment on lines +52 to +59
func deserializeMessages(data []byte) ([]*codeccommon.Message, error) {
reader := bytes.NewReader(data)
var count uint32
if err := binary.Read(reader, binary.LittleEndian, &count); err != nil {
return nil, errors.Trace(err)
}

result := make([]*codeccommon.Message, 0, count)
The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unbounded pre-allocation from untrusted count value can cause OOM on corrupted spool data.

If spool data on disk is corrupted (bit-flip, truncation, etc.), count could be an arbitrary uint32 value up to ~4 billion. The make([]*codeccommon.Message, 0, count) at Line 59 would attempt to allocate up to ~32 GB of pointer space, crashing the process before the loop even starts.

Add a sanity check on count (e.g., against len(data) or a reasonable upper bound) before allocating.

🛡️ Proposed fix
 func deserializeMessages(data []byte) ([]*codeccommon.Message, error) {
 	reader := bytes.NewReader(data)
 	var count uint32
 	if err := binary.Read(reader, binary.LittleEndian, &count); err != nil {
 		return nil, errors.Trace(err)
 	}
+	// Each message has at least 12 bytes of header (keyLen + valueLen + rows).
+	// Reject obviously invalid counts to avoid OOM on corrupted data.
+	maxPossible := (uint32(len(data)) - 4) / 12
+	if count > maxPossible {
+		return nil, errors.Errorf("message count %d exceeds maximum possible %d for data size %d", count, maxPossible, len(data))
+	}
 
 	result := make([]*codeccommon.Message, 0, count)

Comment on lines +174 to +179
if err := os.RemoveAll(rootDir); err != nil {
return nil, errors.Trace(err)
}
if err := os.MkdirAll(rootDir, 0o755); err != nil {
return nil, errors.Trace(err)
}
The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

os.RemoveAll(rootDir) on startup could be destructive if path computation is wrong.

If any component of the changefeed ID (keyspace, name, ID) is empty or unexpected, the constructed rootDir could point higher in the directory tree than intended. RemoveAll on such a path would silently delete unrelated data. Consider adding a sanity check that rootDir contains the expected defaultDirectoryName component before removing, or at least validating that the changefeed ID components are non-empty.

🛡️ Suggested defensive check
+	if changefeedID.ID().String() == "" || changefeedID.Name() == "" {
+		return nil, errors.Errorf("invalid changefeed ID for spool directory")
+	}
+
 	if err := os.RemoveAll(rootDir); err != nil {

Comment on lines +310 to +342
// Load fetches messages from memory or spilled segments.
func (s *Manager) Load(entry *Entry) ([]*codeccommon.Message, []func(), error) {
if entry == nil {
return nil, nil, nil
}
if entry.memoryMsgs != nil {
return entry.memoryMsgs, entry.callbacks, nil
}
if entry.pointer == nil {
return nil, entry.callbacks, nil
}

s.mu.Lock()
spoolSegment := s.segments[entry.pointer.segmentID]
if spoolSegment == nil {
s.mu.Unlock()
return nil, nil, errors.Errorf("spool segment %d not found", entry.pointer.segmentID)
}
file := spoolSegment.file
offset := entry.pointer.offset
length := entry.pointer.length
s.mu.Unlock()

buf := make([]byte, length)
if _, err := file.ReadAt(buf, offset); err != nil {
return nil, nil, errors.Trace(err)
}
msgs, err := deserializeMessages(buf)
if err != nil {
return nil, nil, errors.Trace(err)
}
return msgs, entry.callbacks, nil
}

⚠️ Potential issue | 🟠 Major

Race between Close() and Load(): file can be closed under a concurrent reader.

Load() acquires the lock to look up the segment and get the file handle (lines 322–331), then releases the lock and performs file.ReadAt() (line 334). If Close() runs between the unlock and the ReadAt, it will close the file handle under the reader, causing a "bad file descriptor" error.

Consider either:

  • Checking s.closed before the unlock and returning early, and deferring the unlock to after the ReadAt.
  • Or using a sync.RWMutex and holding the read lock across the I/O.
🔒 Sketch: hold lock across ReadAt
 	s.mu.Lock()
+	if s.closed {
+		s.mu.Unlock()
+		return nil, nil, errors.New("spool is closed")
+	}
 	spoolSegment := s.segments[entry.pointer.segmentID]
 	if spoolSegment == nil {
 		s.mu.Unlock()
 		return nil, nil, errors.Errorf("spool segment %d not found", entry.pointer.segmentID)
 	}
 	file := spoolSegment.file
 	offset := entry.pointer.offset
 	length := entry.pointer.length
 	s.mu.Unlock()

Even with this check, there's still a window between the s.closed check and the ReadAt. A more robust fix would be to hold the lock (or an RWLock) across the I/O, or to use reference counting on the file handle itself.

🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/manager.go` around lines 310 - 342,
Load has a race where Manager.Close can close spoolSegment.file after s.mu is
released but before file.ReadAt, causing "bad file descriptor"; fix by holding
the appropriate lock across the I/O or switching to an RW lock: either acquire
the mutex (s.mu) and defer its Unlock until after the ReadAt so the file handle
cannot be closed during the read, or change s.mu to a sync.RWMutex and take
RLock in Manager.Load (and have Close take Lock) so readers hold the lock for
the duration of file.ReadAt; ensure checks of s.closed and lookup of
s.segments/entry.pointer happen under that same lock and reference
spoolSegment.file only while the lock is held (or use reference counting on the
file handle if preferred).

Comment on lines +349 to +351

callbacksToRun := make([]func(), 0, len(s.pendingWake))
s.mu.Lock()

⚠️ Potential issue | 🟡 Minor

Data race: len(s.pendingWake) read without holding s.mu.

Line 350 reads s.pendingWake (to compute slice capacity) before acquiring s.mu at line 351. While the functional impact is negligible (just a capacity hint), this is technically a data race under Go's memory model and will be flagged by the race detector.

🐛 Fix: move allocation after lock
-	callbacksToRun := make([]func(), 0, len(s.pendingWake))
 	s.mu.Lock()
+	callbacksToRun := make([]func(), 0, len(s.pendingWake))
 	if s.closed {
 		s.mu.Unlock()
 		return
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/spool/manager.go` around lines 349 - 351,
The allocation of callbacksToRun uses len(s.pendingWake) before acquiring the
mutex, causing a data race; move the make([]func(), 0, len(s.pendingWake)) so it
occurs after s.mu.Lock() (or compute the length into a local variable while
holding the lock) to ensure len(s.pendingWake) is read only while s.mu is held;
update the code around callbacksToRun, s.pendingWake and s.mu accordingly so the
capacity hint is computed under the lock.

Comment on lines +73 to +79
CloudStorageFlushFileSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "cloud_storage_flush_file_size_bytes",
Help: "Flushed payload bytes per task by reason",
Buckets: prometheus.ExponentialBuckets(1024, 2.0, 16),
}, []string{"namespace", "changefeed", "reason"})

⚠️ Potential issue | 🟡 Minor

Histogram buckets for CloudStorageFlushFileSizeHistogram don't cover the default file-size target (64 MiB).

ExponentialBuckets(1024, 2.0, 16) produces buckets from 1 KiB to 32 MiB. Since the default file-size is 64 MiB, any file flushed near (or above) the target size will fall into the +Inf bucket, defeating the purpose of this metric — which is to observe how close files get to file-size.

Increase the bucket count (18 buckets reach 128 MiB; the proposed fix uses 20, reaching 512 MiB), or use a larger start value, so the histogram spans at least 128 MiB.

🔧 Proposed fix
 	CloudStorageFlushFileSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "cloud_storage_flush_file_size_bytes",
 		Help:      "Flushed payload bytes per task by reason",
-		Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 16),
+		Buckets:   prometheus.ExponentialBuckets(1024, 2.0, 20),
 	}, []string{"namespace", "changefeed", "reason"})
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/metrics/cloudstorage.go` around lines 73 - 79, The
histogram CloudStorageFlushFileSizeHistogram currently uses
prometheus.ExponentialBuckets(1024, 2.0, 16) which only covers up to 32 MiB and
will put ~64 MiB files into +Inf; adjust the bucket configuration so it covers
at least 128 MiB (e.g., increase the count to 17 or 18, or raise the start
value) by updating the call to prometheus.ExponentialBuckets in the
CloudStorageFlushFileSizeHistogram definition to use a larger bucket count
(17/18) or a larger initial size so the histogram spans beyond the default 64
MiB file-size target.

@ti-chi-bot

ti-chi-bot bot commented Feb 14, 2026

@3AceShowHand: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-error-log-review | efcf279 | link | true | /test pull-error-log-review |


@ti-chi-bot

ti-chi-bot bot commented Feb 14, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/sink/cloudstorage/writer.go (1)

249-276: ⚠️ Potential issue | 🟡 Minor

Spool entries silently skipped when d.spool == nil.

Lines 256-258 skip spool entries without invoking their callbacks when d.spool is nil. If a task somehow contains spool entries while the manager is nil, callbacks for those entries (which drive checkpoint progress) would be permanently lost.

If d.spool == nil is truly impossible when entries are present, an assertion or panic would be more appropriate than a silent continue. If it is possible, the callbacks from those entries still need to be invoked.

🤖 Fix all issues with AI agents
In `@cmd/storage-consumer/discovery.go`:
- Around line 107-141: Rename the local variable dmlkey to lowerCamelCase dmlKey
everywhere in parseDMLFilePath (including its declaration, the
ParseIndexFilePath call result assignment, and usages as the map key in
c.tableDMLIdxMap) to follow Go naming conventions; also change the unwrapped
error return after FetchIndexFromFileName to return errors.Trace(err) so it
matches the other error wrapping in this function (affecting the
FetchIndexFromFileName error branch that currently does "return err").

In `@cmd/storage-consumer/path_key.go`:
- Around line 271-307: The IsSchemaFile and FetchIndexFromFileName
implementations in the consumer have diverged from the pkg/sink/cloudstorage
version and use plain fmt.Errorf instead of the wrapped error type; remove the
duplicated implementations from the consumer and import the single canonical
implementation from pkg/sink/cloudstorage (export it if necessary), updating
callers to use cloudstorage.IsSchemaFile and
cloudstorage.FetchIndexFromFileName; ensure the canonical FetchIndexFromFileName
wraps errors using errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...)
to match the existing pkg behavior and keep signature and semantics the same.

In `@downstreamadapter/sink/cloudstorage/task.go`:
- Around line 9-12: The Apache 2.0 license header at the top of the file is
missing the standard "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied." line; update the header comment block immediately above the
package declaration (top-of-file comment in
downstreamadapter/sink/cloudstorage/task.go) by inserting that missing line
between the existing `"AS IS" BASIS,` line and the `See the License for the
specific language governing permissions and` line so the full standard Apache
2.0 boilerplate appears.

In `@pkg/sink/cloudstorage/path.go`:
- Line 539: The call to strconv.ParseUint(indexPart, 10, 64) returns a raw error
but needs to be wrapped; change the single return to parse into v, err :=
strconv.ParseUint(indexPart, 10, 64) and on err return a wrapped error using
errors.WrapError(err, "parse indexPart") (or errors.Trace(err)), otherwise
return v, nil; reference symbols: indexPart, strconv.ParseUint,
errors.WrapError.
🧹 Nitpick comments (5)
docs/plans/2026-02-04-cloudstorage-sink-ga-task-breakdown.md (1)

3-7: Optional: Minor markdown formatting.

Static analysis detected a blank line inside the blockquote (line 4 is blank within the > block), which triggers markdownlint rule MD028. This doesn't affect readability but could be cleaned up for strict markdown compliance.

📝 Optional formatting fix
 > 文档定位(已整合)
+>
 > - 类型:任务清单与执行进度(唯一打勾清单)
 > - 总览文档:`storage-sink.md`
 > - 说明:任务状态仅在本文维护;`storage-sink.md` 负责总体架构与语义说明。

Alternatively, remove the blank line entirely if it serves no purpose:

 > 文档定位(已整合)
-> 
 > - 类型:任务清单与执行进度(唯一打勾清单)
cmd/storage-consumer/discovery.go (1)

79-101: Silent error swallowing during storage walk may mask data loss.

Errors from both parseSchemaFilePath and parseDMLFilePath are logged and then skipped (lines 83-85, 89-92). While this is intentional for resilience, a corrupted or partially-written file will be permanently invisible to the consumer — it won't be retried on subsequent walks either, since getNewFiles doesn't distinguish "failed to parse" from "not yet present."

Consider whether a metric counter for skipped/failed files would help operators detect persistent parse failures in production.

cmd/storage-consumer/path_key.go (2)

71-79: Missing doc comments on exported types.

FileIndexKey and FileIndex are exported types but lack doc comments. Go convention and golint expect doc comments on all exported declarations.

Proposed fix
+// FileIndexKey identifies a file-index partition within a DML path, optionally scoped to a dispatcher.
 type FileIndexKey struct {
 	DispatcherID           string
 	EnableTableAcrossNodes bool
 }
 
+// FileIndex combines a FileIndexKey with a numeric file index.
 type FileIndex struct {
 	FileIndexKey
 	Idx uint64
 }

88-107: GenerateDMLFilePath unconditionally appends d.Table even when it could be empty.

If DmlPathKey.Table were empty, this would produce a path with a double-slash (e.g., schema1//123456/...). While current callers likely always set Table, there's no guard here, making this fragile.

downstreamadapter/sink/cloudstorage/writer.go (1)

411-419: Interval-triggered flush silently dropped when toBeFlushedCh is full.

The default branch at line 418 means interval flushes are non-blocking — if the 64-slot toBeFlushedCh is full, the flush is silently skipped and batchedTask continues accumulating. In sustained-throughput scenarios where the flush goroutine can't keep up, this could lead to unbounded batch growth in memory until the next successful send.

This is acceptable if the batch is expected to eventually be flushed by a size trigger or DDL marker (both of which block). However, consider logging or incrementing a metric when an interval flush is dropped so operators can detect sustained back-pressure.

Comment on lines +107 to +141
func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
var dmlkey DmlPathKey
dispatcherID, err := dmlkey.ParseIndexFilePath(
putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
path,
)
if err != nil {
return errors.Trace(err)
}
data, err := c.externalStorage.ReadFile(ctx, path)
if err != nil {
return errors.Trace(err)
}
fileName := strings.TrimSuffix(string(data), "\n")
fileIdx, err := FetchIndexFromFileName(fileName, c.fileExtension)
if err != nil {
return err
}
fileIndex := &FileIndex{
FileIndexKey: FileIndexKey{
DispatcherID: dispatcherID,
EnableTableAcrossNodes: dispatcherID != "",
},
Idx: fileIdx,
}

m, ok := c.tableDMLIdxMap[dmlkey]
if !ok {
c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{
fileIndex.FileIndexKey: fileIndex.Idx,
}
} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx
}
return nil

⚠️ Potential issue | 🟡 Minor

Inconsistent error wrapping and variable naming.

Two issues:

  1. Line 108: dmlkey should be dmlKey (lowerCamelCase) to match Go naming conventions and the rest of this file (e.g., schemaKey on line 145).

  2. Line 123: return err is not wrapped with errors.Trace(err), unlike the other error returns in this function (lines 114, 118). This is inconsistent.

Proposed fix
 func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
-	var dmlkey DmlPathKey
-	dispatcherID, err := dmlkey.ParseIndexFilePath(
+	var dmlKey DmlPathKey
+	dispatcherID, err := dmlKey.ParseIndexFilePath(
 		putil.GetOrZero(c.replicationCfg.Sink.DateSeparator),
 		path,
 	)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	data, err := c.externalStorage.ReadFile(ctx, path)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	fileName := strings.TrimSuffix(string(data), "\n")
 	fileIdx, err := FetchIndexFromFileName(fileName, c.fileExtension)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	fileIndex := &FileIndex{
 		FileIndexKey: FileIndexKey{
 			DispatcherID:           dispatcherID,
 			EnableTableAcrossNodes: dispatcherID != "",
 		},
 		Idx: fileIdx,
 	}
 
-	m, ok := c.tableDMLIdxMap[dmlkey]
+	m, ok := c.tableDMLIdxMap[dmlKey]
 	if !ok {
-		c.tableDMLIdxMap[dmlkey] = fileIndexKeyMap{
+		c.tableDMLIdxMap[dmlKey] = fileIndexKeyMap{
 			fileIndex.FileIndexKey: fileIndex.Idx,
 		}
 	} else if fileIndex.Idx >= m[fileIndex.FileIndexKey] {
-		c.tableDMLIdxMap[dmlkey][fileIndex.FileIndexKey] = fileIndex.Idx
+		c.tableDMLIdxMap[dmlKey][fileIndex.FileIndexKey] = fileIndex.Idx
 	}
 	return nil
 }

As per coding guidelines: "Use lowerCamelCase for variable names in Go" and "wrap it immediately with errors.Trace(err)".

🤖 Prompt for AI Agents
In `@cmd/storage-consumer/discovery.go` around lines 107 - 141, Rename the local
variable dmlkey to lowerCamelCase dmlKey everywhere in parseDMLFilePath
(including its declaration, the ParseIndexFilePath call result assignment, and
usages as the map key in c.tableDMLIdxMap) to follow Go naming conventions; also
change the unwrapped error return after FetchIndexFromFileName to return
errors.Trace(err) so it matches the other error wrapping in this function
(affecting the FetchIndexFromFileName error branch that currently does "return
err").

Comment on lines +271 to +307
// IsSchemaFile checks whether the file is a schema file.
func IsSchemaFile(filePath string) bool {
return schemaRE.MatchString(filePath)
}

func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string {
indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d"
if enableTableAcrossNodes {
return fmt.Sprintf("CDC_%s_"+indexFmt+"%s", dispatcherID, index, extension)
}
return fmt.Sprintf("CDC"+indexFmt+"%s", index, extension)
}

func FetchIndexFromFileName(fileName string, extension string) (uint64, error) {
if len(fileName) < minFileNamePrefixLen+len(extension) ||
!strings.HasPrefix(fileName, "CDC") ||
!strings.HasSuffix(fileName, extension) {
return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
}

fileName = strings.TrimSuffix(fileName, extension)
indexPart := strings.TrimPrefix(fileName, "CDC")

if strings.HasPrefix(indexPart, "_") {
trimmed := strings.TrimPrefix(indexPart, "_")
separatorIndex := strings.LastIndex(trimmed, "_")
if separatorIndex <= 0 || separatorIndex >= len(trimmed)-1 {
return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
}
indexPart = trimmed[separatorIndex+1:]
}

if len(indexPart) < config.MinFileIndexWidth || !isNumberString(indexPart) {
return 0, fmt.Errorf("filename in storage sink is invalid: %s", fileName)
}

return strconv.ParseUint(indexPart, 10, 64)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, locate the files mentioned in the review
find . -type f -name "path_key.go" -o -name "path.go" | grep -E "(cmd/storage-consumer|pkg/sink/cloudstorage)"

Repository: pingcap/ticdc

Length of output: 123


🏁 Script executed:

# Get the implementations of FetchIndexFromFileName from both files
echo "=== cmd/storage-consumer/path_key.go: FetchIndexFromFileName ==="
rg -A 30 'func FetchIndexFromFileName' cmd/storage-consumer/path_key.go

echo ""
echo "=== pkg/sink/cloudstorage/path.go: FetchIndexFromFileName ==="
rg -A 30 'func FetchIndexFromFileName' pkg/sink/cloudstorage/path.go

Repository: pingcap/ticdc

Length of output: 2262


🏁 Script executed:

# Get the implementations of IsSchemaFile from both files
echo "=== cmd/storage-consumer/path_key.go: IsSchemaFile ==="
rg -B 2 -A 2 'func IsSchemaFile' cmd/storage-consumer/path_key.go

echo ""
echo "=== pkg/sink/cloudstorage/path.go: IsSchemaFile ==="
rg -B 2 -A 2 'func IsSchemaFile' pkg/sink/cloudstorage/path.go

Repository: pingcap/ticdc

Length of output: 440


🏁 Script executed:

# Search for callers of these functions to understand usage
echo "=== Callers of FetchIndexFromFileName ==="
rg 'FetchIndexFromFileName' --type=go -B 2 -A 2 | head -50

echo ""
echo "=== Callers of IsSchemaFile ==="
rg 'IsSchemaFile' --type=go -B 2 -A 2 | head -50

Repository: pingcap/ticdc

Length of output: 4713


FetchIndexFromFileName and IsSchemaFile are duplicated between consumer and sink packages with divergent error handling.

Both functions exist in cmd/storage-consumer/path_key.go and pkg/sink/cloudstorage/path.go with identical core logic. However, the pkg version properly wraps errors with errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...), while the consumer version uses plain fmt.Errorf, violating the error wrapping guideline. This divergence indicates they are already drifting apart.

Consolidate to a single implementation in pkg/sink/cloudstorage and import from the consumer to ensure consistent error handling across both modules.

🤖 Prompt for AI Agents
In `@cmd/storage-consumer/path_key.go` around lines 271 - 307, The IsSchemaFile
and FetchIndexFromFileName implementations in the consumer have diverged from
the pkg/sink/cloudstorage version and use plain fmt.Errorf instead of the
wrapped error type; remove the duplicated implementations from the consumer and
import the single canonical implementation from pkg/sink/cloudstorage (export it
if necessary), updating callers to use cloudstorage.IsSchemaFile and
cloudstorage.FetchIndexFromFileName; ensure the canonical FetchIndexFromFileName
wraps errors using errors.WrapError(errors.ErrStorageSinkInvalidFileName, ...)
to match the existing pkg behavior and keep signature and semantics the same.

Comment on lines +9 to +12
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

⚠️ Potential issue | 🟡 Minor

Incomplete Apache 2.0 license header.

The standard WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. line is missing between the "AS IS" BASIS, line and the See the License... line. This is a common copy-paste issue with the boilerplate.

Proposed fix
 // distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
🤖 Prompt for AI Agents
In `@downstreamadapter/sink/cloudstorage/task.go` around lines 9 - 12, The Apache
2.0 license header at the top of the file is missing the standard "WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." line; update
the header comment block immediately above the package declaration (top-of-file
comment in downstreamadapter/sink/cloudstorage/task.go) by inserting that
missing line between the existing `"AS IS" BASIS,` line and the `See the License
for the specific language governing permissions and` line so the full standard
Apache 2.0 boilerplate appears.

fmt.Errorf("'%s' is a invalid file name", fileName))
}

return strconv.ParseUint(indexPart, 10, 64)

⚠️ Potential issue | 🟡 Minor

Unwrapped error from strconv.ParseUint.

The raw error from strconv.ParseUint is returned without wrapping. Other error paths in this function use errors.WrapError, but this one is missed.

Proposed fix
-	return strconv.ParseUint(indexPart, 10, 64)
+	idx, err := strconv.ParseUint(indexPart, 10, 64)
+	if err != nil {
+		return 0, errors.WrapError(errors.ErrStorageSinkInvalidFileName,
+			fmt.Errorf("'%s' is a invalid file name: %w", indexPart, err))
+	}
+	return idx, nil

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".

🤖 Prompt for AI Agents
In `@pkg/sink/cloudstorage/path.go` at line 539, The call to
strconv.ParseUint(indexPart, 10, 64) returns a raw error but needs to be
wrapped; change the single return to parse into v, err :=
strconv.ParseUint(indexPart, 10, 64) and on err return a wrapped error using
errors.WrapError(err, "parse indexPart") (or errors.Trace(err)), otherwise
return v, nil; reference symbols: indexPart, strconv.ParseUint,
errors.WrapError.


Labels

do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
