Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1alpha1/statemachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type JobConfig struct {
// +optional
Nodes int32 `json:"nodes,omitempty"`

// Number of tasks
// +optional
Tasks int32 `json:"tasks,omitempty"`

// Cores per task per job
// 6 frontier / 3 summit / 5 on lassen (vsoch: this used to be 6 default)
// +kubebuilder:default=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ spec:
because we assume the initial data is bad.
RetryFailure
type: boolean
tasks:
description: Number of tasks
format: int32
type: integer
walltime:
description: Walltime (in string format) for the job
type: string
Expand Down
4 changes: 4 additions & 0 deletions examples/dist/state-machine-operator-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ spec:
because we assume the initial data is bad.
RetryFailure
type: boolean
tasks:
description: Number of tasks
format: int32
type: integer
walltime:
description: Walltime (in string format) for the job
type: string
Expand Down
13 changes: 5 additions & 8 deletions examples/test/lammps-metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ metadata:
spec:
manager:
pullPolicy: Never
interactive: true
workflow:
completed: 10

events:
# Two studies:
# If lammps runtime is > 3:00 add a node (do with autoscaler)
# If lammps runtime is > 30 add a node (do with autoscaler)
# what is the base case
- metric: mean.lammps.duration
when: ">= 360"
when: ">= 30"
action: grow
repetitions: 3
repetitions: 2
# Require this many checks in between before the action can trigger again
# TODO check if this is working correctly.
# also check the .get()
backoff: 3
maxSize: 10
maxSize: 5

cluster:
maxSize: 2
Expand All @@ -34,16 +33,14 @@ spec:
image: rockylinux:9
script: echo This is a setup for lammps

# Note that this step always fails and we never make it to C
# This should end the workflow early
- name: lammps
properties:
minicluster: "yes"
config:
nodes: 4
image: ghcr.io/converged-computing/metric-lammps-cpu:zen4
workdir: /opt/lammps/examples/reaxff/HNS/
script: lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite
script: flux run -N $nodes lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite

- name: job_c
config:
Expand Down
3 changes: 3 additions & 0 deletions internal/controller/manager/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ cd -
jobid="{{ jobid }}"
outpath="{{ workdir }}"
registry="{{ registry }}"
{% if cores_per_task %}cores_per_task={{ cores_per_task }}{% endif %}
{% if nodes %}nodes={{ nodes }}{% endif %}
{% if tasks %}tasks={{ tasks }}{% endif %}
{% if pull %}pull_tag={{ pull }}{% endif %}
{% if push %}push_tag={{ push }}{% endif %}

Expand Down
1 change: 1 addition & 0 deletions internal/controller/manager/jobs/templates/components.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ config:
nnodes: {{ if .Job.Config.Nodes }}{{ .Job.Config.Nodes }}{{ else }}1{{ end }}
cores_per_task: {{ if .Job.Config.CoresPerTask }}{{ .Job.Config.CoresPerTask }}{{ else }}6{{ end }}
ngpus: {{ .Job.Config.Gpus }}
{{ if .Job.Config.Tasks }}tasks: {{ .Job.Config.Tasks }}{{ end }}
{{ if .Job.Config.Walltime }}walltime: '{{ .Job.Config.Walltime }}'{{ end }}
# Kubernetes specific settings
{{ if .Job.Config.GPULabel }}gpulabel: {{ .Job.Config.GPULabel }}{{ end }}
Expand Down
11 changes: 9 additions & 2 deletions python/state_machine_operator/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,14 @@ def check_metrics(self, job):

def trigger_grow(self, trigger, step_name, value):
"""
Trigger the job to grow
Trigger the job to grow.

Note that this is more of a static grow - subsequent jobs will be given more
nodes. It doesn't give currently running jobs more.
"""
previous = self.workflow.jobs[step_name]["config"]["nnodes"]
max_size = trigger.action.max_size
if max_size >= previous + 1:
if previous + 1 >= max_size:
LOGGER.info(
f"Grow triggered: {trigger.action.metric} {trigger.when} ({value}), already >= max size {max_size}"
)
Expand Down Expand Up @@ -523,9 +526,13 @@ def trigger_workflow_action(self, trigger, step_name, value):
)
self.complete_workflow()

# TODO: think about use case / mechanism for dynamic grow.
# It would likely need to be requested by the application.
# Static grow increases subsequent nodes for a job
if trigger.action.name == "grow":
self.trigger_grow(trigger, step_name, value)

# Static shrink decreases subsequent nodes for a job
if trigger.action.name == "shrink":
self.trigger_shrink(trigger, step_name, value)

Expand Down
5 changes: 5 additions & 0 deletions python/state_machine_operator/tracker/kubernetes/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def submit_minicluster_job(self, step, jobid):
"workingDir": step.workdir,
"name": container_name,
"pullAlways": pull_always,
"launcher": True,
"volumes": {
step.name: {
"configMapName": step.name,
Expand Down Expand Up @@ -570,6 +571,7 @@ def create_step(self, jobid):
cores_per_task=self.ncores,
gpus=self.ngpus,
workdir=workdir,
tasks=self.tasks,
)

if "script" in self.job_desc:
Expand All @@ -584,6 +586,9 @@ def create_step(self, jobid):
"push": self.push_to,
"registry": self.registry_host,
"plain_http": self.registry_plain_http,
"nodes": step.nodes,
"cores_per_task": self.ncores,
"tasks": step.tasks,
}
step.script = Template(self.job_desc["script"]).render(**kwargs)

Expand Down
6 changes: 6 additions & 0 deletions python/state_machine_operator/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def type(self):
def nnodes(self):
return int(self.config.get("nnodes", 1))

@property
def tasks(self):
    """
    Number of tasks requested in the job config, if any.

    Returns the "tasks" config value cast to int, or None when the
    key is absent (or explicitly None).
    """
    tasks = self.config.get("tasks")
    if tasks is not None:
        return int(tasks)
    # Make the "not set" case explicit instead of falling off the end
    return None

@property
def ncores(self):
    """
    Cores per task from the job config, as an int (1 when not set).
    """
    cores = self.config.get("cores_per_task", 1)
    return int(cores)
Expand Down
1 change: 1 addition & 0 deletions python/state_machine_operator/tracker/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class JobSetup:
nodes: int
cores_per_task: int
script: str = None
tasks: int = None
walltime: str = None
gpus: int = 0
workdir: str = None