Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions examples/test/always-succeed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: state-machine.converged-computing.org/v1alpha1
kind: StateMachine
metadata:
name: state-machine
spec:
manager:
pullPolicy: Never
workflow:
completed: 2
cluster:
maxSize: 2

jobs:
- name: job_a
properties:
always-succeed: "1"
config:
nodes: 1
coresPerTask: 1
image: rockylinux:9
script: |
sleep 5
exit 1

- name: job_b
properties:
always-succeed: "1"
config:
nodes: 1
coresPerTask: 1
image: rockylinux:9
script: |
sleep 5
exit 1

- name: job_c
config:
nodes: 1
coresPerTask: 1
properties:
always-succeed: "1"
image: rockylinux:9
script: |
sleep 5
exit 1
2 changes: 1 addition & 1 deletion python/state_machine_operator/machine/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def new_state_machine(config, jobid, tracker_type="kubernetes"):
"mark_failed": mark_failed,
# Booleans to check state
"is_failed": is_failed,
"is_succeeded": is_failed,
"is_succeeded": is_succeeded,
"is_running": is_running,
"is_complete": False,
"jobid": jobid,
Expand Down
6 changes: 3 additions & 3 deletions python/state_machine_operator/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def watch(self):
Watch is an event driven means to watch for changes and update job states
accordingly.
"""
# TODO we should have some kind of timeout that does not rely on an event
# TODO we should have some kind of check that does not rely on an event
for job in self.tracker.stream_events():

# Not a job associated with the workflow, or is ignored
Expand All @@ -398,8 +398,8 @@ def watch(self):
if job.is_active() and not job.is_completed():
continue

# The job just completed and ran successfully, trigger the next step
if job.is_succeeded() and job.is_completed():
# The job ran successfully, trigger the next step
if job.is_succeeded():
self.add_timestamp(f"{job.label}_succeeded")
LOGGER.debug(f"Job {job.jobid} completed stage '{state_machine.current_state.id}'")
state_machine.mark_succeeded()
Expand Down
9 changes: 9 additions & 0 deletions python/state_machine_operator/tracker/kubernetes/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os

import state_machine_operator.defaults as defaults
import state_machine_operator.utils as utils


class Job:
Expand All @@ -23,6 +24,10 @@ def label(self):
def step_name(self):
return self.job.metadata.labels.get("app")

@property
def always_succeed(self):
return self.job.metadata.labels.get("always-succeed") in utils.true_values

def is_active(self):
"""
Determine if a job is active
Expand All @@ -39,12 +44,16 @@ def is_failed(self):
"""
Determine if a job is failed
"""
if self.always_succeed:
return False
return self.job.status.failed == 1

def is_succeeded(self):
"""
Determine if a job has succeeded
"""
if self.always_succeed:
return True
return self.job.status.succeeded == 1


Expand Down
12 changes: 5 additions & 7 deletions python/state_machine_operator/tracker/kubernetes/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ def get_node_selector(self):
"""
Node selector is in properties -> node-selector
"""
# Properties can be provided as a string to json load
props = self.job_desc.get("properties", {})
if isinstance(props, str):
props = json.loads(props)
if not props:
return props
return props.get("node-selector")
return self.properties.get("node-selector")

def generate_batch_job(self, step, jobid):
"""
Expand Down Expand Up @@ -202,6 +196,10 @@ def generate_batch_job(self, step, jobid):
},
}

# Should the job always succeed?
if self.always_succeed:
template["metadata"]["labels"]["always-succeed"] = "1"

# Add node selectors? E.g.,
# node.kubernetes.io/instance-type: c7a.4xlarge
node_selector = self.get_node_selector()
Expand Down
32 changes: 32 additions & 0 deletions python/state_machine_operator/tracker/tracker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import os

import state_machine_operator.utils as utils
from state_machine_operator.tracker.types import SubmissionCode

# Print debug for now
Expand Down Expand Up @@ -36,6 +38,25 @@ def extra_environment(self):
environ.append({"name": key, "value": value})
return environ

@property
def properties(self):
"""
Properties are attributes that are specific to a tracker.
"""
# Properties can be provided as a string to json load
props = self.job_desc.get("properties", {})
if isinstance(props, str):
props = json.loads(props)
return props

@property
def always_succeed(self):
"""
Should the job always be marked as successful?
"""
props = self.properties or {}
return props.get("always-succeed") in utils.true_values or False


class BaseTracker:
"""
Expand Down Expand Up @@ -103,6 +124,17 @@ def name(self):
"""
return self.job_desc["name"]

@property
def properties(self):
"""
Properties are attributes that are specific to a tracker.
"""
# Properties can be provided as a string to json load
props = self.job_desc.get("properties", {})
if isinstance(props, str):
props = json.loads(props)
return props

@property
def registry_host(self):
"""
Expand Down
2 changes: 2 additions & 0 deletions python/state_machine_operator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import yaml

true_values = ["1", "yes", 1, "true", True, "True"]


def read_json(filename):
"""
Expand Down