diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index 135be6a01..feb808777 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net" "net/http" "os" @@ -809,20 +810,54 @@ func (p *PostgresKeeper) resync(db, followedDB *cluster.DB, tryPgrewind bool) er // postgresql version is > 9.5 since someone can also use an externally // installed pg_rewind for postgres 9.4. If a pg_rewind executable // doesn't exists pgm.SyncFromFollowedPGRewind will return an error and - // fallback to pg_basebackup + // fallback to pg_basebackup. If there is no pg_rewind timeout or + // interval set, we will only try pg_rewind once and fallback to + // pg_basebackup straight away. Otherwise we will keep retrying until + // pg_rewind succeeds or we reach the timeout (and then we will + // fallback to pg_basebackup). if tryPgrewind && p.usePgrewind(db) { - connParams := p.getSUConnParams(db, followedDB) - log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID, "keeper", followedDB.Spec.KeeperUID) - // TODO: Make the forceCheckpoint parameter use cluster specification - if err := pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, true); err != nil { - // log pg_rewind error and fallback to pg_basebackup - log.Errorw("error syncing with pg_rewind", zap.Error(err)) - } else { - pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil)) + var err error + timeout := db.Spec.PgrewindTimeout.Duration + interval := db.Spec.PgrewindInterval.Duration + checkpoint := db.Spec.PgrewindCheckpoint + + pgrewind := func() error { + connParams := p.getSUConnParams(db, followedDB) + log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID, "keeper", followedDB.Spec.KeeperUID) + if err = pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, checkpoint); err != nil { + log.Errorw("error syncing with pg_rewind", zap.Error(err)) + pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil)) + } + return err + } + + if err = pgrewind(); err == nil { return nil } + + // Retry pg_rewind at an interval with jitter until it succeeds or + // we timeout retrying + if timeout != 0 || interval != 0 { + timeoutAfter := time.After(timeout) + jitter := time.Duration(rand.Int63n(int64(interval)) / 4) + intervalTick := time.Tick(interval + jitter) + + Rewind: + for true { + select { + case <-timeoutAfter: + log.Errorw("timed out while trying to run pg_rewind") + break Rewind + case <-intervalTick: + if err = pgrewind(); err == nil { + return nil + } + } + } + } } + // Fallback to pg_basebackup as pg_rewind failed maj, min, err := p.pgm.BinaryVersion() if err != nil { // in case we fail to parse the binary version then log it and just don't use replSlot diff --git a/cmd/sentinel/cmd/sentinel.go b/cmd/sentinel/cmd/sentinel.go index d443ee464..87bbb6ca6 100644 --- a/cmd/sentinel/cmd/sentinel.go +++ b/cmd/sentinel/cmd/sentinel.go @@ -379,6 +379,9 @@ func (s *Sentinel) setDBSpecFromClusterSpec(cd *cluster.ClusterData) { db.Spec.RequestTimeout = *clusterSpec.RequestTimeout db.Spec.MaxStandbys = *clusterSpec.MaxStandbys db.Spec.UsePgrewind = *clusterSpec.UsePgrewind + db.Spec.PgrewindInterval = *clusterSpec.PgrewindInterval + db.Spec.PgrewindTimeout = *clusterSpec.PgrewindTimeout + db.Spec.PgrewindCheckpoint = *clusterSpec.PgrewindCheckpoint db.Spec.PGParameters = clusterSpec.PGParameters db.Spec.PGHBA = clusterSpec.PGHBA if db.Spec.FollowConfig != nil && db.Spec.FollowConfig.Type == cluster.FollowTypeExternal { diff --git a/doc/cluster_spec.md b/doc/cluster_spec.md index 0205f637a..a74aa6bbd 100644 --- a/doc/cluster_spec.md +++ b/doc/cluster_spec.md @@ -27,6 +27,9 @@ Some options in a running cluster specification can be changed to update the des | additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 | | additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like internal replication slots used for standby replication) to make them "namespaced" from other replication slots. Replication slots starting with `stolon_` and not defined here (and not used for standby replication) will be dropped from the master instance. | no | []string | null | | usePgrewind | try to use pg_rewind for faster instance resyncronization. | no | bool | false | +| pgrewindInterval | interval to wait until next pg_rewind try | no | string (duration) | 0s | +| pgrewindTimeout | time after which we stop trying to pg_rewind | no | string (duration) | 0s | +| pgrewindCheckpoint | force checkpoint on primary before performing pg_rewind | no | bool | false | | initMode | The cluster initialization mode. Can be *new* or *existing*. *new* means that a new db cluster will be created on a random keeper and the other keepers will sync with it. *existing* means that a keeper (that needs to have an already created db cluster) will be choosed as the initial master and the other keepers will sync with it. In this case the `existingConfig` object needs to be populated. | yes | string | | | existingConfig | configuration for initMode of type "existing" | if initMode is "existing" | ExistingConfig | | | mergePgParameters | merge pgParameters of the initialized db cluster, useful the retain initdb generated parameters when InitMode is new, retain current parameters when initMode is existing or pitr. | no | bool | true | diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index deda5439b..ef98fef55 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -66,6 +66,9 @@ const ( DefaultMaxSynchronousStandbys uint16 = 1 DefaultAdditionalWalSenders = 5 DefaultUsePgrewind = false + DefaultPgrewindInterval = 0 * time.Second + DefaultPgrewindTimeout = 0 * time.Second + DefaultPgrewindCheckpoint = false DefaultMergePGParameter = true DefaultRole ClusterRole = ClusterRoleMaster DefaultSUReplAccess SUReplAccessMode = SUReplAccessAll @@ -261,6 +264,12 @@ type ClusterSpec struct { AdditionalMasterReplicationSlots []string `json:"additionalMasterReplicationSlots"` // Whether to use pg_rewind UsePgrewind *bool `json:"usePgrewind,omitempty"` + // Interval to wait until next pg_rewind try + PgrewindInterval *Duration `json:"pgrewindInterval,omitempty"` + // Time after which we stop trying to pg_rewind + PgrewindTimeout *Duration `json:"pgrewindTimeout,omitempty"` + // Checkpoint before performing pg_rewind + PgrewindCheckpoint *bool `json:"pgrewindCheckpoint,omitempty"` // InitMode defines the cluster initialization mode. Current modes are: new, existing, pitr InitMode *ClusterInitMode `json:"initMode,omitempty"` // Whether to merge pgParameters of the initialized db cluster, useful @@ -379,6 +388,15 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec { if s.UsePgrewind == nil { s.UsePgrewind = BoolP(DefaultUsePgrewind) } + if s.PgrewindInterval == nil { + s.PgrewindInterval = &Duration{Duration: DefaultPgrewindInterval} + } + if s.PgrewindTimeout == nil { + s.PgrewindTimeout = &Duration{Duration: DefaultPgrewindTimeout} + } + if s.PgrewindCheckpoint == nil { + s.PgrewindCheckpoint = BoolP(DefaultPgrewindCheckpoint) + } if s.MinSynchronousStandbys == nil { s.MinSynchronousStandbys = Uint16P(DefaultMinSynchronousStandbys) } @@ -607,6 +625,12 @@ type DBSpec struct { SynchronousReplication bool `json:"synchronousReplication,omitempty"` // Whether to use pg_rewind UsePgrewind bool `json:"usePgrewind,omitempty"` + // Interval to wait until next pg_rewind try + PgrewindInterval Duration `json:"pgrewindInterval,omitempty"` + // Time after which we stop trying to pg_rewind + PgrewindTimeout Duration `json:"pgrewindTimeout,omitempty"` + // Checkpoint before performing pg_rewind + PgrewindCheckpoint bool `json:"pgrewindCheckpoint,omitempty"` // AdditionalWalSenders defines the number of additional wal_senders in // addition to the ones internally defined by stolon AdditionalWalSenders uint16 `json:"additionalWalSenders"`