diff --git a/api/v1alpha1/statemachine_types.go b/api/v1alpha1/statemachine_types.go
index 57b3c4c..8e6b2cd 100644
--- a/api/v1alpha1/statemachine_types.go
+++ b/api/v1alpha1/statemachine_types.go
@@ -165,6 +165,10 @@ type JobConfig struct {
 	// +optional
 	Nodes int32 `json:"nodes,omitempty"`
 
+	// Number of tasks
+	// +optional
+	Tasks int32 `json:"tasks,omitempty"`
+
 	// Cores per task per job
 	// 6 frontier / 3 summit / 5 on lassen (vsoch: this used to be 6 default)
 	// +kubebuilder:default=3
diff --git a/config/crd/bases/state-machine.converged-computing.org_statemachines.yaml b/config/crd/bases/state-machine.converged-computing.org_statemachines.yaml
index 2af4011..a71cb14 100644
--- a/config/crd/bases/state-machine.converged-computing.org_statemachines.yaml
+++ b/config/crd/bases/state-machine.converged-computing.org_statemachines.yaml
@@ -101,6 +101,10 @@ spec:
                       because we assume the initial data is bad. RetryFailure
                     type: boolean
+                  tasks:
+                    description: Number of tasks
+                    format: int32
+                    type: integer
                   walltime:
                     description: Walltime (in string format) for the job
                     type: string
diff --git a/examples/dist/state-machine-operator-dev.yaml b/examples/dist/state-machine-operator-dev.yaml
index 6221b13..306aac9 100644
--- a/examples/dist/state-machine-operator-dev.yaml
+++ b/examples/dist/state-machine-operator-dev.yaml
@@ -109,6 +109,10 @@ spec:
                       because we assume the initial data is bad. RetryFailure
                     type: boolean
+                  tasks:
+                    description: Number of tasks
+                    format: int32
+                    type: integer
                   walltime:
                     description: Walltime (in string format) for the job
                     type: string
diff --git a/examples/test/lammps-metrics.yaml b/examples/test/lammps-metrics.yaml
index 1811a68..501cf00 100644
--- a/examples/test/lammps-metrics.yaml
+++ b/examples/test/lammps-metrics.yaml
@@ -5,23 +5,22 @@ metadata:
 spec:
   manager:
     pullPolicy: Never
-    interactive: true
   workflow:
     completed: 10
     events:
       # Two studies:
-      # If lammps runtime is > 3:00 add a node (do with autoscaler)
+      # If lammps runtime is > 30 add a node (do with autoscaler)
       # what is the base case
       - metric: mean.lammps.duration
-        when: ">= 360"
+        when: ">= 30"
        action: grow
-       repetitions: 3
+       repetitions: 2
        # Require checks between before doing again
        # TODO check if this is working correctly.
        # also check the .get()
        backoff: 3
-       maxSize: 10
+       maxSize: 5
 
   cluster:
     maxSize: 2
@@ -34,8 +33,6 @@ spec:
       image: rockylinux:9
       script: echo This is a setup for lammps
 
-      # Note that this step always fails and we never make it to C
-      # This should end the workflow early
     - name: lammps
       properties:
         minicluster: "yes"
@@ -43,7 +40,7 @@ spec:
         nodes: 4
         image: ghcr.io/converged-computing/metric-lammps-cpu:zen4
         workdir: /opt/lammps/examples/reaxff/HNS/
-        script: lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite
+        script: flux run -N $nodes lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite
 
     - name: job_c
       config:
diff --git a/internal/controller/manager/jobs/jobs.go b/internal/controller/manager/jobs/jobs.go
index 05964b9..7bf97b6 100644
--- a/internal/controller/manager/jobs/jobs.go
+++ b/internal/controller/manager/jobs/jobs.go
@@ -26,6 +26,9 @@ cd -
 jobid="{{ jobid }}"
 outpath="{{ workdir }}"
 registry="{{ registry }}"
+{% if cores_per_task %}cores_per_task={{ cores_per_task }}{% endif %}
+{% if nodes %}nodes={{ nodes }}{% endif %}
+{% if tasks %}tasks={{ tasks }}{% endif %}
 {% if pull %}pull_tag={{ pull }}{% endif %}
 {% if push %}push_tag={{ push }}{% endif %}
diff --git a/internal/controller/manager/jobs/templates/components.sh b/internal/controller/manager/jobs/templates/components.sh
index 735a8bd..33312cc 100644
--- a/internal/controller/manager/jobs/templates/components.sh
+++ b/internal/controller/manager/jobs/templates/components.sh
@@ -3,6 +3,7 @@ config:
   nnodes: {{ if .Job.Config.Nodes }}{{ .Job.Config.Nodes }}{{ else }}1{{ end }}
   cores_per_task: {{ if .Job.Config.CoresPerTask }}{{ .Job.Config.CoresPerTask }}{{ else }}6{{ end }}
   ngpus: {{ .Job.Config.Gpus }}
+  {{ if .Job.Config.Tasks }}tasks: {{ .Job.Config.Tasks }}{{ end }}
   {{ if .Job.Config.Walltime }}walltime: '{{ .Job.Config.Walltime }}'{{ end }}
   # Kubernetes specific settings
   {{ if .Job.Config.GPULabel }}gpulabel: {{ .Job.Config.GPULabel }}{{ end }}
diff --git a/python/state_machine_operator/manager/manager.py b/python/state_machine_operator/manager/manager.py
index 02736f2..1ab836a 100644
--- a/python/state_machine_operator/manager/manager.py
+++ b/python/state_machine_operator/manager/manager.py
@@ -472,11 +472,14 @@ def check_metrics(self, job):
 
     def trigger_grow(self, trigger, step_name, value):
         """
-        Trigger the job to grow
+        Trigger the job to grow.
+
+        Note that this is a static grow: subsequent jobs for the step are given
+        more nodes. It does not give more nodes to jobs that are already running.
         """
         previous = self.workflow.jobs[step_name]["config"]["nnodes"]
         max_size = trigger.action.max_size
-        if max_size >= previous + 1:
+        if previous >= max_size:
             LOGGER.info(
                 f"Grow triggered: {trigger.action.metric} {trigger.when} ({value}), already >= max size {max_size}"
             )
@@ -523,9 +526,13 @@ def trigger_workflow_action(self, trigger, step_name, value):
             )
             self.complete_workflow()
 
+        # TODO: think about use case / mechanism for dynamic grow.
+        # It would likely need to be requested by the application.
+        # Static grow increases the node count for a step's subsequent jobs.
         if trigger.action.name == "grow":
             self.trigger_grow(trigger, step_name, value)
 
+        # Static shrink decreases the node count for a step's subsequent jobs.
         if trigger.action.name == "shrink":
             self.trigger_shrink(trigger, step_name, value)
diff --git a/python/state_machine_operator/tracker/kubernetes/tracker.py b/python/state_machine_operator/tracker/kubernetes/tracker.py
index 2e3875b..eb9073f 100644
--- a/python/state_machine_operator/tracker/kubernetes/tracker.py
+++ b/python/state_machine_operator/tracker/kubernetes/tracker.py
@@ -315,6 +315,7 @@ def submit_minicluster_job(self, step, jobid):
                 "workingDir": step.workdir,
                 "name": container_name,
                 "pullAlways": pull_always,
+                "launcher": True,
                 "volumes": {
                     step.name: {
                         "configMapName": step.name,
@@ -570,6 +571,7 @@ def create_step(self, jobid):
             cores_per_task=self.ncores,
             gpus=self.ngpus,
             workdir=workdir,
+            tasks=self.tasks,
         )
 
         if "script" in self.job_desc:
@@ -584,6 +586,9 @@ def create_step(self, jobid):
             "push": self.push_to,
             "registry": self.registry_host,
             "plain_http": self.registry_plain_http,
+            "nodes": step.nodes,
+            "cores_per_task": self.ncores,
+            "tasks": step.tasks,
         }
         step.script = Template(self.job_desc["script"]).render(**kwargs)
diff --git a/python/state_machine_operator/tracker/tracker.py b/python/state_machine_operator/tracker/tracker.py
index 339d67e..b2835a4 100644
--- a/python/state_machine_operator/tracker/tracker.py
+++ b/python/state_machine_operator/tracker/tracker.py
@@ -124,6 +124,14 @@ def type(self):
     def nnodes(self):
         return int(self.config.get("nnodes", 1))
 
+    @property
+    def tasks(self):
+        # Unlike nnodes, there is no default: return None when unset so
+        # templates can omit the tasks line entirely.
+        tasks = self.config.get("tasks")
+        if tasks is not None:
+            return int(tasks)
+
     @property
     def ncores(self):
         return int(self.config.get("cores_per_task", 1))
diff --git a/python/state_machine_operator/tracker/types.py b/python/state_machine_operator/tracker/types.py
index 25ca4a0..57b215b 100644
--- a/python/state_machine_operator/tracker/types.py
+++ b/python/state_machine_operator/tracker/types.py
@@ -28,6 +28,7 @@ class JobSetup:
     nodes: int
     cores_per_task: int
     script: str = None
+    tasks: int = None
     walltime: str = None
     gpus: int = 0
     workdir: str = None
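
Aside from the diff itself, here is a minimal sketch of how the new `tasks` value is meant to flow from a step's config into a rendered step script. The `config` dict, the `get_tasks` helper, and the two-line template string are hypothetical stand-ins for a real step's config block and the preamble added to jobs.go; only the `{% if tasks %}` guard and the None-when-unset property behavior come from the changes above.

```python
from jinja2 import Template

# Hypothetical step config; "tasks" is the new optional key.
config = {"nnodes": 4, "cores_per_task": 3, "tasks": 8}

def get_tasks(config):
    # Mirrors the new Tracker.tasks property: returns None when unset,
    # so the {% if tasks %} guard below drops the line entirely.
    tasks = config.get("tasks")
    if tasks is not None:
        return int(tasks)

# Same guard pattern as the preamble variables added to jobs.go.
preamble = Template(
    "{% if nodes %}nodes={{ nodes }}\n{% endif %}"
    "{% if tasks %}tasks={{ tasks }}\n{% endif %}"
)

print(preamble.render(nodes=config["nnodes"], tasks=get_tasks(config)), end="")
# nodes=4
# tasks=8
print(preamble.render(nodes=config["nnodes"], tasks=get_tasks({})), end="")
# nodes=4   (no tasks line when the key is unset)
```

The lammps step's `script: flux run -N $nodes ...` consumes the `nodes` shell variable exactly this way; a step that needs a task count could reference `$tasks` under the same guard.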