
Commit 5fa41f6

move utils for running jobs for scoring to its own folder

1 parent aa1e995 commit 5fa41f6

File tree

6 files changed: +528 -0 lines changed

scoring/utils/package_logs.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
"""Script to package logs from experiment directory.

Example usage:

python3 package_logs.py --experiment_dir <experiment_dir> --destination_dir <destination_dir>
"""
import os
import shutil

from absl import app
from absl import flags

flags.DEFINE_string('experiment_dir', None, 'Path to experiment.')
flags.DEFINE_string('destination_dir', None, 'Path to save submission logs.')

FLAGS = flags.FLAGS


def move_logs(experiment_dir, destination_dir):
  """Copy files from experiment path to destination directory.

  Args:
    experiment_dir: Path to experiment dir.
    destination_dir: Path to destination dir.
  """
  if not os.path.exists(experiment_dir):
    raise IOError(f'Directory does not exist: {experiment_dir}')

  for root, _, filenames in os.walk(experiment_dir):
    for filename in filenames:
      if 'checkpoint' not in filename:
        source_path = os.path.join(root, filename)
        relative_path = os.path.relpath(source_path, experiment_dir)
        destination_path = os.path.join(destination_dir, relative_path)
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)
        print(f'Copying {source_path} to {destination_path}')
        shutil.copy(source_path, destination_path)


def main(_):
  flags.mark_flag_as_required('destination_dir')
  flags.mark_flag_as_required('experiment_dir')
  move_logs(FLAGS.experiment_dir, FLAGS.destination_dir)


if __name__ == '__main__':
  app.run(main)
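
For example, packaging the logs of a finished run could look like this (both paths are hypothetical placeholders):

python3 package_logs.py \
  --experiment_dir ~/experiment_runs/my_first_experiment \
  --destination_dir ~/submission_logs/my_first_experiment

Checkpoint files are skipped, so the destination directory ends up containing only the measurement logs needed for scoring.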

scoring/utils/run_workloads.py

Lines changed: 301 additions & 0 deletions
@@ -0,0 +1,301 @@
"""
Example Usage:
python run_workloads.py --framework jax \
--experiment_name my_first_experiment \
--docker_image_url <url_for_docker_image> \
--run_percentage 10 \
--submission_path <path_to_submission_py_file> \
--tuning_search_space <path_to_tuning_search_space_json>
"""

import datetime
import json
import os
import struct
import subprocess
import time

from absl import app
from absl import flags
from absl import logging

from algoperf import random_utils as prng
from algoperf.workloads.workloads import get_base_workload_name
import docker

flags.DEFINE_string(
    'docker_image_url',
    'europe-west4-docker.pkg.dev/mlcommons-algoperf/algoperf-docker-repo/algoperf_jax_dev',
    'URL to docker image')
flags.DEFINE_integer(
    'run_percentage',
    100,
    'Percentage of max num steps to run for. '
    'Must set the flag enable_step_budget to True for this to take effect.')
flags.DEFINE_string('experiment_name',
                    'my_experiment',
                    'Name of top sub directory in experiment dir.')
flags.DEFINE_boolean('rsync_data',
                     True,
                     'Whether or not to transfer the data from GCP with rsync.')
flags.DEFINE_boolean('local', False, 'Mount local algorithmic-efficiency repo.')
flags.DEFINE_string(
    'submission_path',
    'prize_qualification_baselines/external_tuning/jax_nadamw_full_budget.py',
    'Path to reference submission.')
flags.DEFINE_string(
    'tuning_search_space',
    'prize_qualification_baselines/external_tuning/tuning_search_space.json',
    'Path to tuning search space.')
flags.DEFINE_string('framework', 'jax', 'Can be either PyTorch or JAX.')
flags.DEFINE_boolean(
    'dry_run',
    False,
    'Whether or not to actually run the docker containers. '
    'If True, simply print the docker run commands.')
flags.DEFINE_enum(
    'tuning_ruleset',
    'external',
    enum_values=['external', 'self'],
    help='Can be either external or self.')
flags.DEFINE_integer('num_studies', 5, 'Number of studies to run.')
flags.DEFINE_integer('study_start_index', None, 'Start index for studies.')
flags.DEFINE_integer('study_end_index', None, 'End index for studies.')
flags.DEFINE_integer('num_tuning_trials', 5, 'Number of tuning trials.')
flags.DEFINE_integer('hparam_start_index',
                     None,
                     'Start index for tuning trials.')
flags.DEFINE_integer('hparam_end_index', None, 'End index for tuning trials.')
flags.DEFINE_integer('seed', None, 'Random seed for evaluating a submission.')
flags.DEFINE_integer('submission_id',
                     0,
                     'Submission ID to generate study and hparam seeds.')
flags.DEFINE_string('held_out_workloads_config_path',
                    None,
                    'Path to config containing held-out workloads.')
flags.DEFINE_string(
    'workload_metadata_path',
    None,
    'Path to config containing dataset and maximum number of steps per '
    'workload. The default values of these are set to the full budgets as '
    'determined via the target-setting procedure. '
    'We provide workload_metadata_external_tuning.json and '
    'workload_metadata_self_tuning.json as references. '
    'Note that training will be interrupted at either the set maximum number '
    'of steps or the fixed workload maximum run time, whichever comes first. '
    'If your algorithm has a smaller per-step time than our baselines '
    'you may want to increase the number of steps per workload.')
flags.DEFINE_string(
    'workloads',
    None,
    'String representing a comma-separated list of workload names. '
    'If not None, only run these workloads; else run all workloads in '
    'workload_metadata_path.')
flags.DEFINE_string('additional_requirements_path',
                    None,
                    'Path to requirements.txt if any.')
flags.DEFINE_integer(
    'max_steps',
    None,
    'Maximum number of steps to run. Must set flag enable_step_budget. '
    'This flag takes precedence over the run_percentage flag.')
flags.DEFINE_bool(
    'enable_step_budget',
    False,
    'Flag that has to be explicitly set to override time budgets to step '
    'budget percentage.')

FLAGS = flags.FLAGS


def read_held_out_workloads(filename):
  with open(filename, "r") as f:
    held_out_workloads = json.load(f)
  return held_out_workloads


def container_running():
  docker_client = docker.from_env()
  containers = docker_client.containers.list()
  return len(containers) > 0


def kill_containers(message=None):
  if message:
    logging.info(message)
  docker_client = docker.from_env()
  containers = docker_client.containers.list()
  for container in containers:
    container.kill()


def gpu_is_active():
  output = subprocess.check_output([
      'nvidia-smi',
      '--query-gpu=utilization.gpu',
      '--format=csv,noheader,nounits'
  ])
  return any(int(x) > 0 for x in output.decode().splitlines())


def wait_until_container_not_running(sleep_interval=5 * 60):
  # Check GPU utilization; if the GPUs have been inactive for more than
  # 45 minutes, kill the container.
  gpu_last_active = datetime.datetime.now().timestamp()

  while container_running():
    # Check if GPUs have been inactive > 45 min and if so terminate container.
    if gpu_is_active():
      gpu_last_active = datetime.datetime.now().timestamp()
    if (datetime.datetime.now().timestamp() - gpu_last_active) > 45 * 60:
      kill_containers(
          'Killing container: GPUs have been inactive > 45 minutes...')
    time.sleep(sleep_interval)
  return


def main(_):
  framework = FLAGS.framework
  experiment_name = FLAGS.experiment_name
  docker_image_url = FLAGS.docker_image_url
  submission_path = FLAGS.submission_path
  tuning_search_space = FLAGS.tuning_search_space
  num_studies = FLAGS.num_studies
  num_tuning_trials = FLAGS.num_tuning_trials
  hparam_start_index_flag = ''
  hparam_end_index_flag = ''
  if FLAGS.hparam_start_index is not None:
    hparam_start_index_flag = f'--hparam_start_index {FLAGS.hparam_start_index} '
  if FLAGS.hparam_end_index is not None:
    hparam_end_index_flag = f'--hparam_end_index {FLAGS.hparam_end_index} '
  study_start_index = FLAGS.study_start_index if FLAGS.study_start_index else 0
  if FLAGS.study_end_index is not None:
    study_end_index = FLAGS.study_end_index
  else:
    study_end_index = num_studies - 1

  additional_requirements_path_flag = ''
  if FLAGS.additional_requirements_path:
    additional_requirements_path_flag = f'--additional_requirements_path {FLAGS.additional_requirements_path} '

  submission_id = FLAGS.submission_id

  rng_seed = FLAGS.seed

  if not rng_seed:
    rng_seed = struct.unpack('I', os.urandom(4))[0]

  logging.info('Using RNG seed %d', rng_seed)
  rng_key = prng.fold_in(prng.PRNGKey(rng_seed), hash(submission_id))

  with open(FLAGS.workload_metadata_path) as f:
    workload_metadata = json.load(f)

  # Get list of all possible workloads.
  workloads = list(workload_metadata.keys())

  # Read held-out workloads.
  if FLAGS.held_out_workloads_config_path:
    held_out_workloads = read_held_out_workloads(
        FLAGS.held_out_workloads_config_path)
    workloads = workloads + held_out_workloads

  # Filter workloads if explicit workloads specified.
  if FLAGS.workloads is not None:
    workloads = list(
        filter(lambda x: x in FLAGS.workloads.split(','), workloads))
    if len(workloads) != len(FLAGS.workloads.split(',')):
      unmatched_workloads = set(FLAGS.workloads.split(',')) - set(workloads)
      raise ValueError(f'Invalid workload name {unmatched_workloads}')

  # Slice the study subkeys so that a partial run (study_start_index > 0)
  # uses the same per-study seeds as a full run.
  rng_subkeys = prng.split(rng_key,
                           num_studies)[study_start_index:study_end_index + 1]

  for study_index, rng_subkey in zip(
      range(study_start_index, study_end_index + 1), rng_subkeys):
    print('-' * 100)
    print('*' * 40, f'Starting study {study_index + 1}/{num_studies}', '*' * 40)
    print('-' * 100)
    study_dir = os.path.join(experiment_name, f'study_{study_index}')

    # For each runnable workload check if there are any containers running
    # and if not launch next container command.
    for workload in workloads:
      run_key = prng.fold_in(rng_subkey, hash(workload))
      run_seed = run_key[0]  # arbitrary
      base_workload_name = get_base_workload_name(workload)
      wait_until_container_not_running()
      os.system(
          "sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'")  # clear caches
      print('=' * 100)
      dataset = workload_metadata[base_workload_name]['dataset']
      max_steps_flag = ''
      if FLAGS.enable_step_budget:
        run_fraction = FLAGS.run_percentage / 100.
        if FLAGS.max_steps is None:
          max_steps = int(workload_metadata[base_workload_name]['max_steps'] *
                          run_fraction)
        else:
          max_steps = FLAGS.max_steps
        max_steps_flag = f'-m {max_steps}'

      mount_repo_flag = ''
      if FLAGS.local:
        mount_repo_flag = '-v /home/kasimbeg/algorithmic-efficiency:/algorithmic-efficiency '
      command = ('docker run -t -d -v /home/kasimbeg/data/:/data/ '
                 '-v /home/kasimbeg/experiment_runs/:/experiment_runs '
                 '-v /home/kasimbeg/experiment_runs/logs:/logs '
                 f'{mount_repo_flag}'
                 '--gpus all --ipc=host '
                 f'{docker_image_url} '
                 f'-d {dataset} '
                 f'-f {framework} '
                 f'-s {submission_path} '
                 f'-w {workload} '
                 f'-e {study_dir} '
                 f'{max_steps_flag} '
                 f'--num_tuning_trials {num_tuning_trials} '
                 f'--rng_seed {run_seed} '
                 f'{additional_requirements_path_flag}'
                 '-c false '
                 '-o true '
                 '-i true ')

      # Append tuning ruleset flags.
      tuning_ruleset_flags = ''
      if FLAGS.tuning_ruleset == 'external':
        tuning_ruleset_flags += f'--tuning_ruleset {FLAGS.tuning_ruleset} '
        tuning_ruleset_flags += f'-t {tuning_search_space} '
        tuning_ruleset_flags += f'{hparam_start_index_flag} '
        tuning_ruleset_flags += f'{hparam_end_index_flag} '
      else:
        tuning_ruleset_flags += f'--tuning_ruleset {FLAGS.tuning_ruleset} '

      command += tuning_ruleset_flags

      if not FLAGS.dry_run:
        print('Running docker container command')
        print('Container ID: ')
        return_code = os.system(command)
      else:
        return_code = 0
      if return_code == 0:
        print(
            f'SUCCESS: container for {framework} {workload} launched successfully'
        )
        print(f'Command: {command}')
        print(f'Results will be logged to {experiment_name}')
      else:
        print(
            f'FAILED: container for {framework} {workload} failed with exit code {return_code}.'
        )
        print(f'Command: {command}')
      wait_until_container_not_running()
      os.system(
          "sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'")  # clear caches

      print('=' * 100)


if __name__ == '__main__':
  flags.mark_flag_as_required('workload_metadata_path')
  app.run(main)
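
As a sketch of how the partitioning flags compose (the experiment name and index values here are hypothetical), a machine could preview and then launch only studies 2 through 4 of a five-study experiment:

# Print the docker run commands without launching containers.
python run_workloads.py --framework jax \
  --experiment_name my_first_experiment \
  --workload_metadata_path workload_metadata_external_tuning.json \
  --study_start_index 2 \
  --study_end_index 4 \
  --dry_run

# Rerunning without --dry_run launches the same containers for real.

Because the per-study RNG subkeys are split from the global key before slicing, passing the same --seed on every machine keeps each study's seed consistent across such partitioned runs.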

scoring/utils/slurm/README.md

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
This folder contains a SLURM batch script that can be used to run jobs where each job corresponds to a training run on a given workload, training algorithm, random seed, and tuning trial (if on the external tuning ruleset).
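
A minimal sketch of what such a batch script could look like, assuming a job array with one task per study; the job name, array bounds, GPU request, and flag values below are hypothetical, not the script shipped in this folder:

#!/bin/bash
#SBATCH --job-name=algoperf-scoring
#SBATCH --array=0-4          # hypothetical: one array task per study
#SBATCH --gres=gpu:8         # hypothetical GPU request

# Each array task runs a single study; workload, seed, and tuning-trial
# selection is delegated to run_workloads.py via the study index flags.
python run_workloads.py --framework jax \
  --experiment_name my_first_experiment \
  --workload_metadata_path workload_metadata_external_tuning.json \
  --study_start_index "${SLURM_ARRAY_TASK_ID}" \
  --study_end_index "${SLURM_ARRAY_TASK_ID}"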
