Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys
from typing import Any

# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
Expand All @@ -12,6 +13,7 @@
import util
import vision_cfg
from ray import tune
from ray.tune.stopper import Stopper


class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
Expand Down Expand Up @@ -47,3 +49,21 @@ def __init__(self, cfg: dict = {}):
cfg = util.populate_isaac_ray_cfg_args(cfg)
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
super().__init__(cfg)


class CartpoleEarlyStopper(Stopper):
    """Early-stop cartpole trials whose cart keeps leaving the track.

    A trial is flagged once it has completed at least 20 training iterations
    and the ``cart_out_of_bounds`` termination rate exceeds 0.85; a flagged
    trial is reported for stopping on every subsequent check.
    """

    def __init__(self):
        # Trial IDs that have been flagged for stopping (sticky: once bad, always bad).
        self._bad_trials: set[str] = set()

    def __call__(self, trial_id: str, result: dict[str, Any]) -> bool:
        """Return ``True`` if the trial identified by ``trial_id`` should stop.

        Args:
            trial_id: Ray Tune trial identifier.
            result: Latest reported metrics for the trial.
        """
        # NOTE: renamed from `iter` to avoid shadowing the builtin.
        iteration = result.get("training_iteration", 0)
        out_of_bounds = result.get("Episode/Episode_Termination/cart_out_of_bounds")

        # Mark the trial for stopping if conditions are met
        if iteration >= 20 and out_of_bounds is not None and out_of_bounds > 0.85:
            self._bad_trials.add(trial_id)

        return trial_id in self._bad_trials

    def stop_all(self) -> bool:
        """Never stop the whole experiment; only individual trials are stopped."""
        return False  # only stop individual trials
65 changes: 58 additions & 7 deletions scripts/reinforcement_learning/ray/tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
import argparse
import importlib.util
import os
import random
import subprocess
import sys
from time import sleep, time

import ray
import util
from ray import air, tune
from ray.tune import Callback
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.repeater import Repeater
from ray.tune.stopper import CombinedStopper

"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
Expand Down Expand Up @@ -60,7 +63,7 @@
NUM_WORKERS_PER_NODE = 1 # needed for local parallelism
PROCESS_RESPONSE_TIMEOUT = 200.0 # seconds to wait before killing the process when it stops responding
MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = 1000 # maximum number of lines to read from the training process logs
MAX_LOG_EXTRACTION_ERRORS = 2 # maximum allowed LogExtractionErrors before we abort the whole training
MAX_LOG_EXTRACTION_ERRORS = 10 # maximum allowed LogExtractionErrors before we abort the whole training


class IsaacLabTuneTrainable(tune.Trainable):
Expand Down Expand Up @@ -203,13 +206,38 @@ def stop_all(self):
return False


def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
class ProcessCleanupCallback(Callback):
    """Callback to clean up processes when trials are stopped."""

    def on_trial_error(self, iteration, trials, trial, error, **info):
        """Called when a trial encounters an error."""
        self._cleanup_trial(trial)

    def on_trial_complete(self, iteration, trials, trial, **info):
        """Called when a trial completes."""
        self._cleanup_trial(trial)

    def _cleanup_trial(self, trial):
        """Clean up processes for a trial using SIGKILL."""
        try:
            # Match leftover training processes by the run id ("-rid") that was
            # injected into the trial's runner args, and kill them forcefully.
            run_id = trial.config["runner_args"]["-rid"]
            subprocess.run(["pkill", "-9", "-f", f"rid {run_id}"], check=False)
            sleep(5)
        except Exception as e:
            # Best-effort cleanup: report and continue rather than failing the run.
            print(f"[ERROR]: Failed to cleanup trial {trial.trial_id}: {e}")


def invoke_tuning_run(
cfg: dict,
args: argparse.Namespace,
stopper: tune.Stopper | None = None,
) -> None:
"""Invoke an Isaac-Ray tuning run.

Log either to a local directory or to MLFlow.
Args:
cfg: Configuration dictionary extracted from job setup
args: Command-line arguments related to tuning.
stopper: Custom stopper, optional.
"""
# Allow for early exit
os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"
Expand Down Expand Up @@ -237,16 +265,23 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
)
repeat_search = Repeater(searcher, repeat=args.repeat_run_count)

# Configure the stoppers
stoppers: CombinedStopper = CombinedStopper(*[
LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
*([stopper] if stopper is not None else []),
])

if args.run_mode == "local": # Standard config, to file
run_config = air.RunConfig(
storage_path="/tmp/ray",
name=f"IsaacRay-{args.cfg_class}-tune",
callbacks=[ProcessCleanupCallback()],
verbose=1,
checkpoint_config=air.CheckpointConfig(
checkpoint_frequency=0, # Disable periodic checkpointing
checkpoint_at_end=False, # Disable final checkpoint
),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
stop=stoppers,
)

elif args.run_mode == "remote": # MLFlow, to MLFlow server
Expand All @@ -260,13 +295,14 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
run_config = ray.train.RunConfig(
name="mlflow",
storage_path="/tmp/ray",
callbacks=[mlflow_callback],
callbacks=[ProcessCleanupCallback(), mlflow_callback],
checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
stop=stoppers,
)
else:
raise ValueError("Unrecognized run mode.")

# RID isn't optimized as it is sampled from, but useful for cleanup later
cfg["runner_args"]["-rid"] = tune.sample_from(lambda _: str(random.randint(int(1e9), int(1e10) - 1)))
# Configure the tuning job
tuner = tune.Tuner(
IsaacLabTuneTrainable,
Expand Down Expand Up @@ -399,6 +435,12 @@ def __init__(self, cfg: dict):
default=MAX_LOG_EXTRACTION_ERRORS,
help="Max number of LogExtractionError failures before we abort the whole tuning run.",
)
parser.add_argument(
"--stopper",
type=str,
default=None,
help="A stop criteria in the cfg_file, must be a tune.Stopper instance.",
)

args = parser.parse_args()
PROCESS_RESPONSE_TIMEOUT = args.process_response_timeout
Expand Down Expand Up @@ -457,7 +499,16 @@ def __init__(self, cfg: dict):
print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
cfg = instance.cfg
print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
invoke_tuning_run(cfg, args)
# Load optional stopper config
stopper = None
if args.stopper and hasattr(module, args.stopper):
stopper = getattr(module, args.stopper)
if isinstance(stopper, type) and issubclass(stopper, tune.Stopper):
stopper = stopper()
else:
raise TypeError(f"[ERROR]: Unsupported stop criteria type: {type(stopper)}")
print(f"[INFO]: Loaded custom stop criteria from '{args.stopper}'")
invoke_tuning_run(cfg, args, stopper=stopper)

else:
raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")
2 changes: 1 addition & 1 deletion scripts/reinforcement_learning/ray/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def process_args(args, target_list, is_hydra=False):
if not is_hydra:
if key.endswith("_singleton"):
target_list.append(value)
elif key.startswith("--"):
elif key.startswith("--") or key.startswith("-"):
target_list.append(f"{key} {value}") # Space instead of = for runner args
else:
target_list.append(f"{value}")
Expand Down
3 changes: 3 additions & 0 deletions scripts/reinforcement_learning/rl_games/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
help="if toggled, this experiment will be tracked with Weights and Biases",
)
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
parser.add_argument(
"--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
Expand Down
3 changes: 3 additions & 0 deletions scripts/reinforcement_learning/rsl_rl/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
"--distributed", action="store_true", default=False, help="Run training with multiple GPUs or nodes."
)
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
parser.add_argument(
"--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
)
# append RSL-RL cli arguments
cli_args.add_rsl_rl_args(parser)
# append AppLauncher cli args
Expand Down
3 changes: 3 additions & 0 deletions scripts/reinforcement_learning/sb3/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
default=False,
help="Use a slower SB3 wrapper but keep all the extra training info.",
)
parser.add_argument(
"--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
Expand Down
4 changes: 3 additions & 1 deletion scripts/reinforcement_learning/skrl/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
choices=["AMP", "PPO", "IPPO", "MAPPO"],
help="The RL algorithm used for training the skrl agent.",
)

parser.add_argument(
"--ray-proc-id", "-rid", type=int, default=None, help="Automatically configured by Ray integration, otherwise None."
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
Expand Down