Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -809,20 +810,54 @@ func (p *PostgresKeeper) resync(db, followedDB *cluster.DB, tryPgrewind bool) er
// postgresql version is > 9.5 since someone can also use an externally
// installed pg_rewind for postgres 9.4. If a pg_rewind executable
// doesn't exists pgm.SyncFromFollowedPGRewind will return an error and
// fallback to pg_basebackup
// fallback to pg_basebackup. If there is no pg_rewind timeout or
// interval set, we will only try pg_rewind once and fallback to
// pg_basebackup straight away. Otherwise we will keep retrying until
// pg_rewind succeeds or we reach the timeout (and then we will
// fallback to pg_basebackup).
if tryPgrewind && p.usePgrewind(db) {
connParams := p.getSUConnParams(db, followedDB)
log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID, "keeper", followedDB.Spec.KeeperUID)
// TODO: Make the forceCheckpoint parameter use cluster specification
if err := pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, true); err != nil {
// log pg_rewind error and fallback to pg_basebackup
log.Errorw("error syncing with pg_rewind", zap.Error(err))
} else {
pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil))
var err error
timeout := db.Spec.PgrewindTimeout.Duration
interval := db.Spec.PgrewindInterval.Duration
checkpoint := db.Spec.PgrewindCheckpoint

pgrewind := func() error {
connParams := p.getSUConnParams(db, followedDB)
log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID, "keeper", followedDB.Spec.KeeperUID)
if err = pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, checkpoint); err != nil {
log.Errorw("error syncing with pg_rewind", zap.Error(err))
pgm.SetRecoveryParameters(p.createRecoveryParameters(true, standbySettings, nil, nil))
}
return err
}

if err = pgrewind(); err == nil {
return nil
}

// Retry pg_rewind at an interval with jitter until it succeeds or
// we timeout retrying
if timeout != 0 || interval != 0 {
timeoutAfter := time.After(timeout)
jitter := time.Duration(rand.Int63n(int64(interval)) / 4)
intervalTick := time.Tick(interval + jitter)

Rewind:
for true {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using for true instead of just for seems to be the convention in the codebase.

select {
case <-timeoutAfter:
log.Errorw("timed out while trying to run pg_rewind")
break Rewind
case <-intervalTick:
if err = pgrewind(); err == nil {
return nil
}
}
}
}
}

// Fallback to pg_basebackup as pg_rewind failed
maj, min, err := p.pgm.BinaryVersion()
if err != nil {
// in case we fail to parse the binary version then log it and just don't use replSlot
Expand Down
3 changes: 3 additions & 0 deletions cmd/sentinel/cmd/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ func (s *Sentinel) setDBSpecFromClusterSpec(cd *cluster.ClusterData) {
db.Spec.RequestTimeout = *clusterSpec.RequestTimeout
db.Spec.MaxStandbys = *clusterSpec.MaxStandbys
db.Spec.UsePgrewind = *clusterSpec.UsePgrewind
db.Spec.PgrewindInterval = *clusterSpec.PgrewindInterval
Copy link
Author

@dyson dyson Aug 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case of these new config options is a little sad but just following the convention used elsewhere.

db.Spec.PgrewindTimeout = *clusterSpec.PgrewindTimeout
db.Spec.PgrewindCheckpoint = *clusterSpec.PgrewindCheckpoint
db.Spec.PGParameters = clusterSpec.PGParameters
db.Spec.PGHBA = clusterSpec.PGHBA
if db.Spec.FollowConfig != nil && db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
Expand Down
3 changes: 3 additions & 0 deletions doc/cluster_spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ Some options in a running cluster specification can be changed to update the des
| additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 |
| additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like internal replication slots used for standby replication) to make them "namespaced" from other replication slots. Replication slots starting with `stolon_` and not defined here (and not used for standby replication) will be dropped from the master instance. | no | []string | null |
| usePgrewind | try to use pg_rewind for faster instance resyncronization. | no | bool | false |
| pgrewindInterval | interval to wait until next pg_rewind try | no | string (duration) | 0s |
| pgrewindTimeout | time after which we stop trying to pg_rewind | no | string (duration) | 0s |
| pgrewindCheckpoint | force checkpoint on primary before performing pg_rewind | no | bool | false |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is named differently to the PR to add this configuration parameter: sorintlab#644

I think maybe we should leave the removal of this hardcoding out of this PR?

| initMode | The cluster initialization mode. Can be *new* or *existing*. *new* means that a new db cluster will be created on a random keeper and the other keepers will sync with it. *existing* means that a keeper (that needs to have an already created db cluster) will be choosed as the initial master and the other keepers will sync with it. In this case the `existingConfig` object needs to be populated. | yes | string | |
| existingConfig | configuration for initMode of type "existing" | if initMode is "existing" | ExistingConfig | |
| mergePgParameters | merge pgParameters of the initialized db cluster, useful the retain initdb generated parameters when InitMode is new, retain current parameters when initMode is existing or pitr. | no | bool | true |
Expand Down
24 changes: 24 additions & 0 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ const (
DefaultMaxSynchronousStandbys uint16 = 1
DefaultAdditionalWalSenders = 5
DefaultUsePgrewind = false
DefaultPgrewindInterval = 0 * time.Second
DefaultPgrewindTimeout = 0 * time.Second
DefaultPgrewindCheckpoint = false
DefaultMergePGParameter = true
DefaultRole ClusterRole = ClusterRoleMaster
DefaultSUReplAccess SUReplAccessMode = SUReplAccessAll
Expand Down Expand Up @@ -261,6 +264,12 @@ type ClusterSpec struct {
AdditionalMasterReplicationSlots []string `json:"additionalMasterReplicationSlots"`
// Whether to use pg_rewind
UsePgrewind *bool `json:"usePgrewind,omitempty"`
// Interval to wait until next pg_rewind try
PgrewindInterval *Duration `json:"pgrewindInterval,omitempty"`
// Time after which we stop trying to pg_rewind
PgrewindTimeout *Duration `json:"pgrewindTimeout,omitempty"`
// Checkpoint before performing pg_rewind
PgrewindCheckpoint *bool `json:"pgrewindCheckpoint,omitempty"`
// InitMode defines the cluster initialization mode. Current modes are: new, existing, pitr
InitMode *ClusterInitMode `json:"initMode,omitempty"`
// Whether to merge pgParameters of the initialized db cluster, useful
Expand Down Expand Up @@ -379,6 +388,15 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec {
if s.UsePgrewind == nil {
s.UsePgrewind = BoolP(DefaultUsePgrewind)
}
if s.PgrewindInterval == nil {
s.PgrewindInterval = &Duration{Duration: DefaultPgrewindInterval}
}
if s.PgrewindTimeout == nil {
s.PgrewindTimeout = &Duration{Duration: DefaultPgrewindTimeout}
}
if s.PgrewindCheckpoint == nil {
s.PgrewindCheckpoint = BoolP(DefaultPgrewindCheckpoint)
}
if s.MinSynchronousStandbys == nil {
s.MinSynchronousStandbys = Uint16P(DefaultMinSynchronousStandbys)
}
Expand Down Expand Up @@ -607,6 +625,12 @@ type DBSpec struct {
SynchronousReplication bool `json:"synchronousReplication,omitempty"`
// Whether to use pg_rewind
UsePgrewind bool `json:"usePgrewind,omitempty"`
// Interval to wait until next pg_rewind try
PgrewindInterval Duration `json:"pgrewindInterval,omitempty"`
// Time after which we stop trying to pg_rewind
PgrewindTimeout Duration `json:"pgrewindTimeout,omitempty"`
// Checkpoint before performing pg_rewind
PgrewindCheckpoint bool `json:"pgrewindCheckpoint,omitempty"`
// AdditionalWalSenders defines the number of additional wal_senders in
// addition to the ones internally defined by stolon
AdditionalWalSenders uint16 `json:"additionalWalSenders"`
Expand Down