
Commit ca6de8c

feat: allow multiple node jobs
There is a bug in the Kubernetes tracker: we treat the failed/succeeded status fields as booleans (0/1) when they are actually counts of job indices. We have not run experiments with more than one node, so this has not been an issue (or been caught). This change fixes it.

Signed-off-by: vsoch <[email protected]>
1 parent bdc85ae commit ca6de8c
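For context, here is a minimal sketch of the mismatch the commit describes (assuming the official kubernetes Python client; this snippet is illustrative and not part of the commit):

from kubernetes import client

# A 4-node job where every pod index finished successfully.
status = client.V1JobStatus(succeeded=4, failed=None, active=None)

# Boolean-style check (the old behavior) misses multi-node jobs:
print(status.succeeded == 1)  # False, even though the job succeeded

# Count-aware check (the behavior after this commit):
print(status.succeeded is not None and status.succeeded > 0)  # True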

File tree: 7 files changed, +95 −25 lines


examples/test/multiple-pods.yaml

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+apiVersion: state-machine.converged-computing.org/v1alpha1
+kind: StateMachine
+metadata:
+  name: state-machine
+spec:
+  manager:
+    pullPolicy: Never
+    interactive: true
+  workflow:
+    completed: 2
+  cluster:
+    maxSize: 2
+
+  jobs:
+    - name: job_a
+      properties:
+        save-path: "/opt"
+      config:
+        nodes: 1
+        coresPerTask: 1
+      image: rockylinux:9
+      script: echo This is the first
+
+    - name: job_b
+      properties:
+        save-path: "/opt"
+      config:
+        nodes: 4
+        coresPerTask: 1
+      image: rockylinux:9
+      script: echo This is the second
+
+    - name: job_c
+      config:
+        nodes: 1
+        coresPerTask: 1
+      properties:
+        save-path: "/opt"
+      image: rockylinux:9
+      script: echo This is the third
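Note that job_b requests nodes: 4; presumably this is the case that exercises the fix, since a multi-node job reports succeeded/failed as counts of pod indices that can exceed one, which the old boolean checks misread.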

examples/test/save-logs.yaml

Lines changed: 3 additions & 3 deletions (the changed lines are blank on both sides; this looks like trailing-whitespace cleanup)
@@ -5,7 +5,7 @@ metadata:
 spec:
   manager:
     pullPolicy: Never
-
+
   workflow:
     completed: 2
   cluster:
@@ -20,7 +20,7 @@ spec:
         coresPerTask: 1
       image: rockylinux:9
       script: echo This is the first
-
+
     - name: job_b
       properties:
         save-path: "/opt"
@@ -29,7 +29,7 @@ spec:
        coresPerTask: 1
       image: rockylinux:9
       script: echo This is the second
-
+
     - name: job_c
      config:
        nodes: 1

python/state_machine_operator/manager/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -60,6 +60,12 @@ def get_parser():
         "--config-dir",
         help="Directory with configuration files.",
     )
+    start.add_argument(
+        "--quiet",
+        help="Don't print progress",
+        default=False,
+        action="store_true",
+    )
     start.add_argument(
         "--plain-http",
         help="Use plain http for the registry.",
@@ -108,6 +114,7 @@ def help(return_code=0):
         # Will overwrite what is set in config
         workdir=args.workdir,
         plain_http=args.plain_http,
+        quiet=args.quiet,
     )
     manager.start()
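A self-contained sketch of the flag semantics (standard argparse; the flag name and help text mirror the diff): store_true makes --quiet default to False and flip to True when passed.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--quiet", help="Don't print progress", default=False, action="store_true")

print(parser.parse_args([]).quiet)           # False
print(parser.parse_args(["--quiet"]).quiet)  # True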

python/state_machine_operator/manager/manager.py

Lines changed: 23 additions & 15 deletions
@@ -31,6 +31,7 @@ def __init__(
         workdir=None,
         registry=None,
         plain_http=False,
+        quiet=False,
     ):
         """
         Initialize the WorkflowManager. Much of this logic used to be in setup,
@@ -47,6 +48,7 @@ def __init__(

         # Working directory = first preference to command line
         self.workflow.set_workdir(workdir)
+        self.quiet = quiet

         # Running modes (we only allow kubernetes for now)
         LOGGER.info(f" Job Prefix: [{self.prefix}]")
@@ -137,7 +139,7 @@ def get_current_state(self):
         jobs = self.list_jobs_by_status()

         # Give a warning about unknown jobs
-        # In practice, I don't know why this would happen.
+        # In practice, this is a state not properly accounted for
         if jobs["unknown"]:
             LOGGER.warning(f"Found {len(jobs['unknown'])} unknown jobs to investigate.")

@@ -265,6 +267,9 @@ def check_complete(self):
         self.watcher.save(self.save_dir)

         self.save_times()
+
+        # For extra files to write
+        time.sleep(5)
         sys.exit(0)

     @property
@@ -313,19 +318,23 @@ def new_jobs(self):
         # submit_n negative would be OK, a 0-> negative range is empty
         submit_n = max(submit_n, 0)

-        LOGGER.info(f"\n> 🌀 Starting step {step['name']}")
-        LOGGER.info("> Workflow needs")
-        LOGGER.info(f"  > total completions {self.workflow.completions_needed} ")
-        LOGGER.info(f"  > max nodes allowed use {self.workflow.max_size}\n")
-        LOGGER.info("> Current state")
-        LOGGER.info(f"  > nodes / step {nodes_needed} ")
-        LOGGER.info(f"  > jobs needed {jobs_needed} ")
-        LOGGER.info(f"  > nodes allowed {nodes_allowed} ")
-        LOGGER.info(f"  > jobs allowed {jobs_allowed}\n")
-        LOGGER.info("> Workflow progress")
-        LOGGER.info(f"  > Completions {completions}")
-        LOGGER.info(f"  > In progress {active_jobs}")
-        LOGGER.info(f"  > New job sequences submit {submit_n} ")
+        logfn = LOGGER.info
+        if self.quiet:
+            logfn = LOGGER.debug
+
+        logfn(f"\n> 🌀 Starting step {step['name']}")
+        logfn("> Workflow needs")
+        logfn(f"  > total completions {self.workflow.completions_needed} ")
+        logfn(f"  > max nodes allowed use {self.workflow.max_size}\n")
+        logfn("> Current state")
+        logfn(f"  > nodes / step {nodes_needed} ")
+        logfn(f"  > jobs needed {jobs_needed} ")
+        logfn(f"  > nodes allowed {nodes_allowed} ")
+        logfn(f"  > jobs allowed {jobs_allowed}\n")
+        logfn("> Workflow progress")
+        logfn(f"  > Completions {completions}")
+        logfn(f"  > In progress {active_jobs}")
+        logfn(f"  > New job sequences submit {submit_n} ")

         # If submit is > than completions needed, we don't need that many
         # TODO we would also downscale the cluster here
@@ -416,7 +425,6 @@ def watch(self):
         Watch is an event driven means to watch for changes and update job states
         accordingly.
         """
-        # TODO we should have some kind of check that does not rely on an event
        for job in self.tracker.stream_events():

            # Not a job associated with the workflow, or is ignored
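A minimal standalone sketch of the logging indirection above: the log function is chosen once, so --quiet demotes the whole progress report from INFO to DEBUG without touching each line (illustrative only; report_progress is a made-up name):

import logging

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger("manager")

def report_progress(completions, quiet=False):
    # Choose the sink once; every progress line goes through it.
    logfn = LOGGER.debug if quiet else LOGGER.info
    logfn("> Workflow progress")
    logfn(f"  > Completions {completions}")

report_progress(2)              # visible at INFO
report_progress(2, quiet=True)  # hidden unless DEBUG is enabled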

python/state_machine_operator/tracker/kubernetes/job.py

Lines changed: 4 additions & 3 deletions
@@ -40,15 +40,16 @@ def is_completed(self):

     def is_failed(self):
         """
-        Determine if a job is failed
+        Determine if a job is failed.
         """
-        return self.job.status.failed == 1
+        return not self.is_succeeded()

     def is_succeeded(self):
         """
         Determine if a job has succeeded
+        We need to have a completion time and no failed indices.
         """
-        return self.job.status.succeeded == 1
+        return self.is_completed and not self.job.status.failed


 def get_namespace():
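A sketch of the new success condition (again assuming the kubernetes Python client; illustrative, not the module's code): per the added docstring, a job counts as succeeded when it has a completion time and no failed indices.

import datetime
from kubernetes import client

done = client.V1JobStatus(
    completion_time=datetime.datetime.now(datetime.timezone.utc),
    succeeded=4,
    failed=None,
)

# Completion time present and no failed indices -> succeeded.
print(done.completion_time is not None and not done.failed)  # True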

python/state_machine_operator/tracker/kubernetes/state.py

Lines changed: 14 additions & 4 deletions
@@ -58,23 +58,33 @@ def list_jobs_by_status(label_name="app", label_value=None):
     states = {"success": [], "failed": [], "running": [], "queued": [], "unknown": []}

     for job in jobs:
+
+        # These are *counts* of job indices, not boolean 0/1
+        succeeded = job.status.succeeded
+        failed = job.status.failed
+        active = job.status.active
+        not_active = active in [0, None]
+
+        # This is a completion time for the job
+        completion_time = job.status.completion_time
+
         # Success means we finished with succeeded condition
-        if job.status.succeeded == 1 and job.status.completion_time is not None:
+        if succeeded is not None and succeeded > 0 and completion_time is not None:
             states["success"].append(Job(job))
             continue

         # Failure means we finished with failed condition
-        if job.status.failed == 1:
+        if failed is not None and failed > 0:
             states["failed"].append(Job(job))
             continue

         # Not active, and not finished is queued
-        if not job.status.active and not job.status.completion_time:
+        if not_active and not completion_time:
             states["queued"].append(Job(job))
             continue

         # Active, and not finished is running
-        if job.status.active == 1 and not job.status.completion_time:
+        if active and not completion_time:
             states["running"].append(Job(job))
             continue

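The same four-way classification as a self-contained function over plain values (a sketch for readability, not the module's API):

def classify(succeeded, failed, active, completion_time):
    """Map Job status counts to one of the tracker's states."""
    if succeeded is not None and succeeded > 0 and completion_time is not None:
        return "success"
    if failed is not None and failed > 0:
        return "failed"
    if active in [0, None] and not completion_time:
        return "queued"
    if active and not completion_time:
        return "running"
    return "unknown"

print(classify(4, None, None, "2025-01-01T00:00:00Z"))  # success
print(classify(None, 2, 2, None))                       # failed
print(classify(None, None, None, None))                 # queued
print(classify(None, None, 4, None))                    # running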

python/state_machine_operator/tracker/kubernetes/tracker.py

Lines changed: 4 additions & 0 deletions
@@ -349,6 +349,7 @@ def save_log(self, job=None):

         # We might have one pod, but can't assume
         for i, pod in enumerate(pods):
+            print(f"Saving log for {pod.metadata.name}")
             try:
                 logs = api.read_namespaced_pod_log(
                     name=pod.metadata.name,
@@ -363,7 +364,10 @@ def save_log(self, job=None):
                 )
                 # Don't write twice
                 if not os.path.exists(log_file):
+                    print(f"Saving log file {log_file}")
                     utils.write_file(logs, log_file)
+                else:
+                    print(f"Log file {log_file} already exists")

             except client.exceptions.ApiException as e:
                 print(f"Error getting logs: {e}")
