Skip to content

Commit 66c0ce7

Browse files
authored
Merge pull request #6881 from grondo/issue#2414
add `flux multi-prog` for MPMD support
2 parents 984102b + 5e7ca0e commit 66c0ce7

File tree

8 files changed

+401
-0
lines changed

8 files changed

+401
-0
lines changed

doc/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ MAN1_FILES_PRIMARY = \
3939
man1/flux-bulksubmit.1 \
4040
man1/flux-alloc.1 \
4141
man1/flux-batch.1 \
42+
man1/flux-multi-prog.1 \
4243
man1/flux-job.1 \
4344
man1/flux-version.1 \
4445
man1/flux-jobs.1 \

doc/man1/flux-multi-prog.rst

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
==================
2+
flux-multi-prog(1)
3+
==================
4+
5+
6+
SYNOPSIS
7+
========
8+
9+
**flux** **multi-prog** [OPTIONS] CONFIG
10+
11+
DESCRIPTION
12+
===========
13+
14+
.. program:: flux multi-prog
15+
16+
:program:`flux multi-prog` allows a parallel job to run a different
17+
executable and arguments for each task, also known as multiple program,
18+
multiple data (MPMD). It is used with :man1:`flux-run` or :man1:`flux-submit`
19+
in place of the parallel command and args like::
20+
21+
flux run -N4 flux multi-prog myapp.conf
22+
23+
The configuration file format is described in the :ref:`CONFIGURATION`
24+
section below.
25+
26+
OPTIONS
27+
=======
28+
29+
.. option:: -n, --dry-run=RANKS
30+
31+
Do not run anything, instead print what would be run on *RANKS* specified
32+
as an idset of task ranks. This option is useful for testing a config file
33+
and should not be used with :man1:`flux-run` or :man1:`flux-submit`.
34+
35+
CONFIGURATION
36+
=============
37+
38+
The :program:`flux multi-prog` configuration file defines the executable
39+
and arguments to be run for each task rank in a Flux job. Each non-empty
40+
line specifies a set of task ranks and the corresponding command to execute.
41+
42+
LINE FORMAT
43+
^^^^^^^^^^^
44+
Each line must begin with an RFC 22-compliant task idset, indicating the
45+
ranks to which the command applies. Alternatively, the special wildcard ``*``
46+
may be used to match any task rank not explicitly handled by other lines.
47+
48+
The task idset is followed by the executable and its arguments. For example:
49+
50+
::
51+
52+
0-1 myapp arg1 arg2
53+
2-3 myapp arg3 arg4
54+
55+
In the above example:
56+
57+
- Tasks 0 and 1 will execute :command:`myapp arg1 arg2`
58+
- Tasks 2 and 3 will execute :command:`myapp arg3 arg4`
59+
60+
.. note::
61+
62+
Each task rank must match at most one line.
63+
64+
If no matching line is found for a task, and * is not present,
65+
the task will have no command assigned and will fail to launch.
66+
67+
68+
Lines are parsed using Python's ``shlex`` module, which supports shell-like
69+
quoting and comments. For example:
70+
::
71+
72+
# this line is a comment
73+
0-1 myapp "quoted arg" arg2 # Inline comment
74+
75+
SUBSTITUTIONS
76+
^^^^^^^^^^^^^
77+
Two special tokens may be used within the command and argument strings:
78+
79+
**%t**
80+
Replaced with the task's global rank (task ID)
81+
82+
**%o**
83+
Replaced with the task's offset within the specified idset
84+
85+
For example:
86+
87+
::
88+
89+
0-1 echo task %t
90+
2-3 echo task %t offset %o
91+
92+
Would produce the following output:
93+
94+
::
95+
96+
0: task 0
97+
1: task 1
98+
2: task 2 offset 0
99+
3: task 3 offset 1
100+
101+
RESOURCES
102+
=========
103+
104+
.. include:: common/resources.rst
105+
106+
SEE ALSO
107+
========
108+
109+
:man1:`flux-run`, :man1:`flux-submit`

doc/manpages.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
('man1/flux-bulksubmit', 'flux-bulksubmit', 'submit jobs in bulk to a Flux instance', [author], 1),
6060
('man1/flux-alloc', 'flux-alloc', 'allocate a new Flux instance for interactive use', [author], 1),
6161
('man1/flux-batch', 'flux-batch', 'submit a batch script to Flux', [author], 1),
62+
('man1/flux-multi-prog', 'flux-multi-prog', 'run a parallel program with a different executable and arguments for each task', [author], 1),
6263
('man1/flux-job', 'flux-job', 'Job Housekeeping Tool', [author], 1),
6364
('man1/flux-module', 'flux-module', 'manage Flux extension modules', [author], 1),
6465
('man1/flux-overlay', 'flux-overlay', 'Show flux overlay network status', [author], 1),

doc/test/spell.en.pws

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,3 +974,6 @@ DCB
974974
cancellability
975975
pthreads
976976
pthread
977+
MPMD
978+
prog
979+
shlex

src/cmd/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ dist_fluxcmd_SCRIPTS = \
105105
flux-run.py \
106106
flux-submit.py \
107107
flux-bulksubmit.py \
108+
flux-multi-prog.py \
108109
flux-jobs.py \
109110
flux-fortune.py \
110111
flux-resource.py \

src/cmd/flux-multi-prog.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
###############################################################
2+
# Copyright 2025 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
11+
import argparse
12+
import logging
13+
import os
14+
import re
15+
import shlex
16+
import sys
17+
18+
import flux
19+
from flux.idset import IDset
20+
from flux.util import CLIMain
21+
22+
23+
class MultiProgLine:
24+
"""Class representing a single "multi-prog" config line"""
25+
26+
def __init__(self, value, lineno=-1):
27+
self.ranks = IDset()
28+
self.all = False
29+
self.args = []
30+
self.lineno = lineno
31+
lexer = shlex.shlex(value, posix=True, punctuation_chars=True)
32+
lexer.whitespace_split = True
33+
lexer.escapedquotes = "\"'"
34+
try:
35+
args = list(lexer)
36+
except ValueError as exc:
37+
raise ValueError(f"line {lineno}: '{value.rstrip()}': {exc}") from None
38+
if not args:
39+
return
40+
41+
targets = args.pop(0)
42+
if targets == "*":
43+
self.all = True
44+
else:
45+
try:
46+
self.ranks = IDset(targets)
47+
except ValueError:
48+
raise ValueError(f"line {lineno}: invalid idset: {targets}") from None
49+
50+
self.args = args
51+
52+
def get_args(self, rank):
53+
"""Return the arguments list with %t and %o substituted for `rank`"""
54+
55+
result = []
56+
index = 0
57+
if not self.all:
58+
index = self.ranks.expand().index(rank)
59+
sub = {"%t": str(rank), "%o": str(index)}
60+
for arg in self.args:
61+
result.append(re.sub(r"(%t)|(%o)", lambda x: sub[x.group(0)], arg))
62+
return result
63+
64+
def __bool__(self):
65+
return bool(self.args)
66+
67+
68+
class MultiProg:
69+
"""Class representing an entire "multi-prog" config file"""
70+
71+
def __init__(self, inputfile):
72+
self.fp = inputfile
73+
self.lines = []
74+
self.fallthru = None
75+
lineno = 0
76+
for line in self.fp:
77+
lineno += 1
78+
try:
79+
mpline = MultiProgLine(line, lineno)
80+
except ValueError as exc:
81+
raise ValueError(f"{self.fp.name}: {exc}") from None
82+
if mpline:
83+
if mpline.all:
84+
self.fallthru = mpline
85+
else:
86+
self.lines.append(mpline)
87+
88+
def find(self, rank):
89+
"""Return line matching 'rank' in the current config"""
90+
for line in self.lines:
91+
if rank in line.ranks:
92+
return line
93+
if self.fallthru is not None:
94+
return self.fallthru
95+
raise ValueError(f"{self.fp.name}: No matching line for rank {rank}")
96+
97+
def exec(self, rank, dry_run=False):
98+
"""Exec configured command line arguments for a task rank"""
99+
args = self.find(rank).get_args(rank)
100+
if dry_run:
101+
args = " ".join(shlex.quote(arg) for arg in args)
102+
print(f"{rank}: {args}")
103+
else:
104+
os.execvp(args[0], args)
105+
106+
107+
def parse_args():
108+
description = """
109+
Run a parallel program with a different executable and arguments for each task
110+
"""
111+
parser = argparse.ArgumentParser(
112+
prog="flux-multi-prog",
113+
usage="flux multi-prog [OPTIONS] CONFIG",
114+
description=description,
115+
formatter_class=flux.util.help_formatter(),
116+
)
117+
parser.add_argument(
118+
"-n",
119+
"--dry-run",
120+
type=IDset,
121+
metavar="IDS",
122+
help="Do not run anything. Instead, print what would be run for"
123+
+ " each rank in IDS",
124+
)
125+
parser.add_argument(
126+
"conf", metavar="CONFIG", type=str, help="multi-prog configuration file"
127+
)
128+
return parser.parse_args()
129+
130+
131+
LOGGER = logging.getLogger("flux-multi-prog")
132+
133+
134+
@CLIMain(LOGGER)
135+
def main():
136+
137+
sys.stdout = open(sys.stdout.fileno(), "w", encoding="utf8")
138+
139+
args = parse_args()
140+
141+
with open(args.conf) as infile:
142+
mp = MultiProg(infile)
143+
144+
if args.dry_run:
145+
for rank in args.dry_run:
146+
mp.exec(rank, dry_run=True)
147+
sys.exit(0)
148+
149+
try:
150+
rank = int(os.getenv("FLUX_TASK_RANK"))
151+
except TypeError:
152+
raise ValueError("FLUX_TASK_RANK not found or invalid")
153+
154+
mp.exec(rank)

t/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ TESTSCRIPTS = \
230230
t2713-python-cli-bulksubmit.t \
231231
t2715-python-cli-cancel.t \
232232
t2716-python-cli-batch-conf.t \
233+
t2720-python-cli-multi-prog.t \
233234
t2800-jobs-cmd.t \
234235
t2800-jobs-cmd-multiuser.t \
235236
t2800-jobs-recursive.t \

0 commit comments

Comments
 (0)