Skip to content

Commit b96f751

Browse files
committed
changefeedccl: add missing error return during change frontier startup
Before this change, when the change frontier encountered an error while restoring a changefeed's span-level checkpoint, it would only log the error (at verbosity level 2) and continue starting up instead of moving to draining like change aggregators would.

Release note: None
1 parent ecbfd04 commit b96f751

File tree

2 files changed

+27
-61
lines changed

2 files changed

+27
-61
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 25 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
320320

321321
spans, err := ca.setupSpansAndFrontier()
322322
if err != nil {
323-
if log.V(2) {
324-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
325-
}
323+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error setting up spans and frontier: %v", err)
326324
ca.MoveToDraining(err)
327325
ca.cancel()
328326
return
@@ -348,9 +346,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
348346
scope, _ := opts.GetMetricScope()
349347
ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
350348
if err != nil {
351-
if log.V(2) {
352-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
353-
}
349+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
354350
ca.MoveToDraining(err)
355351
ca.cancel()
356352
return
@@ -360,21 +356,16 @@ func (ca *changeAggregator) Start(ctx context.Context) {
360356
recorder := metricsRecorder(ca.sliMetrics)
361357
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
362358
if err != nil {
363-
if log.V(2) {
364-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
365-
}
359+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error wrapping metrics controller: %v", err)
366360
ca.MoveToDraining(err)
367361
ca.cancel()
368-
return
369362
}
370363

371364
ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
372365
ca.spec.User(), ca.spec.JobID, recorder)
373366
if err != nil {
374367
err = changefeedbase.MarkRetryableError(err)
375-
if log.V(2) {
376-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
377-
}
368+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sink: %v", err)
378369
ca.MoveToDraining(err)
379370
ca.cancel()
380371
return
@@ -406,9 +397,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
406397
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
407398
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
408399
if err != nil {
409-
if log.V(2) {
410-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
411-
}
400+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error starting kv feed: %v", err)
412401
ca.MoveToDraining(err)
413402
ca.cancel()
414403
return
@@ -418,9 +407,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
418407
ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
419408
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
420409
if err != nil {
421-
if log.V(2) {
422-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
423-
}
410+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error creating event consumer: %v", err)
424411
ca.MoveToDraining(err)
425412
ca.cancel()
426413
return
@@ -635,7 +622,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
635622
// Checkpointed spans are spans that were above the highwater mark, and we
636623
// must preserve that information in the frontier for future checkpointing.
637624
if err := checkpoint.Restore(ca.frontier, ca.spec.SpanLevelCheckpoint); err != nil {
638-
return nil, err
625+
return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
639626
}
640627

641628
return spans, nil
@@ -751,9 +738,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
751738
// NB: we do not invoke ca.cancel here -- just merely moving
752739
// to drain state so that the trailing metadata callback
753740
// has a chance to produce shutdown checkpoint.
754-
if log.V(2) {
755-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
756-
}
741+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error while checking for node drain: %v", err)
757742
ca.MoveToDraining(err)
758743
break
759744
}
@@ -775,9 +760,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
775760
}
776761
// Shut down the poller if it wasn't already.
777762
ca.cancel()
778-
if log.V(2) {
779-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
780-
}
763+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error from tick: %v", err)
781764
ca.MoveToDraining(err)
782765
break
783766
}
@@ -1315,9 +1298,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13151298
scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
13161299
sli, err := cf.metrics.getSLIMetrics(scope)
13171300
if err != nil {
1318-
if log.V(2) {
1319-
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
1320-
}
1301+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
13211302
cf.MoveToDraining(err)
13221303
return
13231304
}
@@ -1327,9 +1308,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13271308
cf.spec.User(), cf.spec.JobID, sli)
13281309
if err != nil {
13291310
err = changefeedbase.MarkRetryableError(err)
1330-
if log.V(2) {
1331-
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
1332-
}
1311+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sink: %v", err)
13331312
cf.MoveToDraining(err)
13341313
return
13351314
}
@@ -1342,9 +1321,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13421321

13431322
cf.highWaterAtStart = cf.spec.Feed.StatementTime
13441323
if cf.evalCtx.ChangefeedState == nil {
1345-
if log.V(2) {
1346-
log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
1347-
}
1324+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to missing changefeed state")
13481325
cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
13491326
return
13501327
}
@@ -1356,9 +1333,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13561333
if cf.spec.JobID != 0 {
13571334
job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
13581335
if err != nil {
1359-
if log.V(2) {
1360-
log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
1361-
}
1336+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error loading claimed job: %v", err)
13621337
cf.MoveToDraining(err)
13631338
return
13641339
}
@@ -1403,15 +1378,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14031378
// Set up the resolved span frontier.
14041379
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(cf.spec.Feed.StatementTime, initialHighwater, cf.spec.TrackedSpans...)
14051380
if err != nil {
1406-
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
1381+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error setting up frontier: %v", err)
14071382
cf.MoveToDraining(err)
14081383
return
14091384
}
14101385

14111386
if err := checkpoint.Restore(cf.frontier, cf.spec.SpanLevelCheckpoint); err != nil {
1412-
if log.V(2) {
1413-
log.Infof(cf.Ctx(), "change frontier encountered error on checkpoint restore: %v", err)
1414-
}
1387+
log.Dev.Warningf(cf.Ctx(),
1388+
"moving to draining due to error restoring span-level checkpoint: %v", err)
1389+
cf.MoveToDraining(err)
1390+
return
14151391
}
14161392

14171393
if cf.knobs.AfterCoordinatorFrontierRestore != nil {
@@ -1560,39 +1536,31 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
15601536
}
15611537
}
15621538

1563-
if log.V(2) {
1564-
log.Infof(cf.Ctx(),
1565-
"change frontier moving to draining after reaching resolved span boundary (%s): %v",
1566-
boundaryType, err)
1567-
}
1539+
log.Dev.Warningf(cf.Ctx(),
1540+
"moving to draining after reaching resolved span boundary (%s): %v",
1541+
boundaryType, err)
15681542
cf.MoveToDraining(err)
15691543
break
15701544
}
15711545

15721546
row, meta := cf.input.Next()
15731547
if meta != nil {
15741548
if meta.Err != nil {
1575-
if log.V(2) {
1576-
log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
1577-
}
1549+
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting error from aggregator: %v", meta.Err)
15781550
cf.MoveToDraining(nil /* err */)
15791551
}
15801552
if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
15811553
// Seeing changefeed drain info metadata from the aggregator means
15821554
// that the aggregator exited due to node shutdown. Transition to
15831555
// draining so that the remaining aggregators will shut down and
15841556
// transmit their up-to-date frontier.
1585-
if log.V(2) {
1586-
log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
1587-
}
1557+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to aggregator shutdown: %s", meta.Changefeed)
15881558
cf.MoveToDraining(changefeedbase.ErrNodeDraining)
15891559
}
15901560
return nil, meta
15911561
}
15921562
if row == nil {
1593-
if log.V(2) {
1594-
log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
1595-
}
1563+
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting nil row from aggregator")
15961564
cf.MoveToDraining(nil /* err */)
15971565
break
15981566
}
@@ -1607,9 +1575,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16071575
}
16081576

16091577
if err := cf.noteAggregatorProgress(row[0]); err != nil {
1610-
if log.V(2) {
1611-
log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
1612-
}
1578+
log.Dev.Warningf(cf.Ctx(), "moving to draining after error while processing aggregator progress: %v", err)
16131579
cf.MoveToDraining(err)
16141580
break
16151581
}

pkg/ccl/changefeedccl/checkpoint/checkpoint.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ type SpanForwarder interface {
7878
func Restore(sf SpanForwarder, checkpoint *jobspb.TimestampSpansMap) error {
7979
for ts, spans := range checkpoint.All() {
8080
if ts.IsEmpty() {
81-
return errors.New("checkpoint timestamp is empty")
81+
return errors.AssertionFailedf("checkpoint timestamp is empty")
8282
}
8383
for _, sp := range spans {
8484
if _, err := sf.Forward(sp, ts); err != nil {
85-
return err
85+
return errors.Wrapf(err, "forwarding span %v to %v", sp, ts)
8686
}
8787
}
8888
}

0 commit comments

Comments
 (0)