Skip to content

Commit 8db182f

Browse files
committed
feat: support for custom metrics
In this example, the user is allowed to provide a custom script that will be used against the log, and it needs to return a dictionary of values (the custom metrics). These are passed back to the manager from the state machine step and can influence workflow behavior (e.g., stop early, grow, or shrink). Signed-off-by: vsoch <[email protected]>
1 parent 84ff173 commit 8db182f

File tree

9 files changed

+87
-70
lines changed

9 files changed

+87
-70
lines changed

examples/test/lammps-custom-metrics.yaml

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,17 @@ metadata:
55
spec:
66
manager:
77
pullPolicy: Never
8-
interactive: true
98
workflow:
109
completed: 10
11-
1210
events:
13-
# Custom metric - derived from parsing lammps log
11+
# Custom metric - derived from parsing lammps log
12+
# Max decreases down to 1 (default), with 3 breaks in between, and 3 times
1413
- metric: mean.lammps.lammps-walltime
15-
when: ">= 12"
16-
action: grow
17-
# Size 4 cluster, max decreases down to 1
14+
when: "<= 10"
15+
action: shrink
1816
repetitions: 3
19-
# Require checks between before doing again
20-
# TODO check if this is working correctly.
21-
# also check the .get()
2217
backoff: 3
23-
maxSize: 10
18+
minSize: 1
2419

2520
cluster:
2621
maxSize: 2

python/state_machine_operator/config/config.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import importlib
21
import os
3-
import shutil
42
import sys
53

64
import jsonschema
@@ -179,31 +177,8 @@ def load_jobs(self):
179177
job["config"]["name"] = job["name"]
180178

181179
# Parse custom event functions on job
182-
self.add_custom_events(job)
183180
self.jobs[job["name"]] = job
184181

185-
def add_custom_events(self, job):
186-
"""
187-
Add (parse) custom job events.
188-
"""
189-
tmpdir = utils.get_tmpdir()
190-
script_path = os.path.join(tmpdir, job["name"] + ".py")
191-
192-
# Parse custom job functions.
193-
event = job.get("events") or {}
194-
script = event.get("script")
195-
if not script:
196-
return
197-
198-
utils.write_file(script, script_path)
199-
200-
# module will have custom functions
201-
spec = importlib.util.spec_from_file_location(job["name"], script_path)
202-
module = importlib.util.module_from_spec(spec)
203-
spec.loader.exec_module(module)
204-
job["events"]["module"] = module
205-
shutil.rmtree(tmpdir)
206-
207182
def validate(self):
208183
jsonschema.validate(self.cfg, schema=schema.state_machine_config_schema)
209184

python/state_machine_operator/machine/machine.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,15 @@ def on_change(self):
157157
tracker = self.trackers[self.current_state.id]
158158
tracker.submit_job(self.jobid)
159159

160+
160161
def metrics(self):
161162
"""
162163
Yield (pop) current metrics.
163164
"""
164-
tracker = self.trackers[self.current_state.id]
165-
while tracker.metrics:
166-
yield tracker.metrics.pop(0)
167-
165+
for step_name, step in self.trackers.items():
166+
while step.metrics:
167+
yield step.metrics.pop(0)
168+
168169

169170
def cleanup(self):
170171
"""

python/state_machine_operator/manager/manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -553,12 +553,12 @@ def load_custom_metrics(self, state_machine):
553553
"""
554554
# This takes the current step
555555
# {"job_name": job_name, "step_name": pod.metadata.labels["app"], "metrics": events}
556-
for message in state_machine.metrics():
556+
for m in state_machine.metrics():
557+
print(f"Loading custom metric {m}")
557558
try:
558-
m = json.loads(message)
559559
self.metrics.add_custom_metric(m["metrics"], m["job_name"], m["step_name"])
560560
except Exception as e:
561-
print(f"Issue parsing custom metric {message}: {e}")
561+
print(f"Issue parsing custom metric {m}: {e}")
562562

563563
def add_timestamp_first_seen(self, label):
564564
"""

python/state_machine_operator/tracker/kubernetes/event.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from kubernetes import client, config, watch
99

10-
import state_machine_operator.defaults as defaults
1110
import state_machine_operator.utils as utils
1211

1312
from .job import Job
@@ -29,11 +28,6 @@ def stream_events():
2928
for event in w.stream(batch_v1.list_namespaced_job, namespace=get_namespace()):
3029
job = event["object"]
3130
event = Job(job)
32-
33-
# This is a job updated with annotated metrics to inform the manager
34-
if job.metadata.annotations and defaults.metrics_key in job.metadata.annotations:
35-
event._is_event = True
36-
3731
yield event
3832

3933

@@ -67,7 +61,7 @@ def prepare_watchers(self):
6761
"""
6862
Prepare watchers for pods and nodes.
6963
"""
70-
for function in ["watch_nodes", "receive_metrics"]:
64+
for function in ["watch_nodes"]:
7165
thread = threading.Thread(target=getattr(self, function))
7266
thread.daemon = True
7367
self.threads[function] = thread
@@ -158,6 +152,7 @@ def receive_metrics(self):
158152
for event in w.stream(v1.list_namespaced_event, namespace=get_namespace()):
159153
e = event["object"]
160154
if e.reason == "CustomMetric":
155+
print(f"Found custom metric event {e.message}")
161156
# The message has the metric, the job name, and step name
162157
self.metrics.append(e.message)
163158

python/state_machine_operator/tracker/kubernetes/job.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import os
2-
31
import state_machine_operator.defaults as defaults
42
import state_machine_operator.utils as utils
53
from state_machine_operator.tracker.job import BaseJob

python/state_machine_operator/tracker/kubernetes/tracker.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
from jinja2 import Template
88
from kubernetes import client, config
9-
from .utils import get_manager_pod
109

1110
import state_machine_operator.defaults as defaults
1211
import state_machine_operator.utils as utils
@@ -403,13 +402,49 @@ def get_metric_events(self, pod, log):
403402
except Exception as e:
404403
print(f"Error parsing custom metric for {pod.metadata.name}: {e}")
405404
return
406-
405+
407406
# Metadata about the Job to associate to
408407
job_name = pod.metadata.labels["job-name"]
409408

410409
# In the message we send the metrics, job name, and step name
411410
return {"job_name": job_name, "step_name": self.job_desc["name"], "metrics": events}
412411

412+
def send_kubernetes_event(self, pod, metrics):
413+
"""
414+
Send a kubernetes event instead of metrics via the state machine.
415+
416+
This function is currently not used.
417+
"""
418+
v1 = client.CoreV1Api()
419+
now = datetime.datetime.utcnow()
420+
job_name = pod.metadata.labels["job-name"]
421+
job_uid = pod.metadata.labels["controller-uid"]
422+
423+
event = client.CoreV1Event(
424+
metadata=client.V1ObjectMeta(
425+
generate_name=job_name,
426+
),
427+
involved_object=client.V1ObjectReference(
428+
kind="Job",
429+
api_version="batch/v1",
430+
namespace=pod.metadata.namespace,
431+
name=job_name,
432+
uid=job_uid,
433+
),
434+
reason="CustomMetric",
435+
message=json.dumps(metrics),
436+
first_timestamp=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
437+
last_timestamp=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
438+
type="Normal",
439+
source=client.V1EventSource(component="state-machine"),
440+
)
441+
442+
try:
443+
v1.create_namespaced_event(pod.metadata.namespace, event)
444+
except Exception as e:
445+
print(f"Error creating event: {e}")
446+
return metrics
447+
413448
def cancel_jobs(self, joblist):
414449
"""
415450
For the given job list, cancel each job. This is not currently use,
@@ -466,7 +501,7 @@ def save_log(self, job=None):
466501
Save a log identifier for a finished pod (job)
467502
"""
468503
# No job, no purpose to save
469-
if not Job:
504+
if not job:
470505
return
471506
api = client.CoreV1Api()
472507

@@ -486,7 +521,6 @@ def save_log(self, job=None):
486521
# For metrics, we assume logs coming from main (index 0) pod
487522
# This can change if needed
488523
for i, pod in enumerate(pods):
489-
print(f"Saving log for {pod.metadata.name}")
490524
try:
491525
log = api.read_namespaced_pod_log(
492526
name=pod.metadata.name,
@@ -495,6 +529,8 @@ def save_log(self, job=None):
495529
timestamps=True,
496530
)
497531
if i == 0:
532+
print(f"Saving log for {pod.metadata.name}")
533+
498534
# We assume lead pod (index 0) is of interest
499535
metrics = self.adapter.get_metric_events(pod, log)
500536
if metrics:

python/state_machine_operator/tracker/kubernetes/utils.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import os
12

23
from kubernetes import client
3-
import os
4+
45

56
def get_namespace():
67
"""
@@ -10,7 +11,8 @@ def get_namespace():
1011
if os.path.exists(ns_path):
1112
with open(ns_path) as f:
1213
return f.read().strip()
13-
14+
15+
1416
def get_manager_pod():
1517
"""
1618
Get the currently running manager pod.
@@ -22,4 +24,3 @@ def get_manager_pod():
2224
pods = v1.list_namespaced_pod(namespace=get_namespace(), label_selector=label_selector)
2325
assert len(pods.items) == 1
2426
return pods.items[0]
25-

python/state_machine_operator/tracker/tracker.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import importlib
12
import json
23
import logging
34
import os
5+
import shutil
46

57
import state_machine_operator.utils as utils
68
from state_machine_operator.tracker.types import SubmissionCode
@@ -16,15 +18,12 @@ class Job:
1618
"""
1719

1820
def __init__(self, job_desc, workflow, **kwargs):
21+
# The copy is important since we modify the dict
1922
self.job_desc = job_desc
2023
self.workflow = workflow
2124

22-
# Remove the module from the job descr
23-
# TODO check if this was deleted and only working once?
24-
self.module = None
25-
if "module" in self.job_desc.get("events", {}):
26-
self.module = self.job_desc["events"]["module"]
27-
del self.job_desc["events"]["module"]
25+
# Load custom module
26+
self.load_custom_events()
2827

2928
# Allow for arbitrary extra key value arguments
3029
for key, value in kwargs.items():
@@ -45,6 +44,29 @@ def extra_environment(self):
4544
environ.append({"name": key, "value": value})
4645
return environ
4746

47+
def load_custom_events(self):
48+
"""
49+
Add (parse) custom job events.
50+
"""
51+
tmpdir = utils.get_tmpdir()
52+
script_path = os.path.join(tmpdir, self.job_desc["name"] + ".py")
53+
54+
# Parse custom job functions.
55+
event = self.job_desc.get("events") or {}
56+
script = event.get("script")
57+
self.module = None
58+
if not script:
59+
return
60+
61+
utils.write_file(script, script_path)
62+
63+
# module will have custom functions
64+
spec = importlib.util.spec_from_file_location(self.job_desc["name"], script_path)
65+
module = importlib.util.module_from_spec(spec)
66+
spec.loader.exec_module(module)
67+
self.module = module
68+
shutil.rmtree(tmpdir)
69+
4870
@property
4971
def properties(self):
5072
"""
@@ -80,12 +102,6 @@ def __init__(self, job_name, workflow):
80102
# We retrieve custom metrics from the log and deliver to the manager
81103
self.metrics = []
82104

83-
# TODO this envrionment variable has the max nodes we will allow to autoscale to
84-
# We can use this later...
85-
self.max_nodes_autoscale = (
86-
os.environ.get("STATE_MACHINE_MAX_NODES", self.total_nodes) or self.total_nodes
87-
)
88-
89105
@property
90106
def total_nodes(self):
91107
return self.workflow.get("cluster", {}).get("max_nodes") or 1

0 commit comments

Comments
 (0)