Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
// Check if we need to force manual strategy due to external modification
rolloutStrategy := w.Spec.RolloutStrategy
if w.Status.LastModifierIdentity != getControllerIdentity() &&
w.Status.LastModifierIdentity != serverDeleteVersionIdentity &&
w.Status.LastModifierIdentity != "" &&
!temporalState.IgnoreLastModifier {
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
controllerVersionEnvKey = "CONTROLLER_VERSION"
controllerIdentityEnvKey = "CONTROLLER_IDENTITY"
ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"

serverDeleteVersionIdentity = "try-delete-for-add-version"
)

// Version is set by goreleaser via ldflags at build time
Expand Down
32 changes: 26 additions & 6 deletions internal/temporal/worker_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,37 @@ func GetWorkerDeploymentState(
versionInfo.Status = temporaliov1alpha1.VersionStatusDrained

// Get drain time information
versionResp, err := deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{
BuildID: version.Version.BuildId,
})
if err == nil {
drainedSince := versionResp.Info.DrainageInfo.LastChangedTime
var desc temporalClient.WorkerDeploymentVersionDescription
describeVersionUntilDrainTime := func() error {
desc, err = deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{
BuildID: version.Version.BuildId,
})
if err != nil {
return err
}
if desc.Info.DrainageInfo == nil {
return fmt.Errorf("drainage info nil for build %s", version.Version.BuildId)
}
if desc.Info.DrainageInfo.DrainageStatus != temporalClient.WorkerDeploymentVersionDrainageStatusDrained {
return fmt.Errorf("version info does not say that build %s is drained", version.Version.BuildId)
}
return err
}
// At first, version is found in DeploymentInfo.VersionSummaries but may not have the full drainage info in
// describe version, so we describe with backoff.
// If the version was just deleted by the server, we may never succeed at describing it, and it should
// be treated as NotRegistered, since it no longer exists in Temporal.
var notFound *serviceerror.NotFound
if err = withBackoff(10*time.Second, 1*time.Second, describeVersionUntilDrainTime); err == nil { //revive:disable-line:max-control-nesting
drainedSince := desc.Info.DrainageInfo.LastChangedTime
versionInfo.DrainedSince = &drainedSince
// If the deployment exists and has replicas, we assume there are versioned pollers, no need to check
deployment, ok := k8sDeployments[version.Version.BuildId]
if !ok || deployment.Status.Replicas == 0 { //revive:disable-line:max-control-nesting
versionInfo.NoTaskQueuesHaveVersionedPoller = noTaskQueuesHaveVersionedPollers(ctx, client, versionResp.Info.TaskQueuesInfos)
versionInfo.NoTaskQueuesHaveVersionedPoller = noTaskQueuesHaveVersionedPollers(ctx, client, desc.Info.TaskQueuesInfos)
}
} else if errors.As(err, &notFound) { //revive:disable-line:max-control-nesting
versionInfo.Status = temporaliov1alpha1.VersionStatusNotRegistered
}
} else {
versionInfo.Status = temporaliov1alpha1.VersionStatusInactive
Expand Down
1 change: 1 addition & 0 deletions internal/tests/internal/env_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
testDrainageVisibilityGracePeriod = time.Second
testDrainageRefreshInterval = time.Second
testMaxVersionsIneligibleForDeletion = 5
testMaxVersionsInDeployment = 6
)

// setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set
Expand Down
61 changes: 57 additions & 4 deletions internal/tests/internal/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestIntegration(t *testing.T) {
// make versions drain faster
dc.OverrideValue("matching.wv.VersionDrainageStatusVisibilityGracePeriod", testDrainageVisibilityGracePeriod)
dc.OverrideValue("matching.wv.VersionDrainageStatusRefreshInterval", testDrainageRefreshInterval)
dc.OverrideValue("matching.maxVersionsInDeployment", testMaxVersionsInDeployment)
ts := temporaltest.NewServer(
temporaltest.WithT(t),
temporaltest.WithBaseServerOptions(temporal.WithDynamicConfigClient(dc)),
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestIntegration(t *testing.T) {
),
},
{
name: "manual-rollout-blocked-at-max-replicas",
name: "manual-rollout-blocked-at-max-versions-ineligible-for-deletion",
builder: testhelpers.NewTestCase().
WithInput(
testhelpers.NewTemporalWorkerDeploymentBuilder().
Expand Down Expand Up @@ -355,7 +356,7 @@ func TestIntegration(t *testing.T) {
),
},
{
name: "all-at-once-blocked-at-max-replicas",
name: "all-at-once-blocked-at-max-versions-ineligible-for-deletion",
builder: testhelpers.NewTestCase().
WithInput(
testhelpers.NewTemporalWorkerDeploymentBuilder().
Expand Down Expand Up @@ -582,7 +583,7 @@ func TestIntegration(t *testing.T) {
),
},
{
name: "progressive-rollout-blocked-at-max-replicas",
name: "progressive-rollout-blocked-at-ctrlr-max-versions",
builder: testhelpers.NewTestCase().
WithInput(
testhelpers.NewTemporalWorkerDeploymentBuilder().
Expand Down Expand Up @@ -700,14 +701,15 @@ func TestIntegration(t *testing.T) {
// make versions drain faster
dcShortTTL.OverrideValue("matching.wv.VersionDrainageStatusVisibilityGracePeriod", testDrainageVisibilityGracePeriod)
dcShortTTL.OverrideValue("matching.wv.VersionDrainageStatusRefreshInterval", testDrainageRefreshInterval)
dcShortTTL.OverrideValue("matching.maxVersionsInDeployment", testMaxVersionsInDeployment)
tsShortTTL := temporaltest.NewServer(
temporaltest.WithT(t),
temporaltest.WithBaseServerOptions(temporal.WithDynamicConfigClient(dcShortTTL)),
)
testsShortPollerTTL := []testCase{
// Note: Add tests that require pollers to expire quickly here
{
name: "nth-rollout-unblocked-after-pollers-die",
name: "6th-rollout-unblocked-after-pollers-die-max-ctrlr-versions",
builder: testhelpers.NewTestCase().
WithInput(
testhelpers.NewTemporalWorkerDeploymentBuilder().
Expand Down Expand Up @@ -754,6 +756,57 @@ func TestIntegration(t *testing.T) {
testhelpers.NewDeploymentInfo("v5", 1),
),
},
{
name: "7th-rollout-unblocked-after-pollers-die-version-deleted",
builder: testhelpers.NewTestCase().
WithInput(
testhelpers.NewTemporalWorkerDeploymentBuilder().
WithAllAtOnceStrategy().
WithTargetTemplate("v6").
WithStatus(
testhelpers.NewStatusBuilder().
WithTargetVersion("v5", temporaliov1alpha1.VersionStatusCurrent, -1, true, true).
WithCurrentVersion("v5", true, true).
WithDeprecatedVersions( // drained AND has no pollers -> eligible for deletion
testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, true, true),
testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, true, true),
testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, true, true),
testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, true, true),
testhelpers.NewDeprecatedVersionInfo("v4", temporaliov1alpha1.VersionStatusDrained, true, true, true),
),
),
).
WithExistingDeployments(
testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers
testhelpers.NewDeploymentInfo("v1", 1),
testhelpers.NewDeploymentInfo("v2", 1),
testhelpers.NewDeploymentInfo("v3", 1),
testhelpers.NewDeploymentInfo("v4", 1),
testhelpers.NewDeploymentInfo("v5", 1),
).
WithWaitTime(5*time.Second).
WithExpectedStatus(
testhelpers.NewStatusBuilder().
WithTargetVersion("v6", temporaliov1alpha1.VersionStatusCurrent, -1, false, false).
WithCurrentVersion("v6", true, false).
WithDeprecatedVersions( // drained AND has pollers -> eligible for deletion
testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, false, true),
testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, false, true),
testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, false, true),
testhelpers.NewDeprecatedVersionInfo("v4", temporaliov1alpha1.VersionStatusDrained, true, false, true),
testhelpers.NewDeprecatedVersionInfo("v5", temporaliov1alpha1.VersionStatusDrained, true, false, true),
),
).
WithExpectedDeployments(
testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers
testhelpers.NewDeploymentInfo("v1", 1),
testhelpers.NewDeploymentInfo("v2", 1),
testhelpers.NewDeploymentInfo("v3", 1),
testhelpers.NewDeploymentInfo("v4", 1),
testhelpers.NewDeploymentInfo("v5", 1),
testhelpers.NewDeploymentInfo("v6", 1),
),
},
}

for _, tc := range testsShortPollerTTL {
Expand Down