Skip to content

Commit 828f096

Browse files
committed
feat: shrink with flux minicluster example working.
Signed-off-by: vsoch <[email protected]>
1 parent bd2fd0e commit 828f096

File tree

10 files changed

+135
-32
lines changed

10 files changed

+135
-32
lines changed

api/v1alpha1/statemachine_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ type WorkflowEvent struct {
8888
// Backoff and repetitions to respond to event
8989
Backoff int32 `json:"backoff,omitempty"`
9090
Repetitions int32 `json:"repetitions,omitempty"`
91+
MinCompletions int32 `json:"minCompletions,omitempty"`
92+
MaxSize int32 `json:"maxSize,omitempty"`
93+
MinSize int32 `json:"minSize,omitempty"`
9194
}
9295

9396
// A JobSequence is a list of JobSteps

config/crd/bases/state-machine.converged-computing.org_statemachines.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,19 @@ spec:
250250
description: Backoff and repetitions to respond to event
251251
format: int32
252252
type: integer
253+
maxSize:
254+
format: int32
255+
type: integer
253256
metric:
254257
description: Name of metric, indexed into model lookup (e.g.,
255258
count.job_b.failed)
256259
type: string
260+
minCompletions:
261+
format: int32
262+
type: integer
263+
minSize:
264+
format: int32
265+
type: integer
257266
repetitions:
258267
format: int32
259268
type: integer

examples/dist/state-machine-operator-dev.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,19 @@ spec:
258258
description: Backoff and repetitions to respond to event
259259
format: int32
260260
type: integer
261+
maxSize:
262+
format: int32
263+
type: integer
261264
metric:
262265
description: Name of metric, indexed into model lookup (e.g.,
263266
count.job_b.failed)
264267
type: string
268+
minCompletions:
269+
format: int32
270+
type: integer
271+
minSize:
272+
format: int32
273+
type: integer
265274
repetitions:
266275
format: int32
267276
type: integer

examples/test/flux-minicluster.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ spec:
77
pullPolicy: Never
88
workflow:
99
completed: 10
10+
events:
11+
# When more than half of job b's fail, finish the entire workflow
12+
- metric: mean.job_b.duration
13+
when: "<= 60"
14+
action: shrink
15+
# Size 4 cluster, max decreases down to 1
16+
repetitions: 3
17+
# Require checks between before doing again
18+
# TODO check if this is working correctly.
19+
# also check the .get()
20+
backoff: 3
1021

1122
cluster:
1223
maxSize: 2
@@ -22,7 +33,7 @@ spec:
2233
# Note that this step always fails and we never make it to C
2334
# This should end the workflow early
2435
- name: job_b
25-
properties:
36+
properties:
2637
minicluster: "yes"
2738
config:
2839
nodes: 4

internal/controller/manager/templates/entrypoint.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ workflow:
1212
events: {{ range .Spec.Workflow.Events }}
1313
- action: {{ .Action }}
1414
when: "{{ if .When }}{{ .When }}{{ else }}> 0{{ end }}"
15+
minCompletions: {{ if .MinCompletions }}{{ .MinCompetions }}{{ else }}1{{ end }}
16+
minSize: {{ if .MinSize }}{{ .MinSize }}{{ else }}1{{ end }}
17+
maxSize: {{ if .MaxSize }}{{ .MaxSize }}{{ else }}null{{ end }}
18+
repetitions: {{ if .Repetitions }}{{ .Repetitions }}{{ else }}1{{ end }}
1519
metric: {{ .Metric }}
20+
backoff: {{ if .Backoff }}{{ .Backoff }}{{ else }}null{{ end }}
1621
{{ end }}{{ end }}
1722
cluster:
1823
max_size: {{ .Spec.Cluster.MaxSize }}

python/state_machine_operator/config/types.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def should_trigger(self, value):
2323
if self.when is None:
2424
return True
2525

26+
# This ensures we check backoff, etc.
27+
if not self.action.perform():
28+
return False
29+
2630
# If we have a direct value, we check for equality
2731
number = (int, float)
2832
if isinstance(self.when, number) and value != self.when:
@@ -156,6 +160,18 @@ def perform_backoff(self):
156160
def name(self):
157161
return self._action["action"]
158162

163+
@property
164+
def min_completions(self):
165+
return self._action.get("minCompletions")
166+
167+
@property
168+
def max_size(self):
169+
return self._action.get("maxSize")
170+
171+
@property
172+
def min_size(self):
173+
return self._action.get("minSize")
174+
159175
@property
160176
def metric(self):
161177
return self._action["metric"]

python/state_machine_operator/manager/manager.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -469,18 +469,66 @@ def check_metrics(self, job):
469469

470470
# And check against the condition
471471
if trigger.should_trigger(value):
472-
self.trigger_workflow_action(trigger.action.name)
472+
self.trigger_workflow_action(trigger, step_name, value)
473473

474-
def trigger_workflow_action(self, action_name):
474+
def trigger_grow(self, trigger, step_name, value):
475+
"""
476+
Trigger the job to grow
477+
"""
478+
previous = self.workflow.jobs[step_name]["config"]["nnodes"]
479+
max_size = trigger.action.max_size
480+
if max_size >= previous + 1:
481+
LOGGER.info(
482+
f"Grow triggered: {trigger.action.metric} {trigger.when} ({value}), already >= max size {max_size}"
483+
)
484+
return
485+
486+
self.workflow.jobs[step_name]["config"]["nnodes"] += 1
487+
updated = self.workflow.jobs[step_name]["config"]["nnodes"]
488+
LOGGER.info(
489+
f"Grow triggered: {trigger.action.metric} {trigger.when} ({value}), nodes {previous}=>{updated}"
490+
)
491+
492+
def trigger_shrink(self, trigger, step_name, value):
493+
"""
494+
Trigger the job to shrink, down to a min size of 1
495+
"""
496+
previous = self.workflow.jobs[step_name]["config"]["nnodes"]
497+
min_size = trigger.action.min_size or 1
498+
if previous <= min_size:
499+
LOGGER.info(
500+
f"Shrink triggered: {trigger.action.metric} {trigger.when} ({value}), already at min size {min_size}"
501+
)
502+
return
503+
self.workflow.jobs[step_name]["config"]["nnodes"] -= 1
504+
updated = self.workflow.jobs[step_name]["config"]["nnodes"]
505+
LOGGER.info(
506+
f"Shrink triggered: {trigger.action.metric} {trigger.when} ({value}), nodes {previous}=>{updated}"
507+
)
508+
509+
def trigger_workflow_action(self, trigger, step_name, value):
475510
"""
476511
Given an action name, issue it for the workflow
477512
"""
478-
if action_name == "finish-workflow":
513+
# This action has a minimum number of total completions
514+
if trigger.action.min_completions:
515+
completions = len(self.get_current_state()["completed"])
516+
if completions < trigger.action.min_completions:
517+
return
518+
519+
# Check if we have enough completions
520+
if trigger.action.name == "finish-workflow":
479521
# TODO: add more detail in calling function to event trigger
480-
LOGGER.info("Workflow completion triggered, ending workflow.")
522+
LOGGER.info(
523+
f"Workflow completion triggered: {trigger.action.metric} {trigger.when} ({value})"
524+
)
481525
self.complete_workflow()
482526

483-
# TODO add support for grow, shrink
527+
if trigger.action.name == "grow":
528+
self.trigger_grow(trigger, step_name, value)
529+
530+
if trigger.action.name == "shrink":
531+
self.trigger_shrink(trigger, step_name, value)
484532

485533
def update_metrics(self, job):
486534
"""
@@ -495,7 +543,7 @@ def update_metrics(self, job):
495543
# Add job duration if supported by tracker
496544
duration = job.duration()
497545
if job.is_completed() and duration is not None:
498-
self.metrics.add_model_entry("duration", duration)
546+
self.metrics.add_model_entry("duration", duration, step=job.step_name)
499547

500548
def add_timestamp_first_seen(self, label):
501549
"""

python/state_machine_operator/manager/metrics.py

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from river import stats
1+
import json
22

3-
import state_machine_operator.utils as utils
3+
from river import stats
44

55
model_inits = {
66
"variance": stats.Var,
@@ -28,29 +28,21 @@ def __init__(self):
2828
# Counters are separate
2929
self.models["count"] = {}
3030

31-
# Cache for all group model keys
32-
self.keys = set()
33-
3431
def summarize_all(self):
3532
"""
3633
Summarize all models
3734
"""
38-
for key in self.keys:
39-
self.summary(key)
40-
41-
def summary(self, key):
42-
"""
43-
Summarize currently known models under a key
44-
"""
45-
print(f"🌊 Streaming ML Model Summary {key}: ", end="")
35+
print("🌊 Streaming ML Model Summary: ", end="")
4636
items = {}
47-
for model_name in model_inits:
48-
models = self.models[model_name]
49-
if key not in models:
50-
continue
51-
model = models[key]
52-
items[model_name] = model.get()
53-
print(utils.pretty_print_list(items))
37+
for model_name, models in self.models.items():
38+
if model_name not in items:
39+
items[model_name] = {}
40+
for step_name, keys in models.items():
41+
if step_name not in items[model_name]:
42+
items[model_name][step_name] = {}
43+
for key, model in keys.items():
44+
items[model_name][step_name] = round(model.get(), 3)
45+
print(json.dumps(items))
5446
return items
5547

5648
def increment_counter(self, key, step=None):
@@ -68,20 +60,22 @@ def increment_counter(self, key, step=None):
6860
self.models["count"][step][key] = stats.Count()
6961
self.models["count"][step][key].update()
7062

71-
def add_model_entry(self, key, value, model_name=None):
63+
def add_model_entry(self, key, value, step=None, model_name=None):
7264
"""
7365
Record a datum for one or more models.
7466
7567
If model_name is not set, add to add models.
7668
"""
77-
self.keys.add(key)
69+
step = step or "global"
7870

7971
# This should be all models except for counts
8072
model_names = list(model_inits)
8173
if model_name is not None:
8274
model_names = [model_name]
8375

8476
for model_name in model_names:
85-
if key not in self.models[model_name]:
86-
self.models[model_name][key] = model_inits[model_name]()
87-
self.models[model_name][key].update(value)
77+
if step not in self.models[model_name]:
78+
self.models[model_name][step] = {}
79+
if key not in self.models[model_name][step]:
80+
self.models[model_name][step][key] = model_inits[model_name]()
81+
self.models[model_name][step][key].update(value)

python/state_machine_operator/schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
"action": {"type": "string"},
3131
"when": {"type": "string"},
3232
"metric": {"type": "string"},
33+
"minCompletions": {"type": ["number", "null"]},
34+
"maxSize": {"type": ["number", "null"]},
35+
"minSize": {"type": ["number", "null"]},
36+
"repetitions": {"type": ["number", "null"]},
37+
"backoff": {"type": ["number", "null"]},
3338
},
3439
},
3540
},

python/state_machine_operator/tracker/kubernetes/tracker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,9 @@ def submit_minicluster_job(self, step, jobid):
342342
"maxSize": step.nodes + 100,
343343
"size": step.nodes,
344344
"tasks": step.cores_per_task,
345+
"network": {
346+
"headlessName": step.name,
347+
},
345348
}
346349
if walltime:
347350
spec["deadlineSeconds"] = int(walltime)

0 commit comments

Comments
 (0)