From 02bdcbcb51e94c41903b810fee9beafd617d068e Mon Sep 17 00:00:00 2001 From: Kevin Cao <39608887+kev-cao@users.noreply.github.com> Date: Fri, 10 Oct 2025 16:14:47 -0400 Subject: [PATCH] logical: add initial/catchup scan metrics to offline scan This patch teaches the LDR offline scan processor to emit checkpoint range stats and update the `logical_replication.scanning_ranges` and `logical_replication.catchup_ranges` metrics. Informs: #152273 Release note: LDR now updates the `logical_replication.scanning_ranges` and `logical_replication.catchup_ranges` metrics during fast initial scan. --- .../logical/logical_replication_job.go | 9 ++- .../logical/offline_initial_scan_processor.go | 59 +++++++++++++++---- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/pkg/crosscluster/logical/logical_replication_job.go b/pkg/crosscluster/logical/logical_replication_job.go index ff979932870b..831613f8938f 100644 --- a/pkg/crosscluster/logical/logical_replication_job.go +++ b/pkg/crosscluster/logical/logical_replication_job.go @@ -456,9 +456,11 @@ type logicalReplicationPlanner struct { } type logicalReplicationPlanInfo struct { - sourceSpans []roachpb.Span - partitionPgUrls []string - destTableBySrcID map[descpb.ID]dstTableMetadata + sourceSpans []roachpb.Span + partitionPgUrls []string + destTableBySrcID map[descpb.ID]dstTableMetadata + // Number of processors writing data on the destination cluster (offline or + // otherwise). 
writeProcessorCount int } @@ -738,6 +740,7 @@ func (p *logicalReplicationPlanner) planOfflineInitialScan( SQLInstanceID: instanceID, Core: execinfrapb.ProcessorCoreUnion{LogicalReplicationOfflineScan: &spec}, }) + info.writeProcessorCount++ } } diff --git a/pkg/crosscluster/logical/offline_initial_scan_processor.go b/pkg/crosscluster/logical/offline_initial_scan_processor.go index 713d8ea8d095..0a53262c137f 100644 --- a/pkg/crosscluster/logical/offline_initial_scan_processor.go +++ b/pkg/crosscluster/logical/offline_initial_scan_processor.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/backup" "github.com/cockroachdb/cockroach/pkg/crosscluster" + "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/bulk" @@ -65,6 +66,8 @@ type offlineInitialScanProcessor struct { checkpointCh chan offlineCheckpoint + rangeStatsCh chan *streampb.StreamEvent_RangeStats + rekey *backup.KeyRewriter batcher *bulk.SSTBatcher @@ -104,6 +107,7 @@ func newNewOfflineInitialScanProcessor( processorID: processorID, stopCh: make(chan struct{}), checkpointCh: make(chan offlineCheckpoint), + rangeStatsCh: make(chan *streampb.StreamEvent_RangeStats), errCh: make(chan error, 1), rekey: rekeyer, lastKeyAdded: roachpb.Key{}, @@ -220,6 +224,7 @@ func (o *offlineInitialScanProcessor) Start(ctx context.Context) { }) o.workerGroup.GoCtx(func(ctx context.Context) error { defer close(o.checkpointCh) + defer close(o.rangeStatsCh) pprof.Do(ctx, pprof.Labels("proc", fmt.Sprintf("%d", o.ProcessorID)), func(ctx context.Context) { for event := range o.subscription.Events() { if err := o.handleEvent(ctx, event); err != nil { @@ -245,16 +250,8 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P case checkpoint, ok := <-o.checkpointCh: switch { case !ok: - select { - case err := <-o.errCh: - 
o.MoveToDrainingAndLogError(err) - return nil, o.DrainHelper() - case <-time.After(10 * time.Second): - logcrash.ReportOrPanic(o.Ctx(), &o.FlowCtx.Cfg.Settings.SV, - "event channel closed but no error found on err channel after 10 seconds") - o.MoveToDrainingAndLogError(nil /* error */) - return nil, o.DrainHelper() - } + o.MoveToDrainingAndLogError(o.waitForErr()) + return nil, o.DrainHelper() case checkpoint.afterInitialScanCompletion: // The previous checkpoint completed the initial scan and was already // ingested by the coordinator, so we can gracefully shut down the @@ -273,6 +270,18 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P } return row, nil } + case stats, ok := <-o.rangeStatsCh: + if !ok { + o.MoveToDrainingAndLogError(o.waitForErr()) + return nil, o.DrainHelper() + } + + meta, err := replicationutils.StreamRangeStatsToProgressMeta(o.FlowCtx, o.ProcessorID, stats) + if err != nil { + o.MoveToDrainingAndLogError(err) + return nil, o.DrainHelper() + } + return nil, meta case err := <-o.errCh: o.MoveToDrainingAndLogError(err) return nil, o.DrainHelper() @@ -345,7 +354,7 @@ func (o *offlineInitialScanProcessor) handleEvent( return err } case crosscluster.CheckpointEvent: - if err := o.checkpoint(ctx, event.GetCheckpoint().ResolvedSpans); err != nil { + if err := o.checkpoint(ctx, event.GetCheckpoint()); err != nil { return err } case crosscluster.SSTableEvent, crosscluster.DeleteRangeEvent: @@ -358,9 +367,26 @@ func (o *offlineInitialScanProcessor) handleEvent( return nil } +// waitForErr blocks until an error arrives on errCh and returns it. If none +// arrives within 10 seconds, it calls logcrash.ReportOrPanic and returns nil. 
+func (o *offlineInitialScanProcessor) waitForErr() error { + select { + case err := <-o.errCh: + return err + case <-time.After(10 * time.Second): + logcrash.ReportOrPanic(o.Ctx(), &o.FlowCtx.Cfg.Settings.SV, + "event channel closed but no error found on err channel after 10 seconds") + return nil + } +} + func (o *offlineInitialScanProcessor) checkpoint( - ctx context.Context, resolvedSpans []jobspb.ResolvedSpan, + ctx context.Context, checkpoint *streampb.StreamEvent_StreamCheckpoint, ) error { + if checkpoint == nil { + return errors.New("nil checkpoint event") + } + resolvedSpans := checkpoint.ResolvedSpans if resolvedSpans == nil { return errors.New("checkpoint event expected to have resolved spans") } @@ -406,6 +432,15 @@ func (o *offlineInitialScanProcessor) checkpoint( // shutdown the processor. o.initialScanCompleted = true } + + if checkpoint.RangeStats != nil { + select { + case o.rangeStatsCh <- checkpoint.RangeStats: + case <-o.stopCh: + case <-ctx.Done(): + return ctx.Err() + } + } return nil }