Skip to content

Commit 7546fcc

Browse files
committed
improve the canary features
Signed-off-by: Jiaxin Shan <[email protected]>
1 parent 1061839 commit 7546fcc

File tree

5 files changed

+128
-26
lines changed

5 files changed

+128
-26
lines changed

pkg/controller/stormservice/canary.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ func (r *StormServiceReconciler) processCanaryUpdate(ctx context.Context, stormS
139139
currentStepIndex := canaryStatus.CurrentStep
140140
if currentStepIndex < int32(len(steps)) {
141141
currentStep := steps[currentStepIndex]
142-
// If current step is a manual pause step but no pause condition exists, advance
142+
// Treat as resume ONLY if we've already marked PausedAt (i.e., entered this pause step)
143+
// and the CanaryPauseStep condition has been removed by the user.
143144
if currentStep.Pause != nil && currentStep.Pause.IsManualPause() {
144-
if !r.hasPauseCondition(canaryStatus, orchestrationv1alpha1.PauseReasonCanaryPauseStep) {
145+
if canaryStatus.PausedAt != nil && !r.hasPauseCondition(canaryStatus, orchestrationv1alpha1.PauseReasonCanaryPauseStep) {
145146
klog.Infof("Manual pause condition removed for StormService %s/%s, advancing to next step",
146147
stormService.Namespace, stormService.Name)
147148
return r.advanceCanaryStep(ctx, stormService)
@@ -211,7 +212,7 @@ func (r *StormServiceReconciler) processCanaryPauseStep(ctx context.Context, sto
211212
addStatusUpdate(func(status *orchestrationv1alpha1.CanaryStatus) {
212213
status.PausedAt = &now
213214
status.Phase = orchestrationv1alpha1.CanaryPhasePaused
214-
215+
215216
// Add pause condition to track why it's paused
216217
pauseCondition := orchestrationv1alpha1.PauseCondition{
217218
Reason: orchestrationv1alpha1.PauseReasonCanaryPauseStep,
@@ -297,8 +298,12 @@ func (r *StormServiceReconciler) processCanaryPauseStep(ctx context.Context, sto
297298
return ctrl.Result{}, fmt.Errorf("failed to add CanaryPauseStep pause condition: %w", err)
298299
}
299300
} else {
300-
r.EventRecorder.Eventf(stormService, "Normal", "CanaryPauseManual",
301-
"Canary paused at manual pause step. Remove CanaryPauseStep pause condition to continue")
301+
// Emit a consistent CanaryUpdate event even if the pause condition already exists
302+
update := newCanaryStatusUpdate().
303+
addEvent("Canary paused at manual pause step. Remove CanaryPauseStep pause condition to continue")
304+
if err := r.applyCanaryStatusUpdate(ctx, stormService, update); err != nil {
305+
return ctrl.Result{}, fmt.Errorf("failed to record manual pause event: %w", err)
306+
}
302307
}
303308

304309
// Requeue periodically to check if pause condition was removed
@@ -342,15 +347,15 @@ func (r *StormServiceReconciler) applyCanaryWeight(ctx context.Context, stormSer
342347

343348
// Check if this is a scaling event during canary (compare with current status replicas)
344349
currentStatusReplicas := stormService.Status.Replicas
345-
350+
346351
// If replicas changed during canary, log and notify but continue with recalculation
347352
if currentStatusReplicas > 0 && currentStatusReplicas != totalReplicas {
348-
klog.Infof("Scaling detected during canary for StormService %s/%s: %d -> %d replicas, recalculating distribution",
353+
klog.Infof("Scaling detected during canary for StormService %s/%s: %d -> %d replicas, recalculating distribution",
349354
stormService.Namespace, stormService.Name, currentStatusReplicas, totalReplicas)
350-
355+
351356
// Log the recalculation event
352357
r.EventRecorder.Eventf(stormService, "Normal", "CanaryScaling",
353-
"Recalculated canary distribution due to scaling: %d%% weight with %d total replicas (%d canary, %d stable)",
358+
"Recalculated canary distribution due to scaling: %d%% weight with %d total replicas (%d canary, %d stable)",
354359
weight, totalReplicas, canaryReplicas, stableReplicas)
355360
}
356361

@@ -401,7 +406,7 @@ func (r *StormServiceReconciler) advanceCanaryStep(ctx context.Context, stormSer
401406
update := newCanaryStatusUpdate().
402407
addStatusUpdate(func(status *orchestrationv1alpha1.CanaryStatus) {
403408
status.CurrentStep = nextStep
404-
status.PausedAt = nil // Clear pause timestamp
409+
status.PausedAt = nil // Clear pause timestamp
405410
status.PauseConditions = nil // Clear pause conditions when resuming
406411
status.Phase = orchestrationv1alpha1.CanaryPhaseProgressing
407412
})
@@ -474,15 +479,17 @@ func (r *StormServiceReconciler) completeCanary(ctx context.Context, stormServic
474479
return ctrl.Result{}, fmt.Errorf("failed to promote canary revision: %w", err)
475480
}
476481

482+
// Use single patch operation for the final status update
483+
// Capture the original object BEFORE mutating status to ensure the patch contains the changes
484+
original := stormService.DeepCopy()
485+
477486
// Step 3: Update main status to reflect the promotion
478487
stormService.Status.CurrentRevision = promotedRevision
479488
stormService.Status.UpdateRevision = promotedRevision
480489

481490
// Step 4: Clear canary status - this triggers normal rollout logic to take over
482491
stormService.Status.CanaryStatus = nil
483492

484-
// Use single patch operation for the final status update
485-
original := stormService.DeepCopy()
486493
if err := r.Status().Patch(ctx, stormService, client.MergeFrom(original)); err != nil {
487494
return ctrl.Result{}, fmt.Errorf("failed to clear canary status and promote revision: %w", err)
488495
}

pkg/controller/stormservice/canary_test.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -237,29 +237,33 @@ func TestProcessCanaryWeightStep(t *testing.T) {
237237

238238
// Verify events were recorded (may receive multiple events)
239239
var events []string
240-
timeout := time.After(time.Second)
241-
eventLoop:
242-
for len(events) < 2 {
240+
timeout := time.After(2 * time.Second)
241+
collect:
242+
for len(events) < 3 { // collect a few events or until timeout
243243
select {
244244
case event := <-eventRecorder.Events:
245245
events = append(events, event)
246246
case <-timeout:
247-
break eventLoop
247+
break collect
248248
}
249249
}
250250

251-
// Should have received both CanaryWeightApplied and CanaryReplicaMode events
251+
// We expect at least the generic CanaryUpdate (weight set) and a mode-specific event
252+
// Replica mode is used here since replicas=3
252253
require.True(t, len(events) >= 1, "Expected at least one event")
253254

254-
// Check that we got either CanaryWeightApplied or CanaryReplicaMode (or both)
255-
foundWeightEvent := false
256-
for _, event := range events {
257-
if strings.Contains(event, "33%") && (strings.Contains(event, "CanaryWeightApplied") || strings.Contains(event, "CanaryReplicaMode")) {
258-
foundWeightEvent = true
259-
break
255+
foundUpdate := false
256+
foundReplicaMode := false
257+
for _, e := range events {
258+
if strings.Contains(e, "CanaryUpdate") && strings.Contains(e, "33%") {
259+
foundUpdate = true
260+
}
261+
if strings.Contains(e, "CanaryReplicaMode") && strings.Contains(e, "33%") {
262+
foundReplicaMode = true
260263
}
261264
}
262-
assert.True(t, foundWeightEvent, "Expected to find weight-related event with 33%, got events: %v", events)
265+
assert.True(t, foundUpdate, "Expected CanaryUpdate event with 33%%, got: %v", events)
266+
assert.True(t, foundReplicaMode, "Expected CanaryReplicaMode event with 33%%, got: %v", events)
263267
}
264268

265269
func TestProcessCanaryPauseStep_AutomaticPause(t *testing.T) {
@@ -459,6 +463,8 @@ func TestAdvanceCanaryStep(t *testing.T) {
459463
assert.Equal(t, int32(1), stormService.Status.CanaryStatus.CurrentStep)
460464
assert.Nil(t, stormService.Status.CanaryStatus.PausedAt)
461465
assert.Equal(t, orchestrationv1alpha1.CanaryPhaseProgressing, stormService.Status.CanaryStatus.Phase)
466+
// Pause conditions should also be cleared when advancing
467+
assert.Empty(t, stormService.Status.CanaryStatus.PauseConditions)
462468
}
463469

464470
func TestHasPauseCondition(t *testing.T) {

test/e2e/stormservice_canary_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,12 @@ func waitForCanaryResume(t *testing.T, client *v1alpha1.Clientset, name string)
727727
return false, err
728728
}
729729

730+
if storm.Status.CanaryStatus == nil {
731+
// Already completed or cleared; consider as resumed for this check
732+
t.Logf("canary status cleared - considered resumed")
733+
return true, nil
734+
}
735+
730736
t.Logf("resume check: spec.paused=%v, canary phase=%s, step=%d, pausedAt=%v",
731737
storm.Spec.Paused, storm.Status.CanaryStatus.Phase,
732738
storm.Status.CanaryStatus.CurrentStep, storm.Status.CanaryStatus.PausedAt)

test/e2e/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,12 @@ func waitForStormServiceReady(ctx context.Context, t *testing.T, k8sClient *kube
181181

182182
// Check if all replicas are ready
183183
if ss.Status.ReadyReplicas > 0 && ss.Status.ReadyReplicas == ss.Status.Replicas {
184-
t.Logf("StormService %s is ready: %d/%d replicas ready",
184+
t.Logf("StormService %s is ready: %d/%d replicas ready",
185185
stormService.Name, ss.Status.ReadyReplicas, ss.Status.Replicas)
186186
return true, nil
187187
}
188188

189-
t.Logf("Waiting for StormService %s to be ready: %d/%d replicas ready",
189+
t.Logf("Waiting for StormService %s to be ready: %d/%d replicas ready",
190190
stormService.Name, ss.Status.ReadyReplicas, ss.Status.Replicas)
191191
return false, nil
192192
})

test/integration/controller/stormservice_canary_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,89 @@ var _ = ginkgo.Describe("StormService Canary Controller Integration Test", func(
317317
g.Expect(cleared.Status.CanaryStatus.Phase).To(gomega.Equal(orchestrationapi.CanaryPhaseProgressing))
318318
}, time.Second*10, time.Millisecond*500).Should(gomega.Succeed())
319319
})
320+
321+
ginkgo.It("should clear canary status on completion", func() {
322+
name := "clear-canary-completion"
323+
stormService := createCanaryStormService(ns.Name, name)
324+
325+
// Create StormService
326+
gomega.Expect(k8sClient.Create(ctx, stormService)).To(gomega.Succeed())
327+
328+
// Wait for initial revision to be set by the controller
329+
gomega.Eventually(func() bool {
330+
got := &orchestrationapi.StormService{}
331+
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(stormService), got); err != nil {
332+
return false
333+
}
334+
return got.Status.CurrentRevision != ""
335+
}, time.Second*30, time.Millisecond*300).Should(gomega.BeTrue())
336+
337+
// Trigger a new revision by changing the image to start canary
338+
gomega.Eventually(func(g gomega.Gomega) {
339+
latest := &orchestrationapi.StormService{}
340+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(stormService), latest)).To(gomega.Succeed())
341+
// Change container image to trigger updateRevision != currentRevision
342+
if len(latest.Spec.Template.Spec.Roles) > 0 && len(latest.Spec.Template.Spec.Roles[0].Template.Spec.Containers) > 0 {
343+
latest.Spec.Template.Spec.Roles[0].Template.Spec.Containers[0].Image = "nginx:1.21"
344+
g.Expect(k8sClient.Update(ctx, latest)).To(gomega.Succeed())
345+
}
346+
}, time.Second*10, time.Millisecond*200).Should(gomega.Succeed())
347+
348+
// Wait until controller detects update (either revisions differ or canary status appears)
349+
gomega.Eventually(func() bool {
350+
updated := &orchestrationapi.StormService{}
351+
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(stormService), updated); err != nil {
352+
return false
353+
}
354+
return updated.Status.UpdateRevision != updated.Status.CurrentRevision || updated.Status.CanaryStatus != nil
355+
}, time.Second*60, time.Millisecond*300).Should(gomega.BeTrue())
356+
357+
// Simulate completion by setting CurrentStep >= len(steps) and 100% weight
358+
gomega.Eventually(func(g gomega.Gomega) {
359+
latest := &orchestrationapi.StormService{}
360+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(stormService), latest)).To(gomega.Succeed())
361+
steps := latest.Spec.UpdateStrategy.Canary.Steps
362+
// Ensure canaryStatus exists to edit
363+
if latest.Status.CanaryStatus == nil {
364+
latest.Status.CanaryStatus = &orchestrationapi.CanaryStatus{}
365+
}
366+
latest.Status.CanaryStatus.CurrentStep = int32(len(steps)) // mark as completed
367+
latest.Status.CanaryStatus.CurrentWeight = 100
368+
// Use observed revisions for promotion fields
369+
latest.Status.CanaryStatus.StableRevision = latest.Status.CurrentRevision
370+
latest.Status.CanaryStatus.CanaryRevision = latest.Status.UpdateRevision
371+
latest.Status.CanaryStatus.Phase = orchestrationapi.CanaryPhaseProgressing
372+
373+
g.Expect(k8sClient.Status().Update(ctx, latest)).To(gomega.Succeed())
374+
}, time.Second*5, time.Millisecond*200).Should(gomega.Succeed())
375+
376+
// Status update should be enough to trigger reconcile; no spec poke required
377+
378+
// Verify canaryStatus is cleared by controller completion logic and revisions align
379+
gomega.Eventually(func(g gomega.Gomega) {
380+
final := &orchestrationapi.StormService{}
381+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(stormService), final)).To(gomega.Succeed())
382+
383+
// Check if canary is completed first
384+
if final.Status.CanaryStatus != nil {
385+
// If canary status exists, it should be in completed phase or cleared
386+
if final.Status.CanaryStatus.Phase == orchestrationapi.CanaryPhaseCompleted {
387+
// Phase is completed, status should be cleared soon - continue waiting
388+
g.Expect(final.Status.CanaryStatus).ToNot(gomega.BeNil()) // This will pass, allowing retry
389+
}
390+
// Still in progress - continue waiting
391+
g.Expect(final.Status.CanaryStatus.Phase).To(gomega.BeElementOf(
392+
orchestrationapi.CanaryPhaseInitializing,
393+
orchestrationapi.CanaryPhaseProgressing,
394+
orchestrationapi.CanaryPhasePaused,
395+
orchestrationapi.CanaryPhaseCompleted,
396+
)) // This will pass for valid phases, allowing retry
397+
} else {
398+
// Canary status is cleared, verify revisions align
399+
g.Expect(final.Status.UpdateRevision).To(gomega.Equal(final.Status.CurrentRevision))
400+
}
401+
}, time.Second*120, time.Millisecond*500).Should(gomega.Succeed())
402+
})
320403
})
321404

322405
ginkgo.Context("Canary Mode Detection", func() {

0 commit comments

Comments
 (0)