Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions workflow/controller/artifact_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ func (woc *wfOperationCtx) HasArtifactGC() bool {
return true
}
}
for _, steps := range template.Steps {
for _, step := range steps.Steps {
if step.Inline != nil {
for _, artifact := range step.Inline.Outputs.Artifacts {
strategy := woc.execWf.GetArtifactGCStrategy(&artifact)
if strategy != wfv1.ArtifactGCStrategyUndefined && strategy != wfv1.ArtifactGCNever {
return true
}
}
}
}
}
}

// need to go to woc.wf.Status.StoredTemplates in the case of a Step referencing a WorkflowTemplate
Expand Down
105 changes: 105 additions & 0 deletions workflow/controller/artifact_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,108 @@ func TestWorkflowHasArtifactGC(t *testing.T) {
}

}

func TestInlineWorkflowHasArtifactGC(t *testing.T) {
tests := []struct {
name string
workflowArtGCStrategySpec string
artifactGCStrategySpec string
expectedResult bool
}{
{
name: "WorkflowSpecGC_Completion",
workflowArtGCStrategySpec: `
artifactGC:
strategy: OnWorkflowCompletion`,
artifactGCStrategySpec: "",
expectedResult: true,
},
{
name: "ArtifactSpecGC_Completion",
workflowArtGCStrategySpec: "",
artifactGCStrategySpec: `
artifactGC:
strategy: OnWorkflowCompletion`,
expectedResult: true,
},
{
name: "WorkflowSpecGC_Deletion",
workflowArtGCStrategySpec: `
artifactGC:
strategy: OnWorkflowDeletion`,
artifactGCStrategySpec: "",
expectedResult: true,
},
{
name: "ArtifactSpecGC_Deletion",
workflowArtGCStrategySpec: "",
artifactGCStrategySpec: `
artifactGC:
strategy: OnWorkflowDeletion`,
expectedResult: true,
},
{
name: "NoGC",
workflowArtGCStrategySpec: "",
artifactGCStrategySpec: "",
expectedResult: false,
},
{
name: "WorkflowSpecGC_None",
workflowArtGCStrategySpec: `
artifactGC:
strategy: ""`,
artifactGCStrategySpec: "",
expectedResult: false,
},
{
name: "ArtifactSpecGC_None",
workflowArtGCStrategySpec: `
artifactGC:
strategy: OnWorkflowDeletion`,
artifactGCStrategySpec: `
artifactGC:
strategy: Never`,
expectedResult: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

workflowSpec := fmt.Sprintf(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-passing-
spec:
entrypoint: whalesay
%s
templates:
- name: whalesay
steps:
- - name: generate-artifact
inline:
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["sleep 1; cowsay hello world | tee /tmp/hello_world.txt"]
outputs:
artifacts:
- name: out
path: /out
s3:
key: out
%s`, tt.workflowArtGCStrategySpec, tt.artifactGCStrategySpec)

wf := wfv1.MustUnmarshalWorkflow(workflowSpec)
ctx := logging.TestContext(t.Context())
cancel, controller := newController(ctx, wf)
defer cancel()
woc := newWorkflowOperationCtx(ctx, wf, controller)

hasArtifact := woc.HasArtifactGC()

assert.Equal(t, tt.expectedResult, hasArtifact)
})
}
}
9 changes: 6 additions & 3 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
{
sgNode, err := woc.wf.GetNodeByName(sgNodeName)
if err != nil {
_ = woc.initializeNode(ctx, sgNodeName, wfv1.NodeTypeStepGroup, stepTemplateScope, &wfv1.WorkflowStep{}, stepsCtx.boundaryID, wfv1.NodeRunning, &wfv1.NodeFlag{}, true)
_ = woc.initializeNode(ctx, sgNodeName, wfv1.NodeTypeStepGroup, stepTemplateScope, &wfv1.WorkflowStep{Template: tmpl.Name}, stepsCtx.boundaryID, wfv1.NodeRunning, &wfv1.NodeFlag{}, true)
} else if !sgNode.Fulfilled() {
_ = woc.markNodePhase(ctx, sgNodeName, wfv1.NodeRunning)
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
}
}

sgNode, err := woc.executeStepGroup(ctx, stepGroup.Steps, sgNodeName, &stepsCtx)
sgNode, err := woc.executeStepGroup(ctx, stepGroup.Steps, sgNodeName, &stepsCtx, tmpl.Name)
if err != nil {
return woc.markNodeError(ctx, sgNodeName, err), nil
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func (woc *wfOperationCtx) updateOutboundNodes(ctx context.Context, nodeName str

// executeStepGroup examines a list of parallel steps and executes them in parallel.
// Handles referencing of variables in scope, expands `withItem` clauses, and evaluates `when` expressions
func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext, tmplName string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(sgNodeName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -259,6 +259,9 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv

// Kick off all parallel steps in the group
for _, step := range stepGroup {
if step.Template == "" && step.TemplateRef == nil {
step.Template = tmplName
}
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)

// Check the step's when clause to decide if it should execute
Expand Down
Loading