diff --git a/src/cmd/flux-hop.py b/src/cmd/flux-hop.py new file mode 100755 index 00000000..0a25aeb6 --- /dev/null +++ b/src/cmd/flux-hop.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import random +import sys + +try: + import flux +except ImportError: + sys.exit("'flux hop' must be run in the context of a Flux instance") + +import flux_k8s.operator.minicluster as flux_operator + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + +def generate_job_name(): + """ + Generates a random, two-word name from predefined lists. + + This is akin to Docker, and I have a dumb function like this in libraries + back to 2017. + """ + adjectives = [ + "agile", + "brave", + "clever", + "curious", + "dreamy", + "fast", + "glowing", + "happy", + "jolly", + "keen", + "lucky", + "noble", + "serene", + "sparkling", + "sunny", + "vivid", + "whimsical", + "zesty", + "stinky", + "luxurious", + "cloudy", + "dusty", + "electric", + "asynchronous", + "atomic", + "binary", + "cached", + "converged", + "distributed", + "elastic", + "giga", + "ephemeral", + "headless", + "linked", + "parallel", + "photonic", + "peta", + "quantum", + "recursive", + "serverless", + "stateless", + "streaming", + "tera", + "threaded", + "turbo", + "virtual", + "vector", + "farty", + "shiny", + "squeaky", + "cozy", + "wooden", + "plastic", + "bouncing", + "clumsy", + "dapper", + "goofy", + "gossipy", + "wandering", + "wobbly", + "zany", + "giggling", + "sleepy", + ] + + nouns = [ + "breeze", + "comet", + "dragon", + "griffin", + "kitten", + "meadow", + "nova", + "ocean", + "otter", + "panda", + "phoenix", + "pixel", + "quokka", + "quasar", + "rabbit", + "river", + "robot", + "star", + "willow", + "accelerator", + "algorithm", + "cluster", + "container", + "core", + "daemon", + "dongle", + "firewall", + "gateway", + "gpu", + "kernel", + "lambda", + "node", + "packet", + "pipeline", + "scheduler", + "socket", + "switch", + "token", + "iterator", + "tensor", + "pancakes", + "corgi", + "capybara", + "gecko", + "lemur", + "narwhal", + "wombat", + "potato", + "pancake", + "waffle", + "noodles", + "blender", + "cushion", + "doorknob", + "fork", + "kettle", + "lamp", + "router", + "sofa", + "spatula", + "stapler", + "toaster", + "dustbunny", + ] + + adjective = random.choice(adjectives) + noun = random.choice(nouns) + + return f"{adjective}-{noun}" + + +def get_parser(): + """ + Parse command-line arguments. + + These will be used to populate the Flux MiniCluster. + """ + parser = argparse.ArgumentParser(description="Rabbit MPI MiniCluster Generator") + + # Group for Rabbit MPI specific options for better help output + group = parser.add_argument_group("Rabbit MPI Options") + + group.add_argument( + "--image", + default=flux_operator.default_container, + help=f"Container image to use for the MPI job. Defaults to '{flux_operator.default_container}'.", + ) + group.add_argument( + "--workdir", + default=None, + help="Working directory to set inside the container. Defaults to the container's WORKDIR.", + ) + group.add_argument( + "--command", + default=None, + help="Command to execute in the container. 
If unset, the MiniCluster will be interactive.", + ) + group.add_argument( + "--name", + default=None, + help="Name for the MiniCluster", + ) + group.add_argument( + "--namespace", + default="default", + help="Namespace for the MiniCluster", + ) + group.add_argument( + "--tasks", + type=int, + default=None, + help="Total number of MPI tasks to request for the job (at your discretion)", + ) + group.add_argument( + "--nodes", + type=int, + default=None, + help="Number of nodes to request for the job. Defaults to using selected rabbit length.", + ) + group.add_argument( + "--rabbits", + default=None, + help="Comma-separated list of specific rabbit nodes to use (e.g., 'hetchy201,hetchy202').", + ) + group.add_argument( + "--add-flux", + action=argparse.BooleanOptionalAction, + default=True, + help="Add the Flux view to the container. Use --no-add-flux if your container has Flux.", + ) + group.add_argument( + "--succeed", + action="store_true", + help="Force the job to always succeed, regardless of the command's exit code.", + ) + group.add_argument( + "-e", + "--env", + action="append", + help="Set an environment variable in the container (e.g., --env KEY=VALUE). Can be specified multiple times.", + ) + return parser + + +def parse_env(environ): + """ + Parse the provided environment. + """ + env_dict = {} + for item in environ: + if "=" not in item: + LOGGER.error( + f"Invalid environment variable format: '{item}'. Use KEY=VALUE." + ) + sys.exit(1) + key, value = item.split("=", 1) + env_dict[key] = value + + +def main(): + """ + Main entrypoint for the client script. + """ + parser = get_parser() + args = parser.parse_args() + + # --rabbits are required for now. + if not args.rabbits: + LOGGER.error("List of rabbit nodes --rabbits is required. Exiting.") + sys.exit(0) + + # Generate a faux jobspec - for now we just need attributes (and not resources) + wabbits = [x.strip() for x in args.rabbits.split(",")] + mpi_attributes = {"rabbits": wabbits} + if args.image: + mpi_attributes["image"] = args.image + if args.workdir: + mpi_attributes["workdir"] = args.workdir + if args.command: + mpi_attributes["command"] = args.command + if args.tasks: + mpi_attributes["tasks"] = args.tasks + if args.nodes: + mpi_attributes["nodes"] = args.nodes + if not args.nodes: + mpi_attributes["nodes"] = len(wabbits) + if args.succeed: + # The class checks for presence, so any non-None value works. + mpi_attributes["succeed"] = True + + if args.add_flux is False: + mpi_attributes["add_flux"] = args.add_flux + + # Parse environment variables from KEY=VALUE strings into a dictionary + if args.env: + mpi_attributes["env"] = parse_env(args.env) + + # Construct the final jobspec + jobspec = { + "attributes": { + "system": { + "rabbit": { + "mpi": mpi_attributes, + } + } + } + } + + # We can now generate a RabbitMPI job akin to if we had a normal JobSpec + rabbit_job = flux_operator.RabbitMPI(jobspec) + + # You can now use the rabbit_job object to get the processed attributes. 
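+    # Illustration (hypothetical values): `flux hop --rabbits hetchy201,hetchy202
+    # --command 'lmp -x1 -x2 -x3'` yields, under attributes.system.rabbit.mpi:
+    #   {"rabbits": ["hetchy201", "hetchy202"], "image": "<default container>",
+    #    "command": "lmp -x1 -x2 -x3", "nodes": 2}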
+ print("--- Generated Rabbit MPI Configuration ---") + print(f"Working Directory: {rabbit_job.workdir}") + print(f" Container Image: {rabbit_job.container}") + print(f" Always Succeed: {rabbit_job.always_succeed}") + print(f" Target Rabbits: {rabbit_job.rabbits}") + print(f" Add Flux View: {rabbit_job.add_flux()}") + print(f" Interactive: {rabbit_job.interactive}") + print(f" Environment: {rabbit_job.environment}") + print(f" Enabled: {rabbit_job.is_enabled()}") + print(f" Command: {rabbit_job.command}") + print(f" Tasks: {rabbit_job.tasks}") + print(f" Nodes: {rabbit_job.nodes}") + + # Make a funny name! + name = args.name or generate_job_name() + + # Generate a regular MiniCluster + handle = flux.Flux() + minicluster = flux_operator.MiniCluster(handle, name=name, namespace=args.namespace) + minicluster.generate(rabbit_job) + + LOGGER.info("Rabbit MPI jobspec processed successfully.") + + +if __name__ == "__main__": + main() diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 673ae649..322ed2f8 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -30,8 +30,8 @@ import urllib3 import flux +import flux.job from flux.hostlist import Hostlist -from flux.job.JobID import id_parse from flux.constants import FLUX_MSGTYPE_REQUEST from flux.future import Future import flux_k8s @@ -48,6 +48,8 @@ WorkflowState, ) +import flux_k8s.operator.minicluster as flux_operator + LOGGER = logging.getLogger(__name__) WORKFLOWS_IN_TC = {} # tc for TransientCondition @@ -260,6 +262,7 @@ def create_cb(handle, _t, msg, k8s_api): msg.payload["resources"], int(msg.payload["failure_tolerance"]), ) + # submit a memo providing the name of the workflow handle.rpc( "job-manager.memo", @@ -328,6 +331,8 @@ def setup_cb(handle, _t, msg, k8s_api): desiredState. """ jobid = msg.payload["jobid"] + info = flux.job.get_job(handle, flux.job.JobID(jobid)) + userid = info["userid"] hlist = Hostlist(msg.payload["R"]["execution"]["nodelist"]).uniq() workflow_name = WorkflowInfo.get_name(jobid) workflow = k8s_api.get_namespaced_custom_object(*crd.WORKFLOW_CRD, workflow_name) @@ -335,6 +340,9 @@ def setup_cb(handle, _t, msg, k8s_api): for hostname in hlist: nnf_name = storage.HOSTNAMES_TO_RABBITS[hostname] nodes_per_nnf[nnf_name] = nodes_per_nnf.get(nnf_name, 0) + 1 + + # We need to give the rabbit nodes to the Flux Operator + rabbits = list(nodes_per_nnf.keys()) handle.rpc( "job-manager.memo", payload={ @@ -375,6 +383,25 @@ def setup_cb(handle, _t, msg, k8s_api): args=(handle, k8s_api, winfo, hlist, lustre), ).start() + # Create the MiniCluster post rabbit success. + # Note that we don't need a Flux handle here if we modify the jobtap plugin to + # add additional info. It's redundant to get resources again. 
+ jobspec = flux.job.kvslookup.job_kvs_lookup(handle, jobid)["jobspec"] + + # --setattr=rabbit.mpi being set to anything triggers a minicluster + # A rabbit minicluster is expected to be created with a workflow + is_requested = flux_operator.MiniCluster.is_requested(jobspec) + is_allowed = flux_operator.MiniCluster.is_allowed(jobspec) + if is_requested and is_allowed: + minicluster = flux_operator.RabbitMiniCluster( + handle=handle, + jobid=jobid, + userid=userid, + name=workflow["metadata"]["name"], + namespace=workflow["metadata"].get("namespace"), + ) + minicluster.generate(jobspec, rabbits) + def drain_nodes_with_mounts(handle, k8s_api, winfo): """Drain all nodes that have not yet unmounted.""" @@ -1053,7 +1080,7 @@ def raise_self_exception(handle): implemented/closed, this can be replaced with that solution. """ try: - jobid = id_parse(os.environ["FLUX_JOB_ID"]) + jobid = flux.job.id_parse(os.environ["FLUX_JOB_ID"]) except KeyError: return Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get() diff --git a/src/python/flux_k8s/operator/__init__.py b/src/python/flux_k8s/operator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/python/flux_k8s/operator/defaults.py b/src/python/flux_k8s/operator/defaults.py new file mode 100644 index 00000000..3136e0e8 --- /dev/null +++ b/src/python/flux_k8s/operator/defaults.py @@ -0,0 +1,54 @@ +# Defaults +# https://github.com/converged-computing/flux-slingshot/blob/main/minicluster.yaml + +# This has flux built with cxi for slingshot +container = "ghcr.io/converged-computing/flux-slingshot:ubuntu2404" + +# Flux Operator versions +group = "flux-framework.org" +version = "v1alpha2" +namespace = "default" +plural = "miniclusters" + +# Devices and volumes +device_path = "/sys/devices" +net_path = "/sys/class/net" + +# CXI device plugin label for request +cxi_device_label = "beta.hpe.com/cxi" + +# Persistent Volume and Persistent Volume Claim defaults +storage_capacity = "1Gi" +fs_type = "lustre" +csi_driver = "lustre-csi.hpe.com" +storage_class_name = "nnf-lustre-fs" +volume_reclaim_policy = "Retain" +rabbit_mount = "/mnt/nnf/rabbit" + +# TODO ask James if we can put this in config somewhere +volume_handle = "51@kfi:32@kfi:/lslide" + +# Default environment for network +environment = { + "OMPI_MCA_orte_base_help_aggregate": "0", + "OMPI_MCA_btl": "^openib", +} + +# Default volumes for network +volumes = { + "devices": {"hostPath": device_path, "path": device_path}, + "net": {"hostPath": net_path, "path": net_path}, +} + +# Not sure there is any reason to change this +podspec = { + "pod": {"nodeSelector": {"cray.nnf.node": "true"}}, + "tolerations": [ + { + "effect": "NoSchedule", + "key": "cray.nnf.node", + "operator": "Equal", + "value": "true", + } + ], +} diff --git a/src/python/flux_k8s/operator/minicluster.py b/src/python/flux_k8s/operator/minicluster.py new file mode 100644 index 00000000..cefb1abb --- /dev/null +++ b/src/python/flux_k8s/operator/minicluster.py @@ -0,0 +1,480 @@ +import copy +import logging +import random +import os + +import flux_k8s.operator.defaults as defaults +from flux_k8s import cleanup +from flux_k8s.operator.rabbits import RabbitMPI +from flux_k8s.operator.volumes import VolumeManager, teardown_rabbit_volumes +from flux_k8s.crd import DEFAULT_NAMESPACE, WORKFLOW_CRD +from kubernetes import client, config + + +import flux +import flux.job + +LOGGER = logging.getLogger(__name__) + + +def teardown_minicluster(handle, winfo): + """ + Tear down the MiniCluster, first saving the lead 
broker log to KVS + """ + LOGGER.warning("START OF TEARDOWN MINICLUSTER") + minicluster = RabbitMiniCluster( + handle=handle, + jobid=winfo.jobid, + name=winfo.name, + namespace=DEFAULT_NAMESPACE, + ) + + k8s_api = client.CoreV1Api( + config.new_client_from_config(handle.conf_get("rabbit.kubeconfig")) + ) + + # Cut out early if we don't exist. + LOGGER.warning("CHECKING EXIST") + if not minicluster.exists(): + return teardown_rabbit_volumes(k8s_api, winfo.jobid, DEFAULT_NAMESPACE) + LOGGER.warning("I AM EXISTS") + + # Get the lead broker logs and save to KVS + log = minicluster.logs() + if log: + LOGGER.warning(log) + LOGGER.warning("WE HAAWDSADAD LOG") + with flux.job.job_kvs(handle, winfo.jobid) as kvsdir: + kvsdir["rabbitmpi_container_log"] = log[-50000:] + + # And finally, cleanup + minicluster.delete() + return teardown_rabbit_volumes(k8s_api, winfo.jobid, namespace=DEFAULT_NAMESPACE) + + +def delete_minicluster(k8s_api, name, namespace): + """ + Delete a MiniCluster by name and namespace. + + We can make this asynchronous with retry in a loop if needed. + Kubernetes should not need that, so let's test without first. + """ + # No grace period - be ruthless! + delete_options = client.V1DeleteOptions( + propagation_policy="Background", grace_period_seconds=0 + ) + try: + k8s_api.delete_namespaced_custom_object( + group=defaults.group, + version=defaults.version, + namespace=namespace, + plural=defaults.plural, + name=name, + body=delete_options, + ) + LOGGER.warning( + f"MiniCluster '{name}' in namespace '{namespace}' deleted successfully." + ) + except client.rest.ApiException as e: + if e.status == 404: + # Not found (never created or already cleaned up) + return + else: + LOGGER.warning( + f"API Error deleting MiniCluster '{name}' in '{namespace}': {e}" + ) + + +class MiniCluster: + """ + A MiniClusterBase does not require an official workflow. + + We can launch it directly on the rabbit nodes separately from a Flux Job. + This is likely for testing (or fun) - "flux hop" - and we just require + a populated job. + """ + + def __init__( + self, handle, name, namespace=defaults.namespace, jobid=None, userid=None + ): + """ + A MiniCluster Family object is used for a scoped session in a Flux instance. + + We assume it is associated with a specific, top level jobid + """ + self.handle = handle + self.jobid = jobid + self.userid = userid + self.name = name + self.namespace = namespace + self.k8s_api = cleanup.get_k8s_api(self.handle.conf_get("rabbit.kubeconfig")) + + @classmethod + def is_requested(cls, jobspec): + """ + Determine if a MiniCluster is required via the rabbit.mpi attribute. + """ + return RabbitMPI(jobspec).is_enabled() + + @classmethod + def is_allowed(cls, jobspec): + """ + Determine if a MiniCluster is allowed. + + TODO: we need to check the user, specifics of the jobspec, etc. + """ + allowed_users = ["corbett8", "sochat1", "milroy1", "mcfadden8"] + assert allowed_users + return True + + @property + def podspec(self): + """ + Generate the PodSpec for the MiniCluster. 
E.g., + + pod: + nodeSelector: + cray.nnf.node: "true" + tolerations: + - effect: NoSchedule + key: cray.nnf.node + operator: Equal + value: "true" + """ + # Not necessary, but being pedantic and paranoid + podspec = copy.deepcopy(defaults.podspec) + sc = self.security_context + sc["fsGroup"] = self.userid + sc["fsGroupChangePolicy"] = "OnRootMismatch" + podspec["securityContext"] = sc + return podspec + + def generate(self, job): + return self._generate(job) + + @property + def security_context(self): + return { + "privileged": False, + "runAsUser": self.userid, + "runAsGroup": self.userid, + "runAsNonRoot": True, + } + + @property + def resources(self): + """ + Resources are required to request a cxi device via the device driver. + https://github.com/converged-computing/cxi-k8s-device-plugin + """ + return { + "limits": {defaults.cxi_device_label: 1}, + "requests": {defaults.cxi_device_label: 1}, + } + + def volumes(self, **kwargs): + """ + Prepare volumes for job. + + A default set of MiniCluster volumes assumes wanting the network fabric, but not + rabbits. + + volumes: + devices: + hostPath: /sys/devices + path: /sys/devices + net: + hostPath: /sys/class/net + path: /sys/class/net + """ + return defaults.volumes + + def _generate(self, job): + """ + Generate a standalone MiniCluster. + + This is shared by both types of MiniCluster. The main difference is that + an official rabbit job will be populated by an actual JobSpec and workflow, + and a "flux hop" standalone cluster is generated artificially without one. + """ + metadata = client.V1ObjectMeta(name=self.name, namespace=self.namespace) + + # TODO: I think we can leave duration unset, assuming it will be + # cleaned up by job deletion. If this cannot be assumed, we need to set + # something to be slightly less than the job (but I don't like this). + + # For resources, we don't need to do anything complicated. + # We assign to number of rabbit nodes (or fewer). + # This returns a list of rabbit names, accounting for + # user requests for count and specific nodes + nodes = self.calculate_nodes(job) + + # The container security context is akin to pod, but we drop all caps. + # This is what the NNF container does - we should check if needed + sc = self.security_context + sc["capabilities"] = { + "add": [ + "NET_BIND_SERVICE", + "SYS_CHROOT", + "AUDIT_WRITE", + "SETUID", + "SETGID", + ], + "drop": ["all"], + } + + # The application container with or without Flux + # If the view is disabled, Flux must be installed. + container = { + "command": job.command, + "image": job.container, + "workingDir": job.workdir, + "name": self.name, + "launcher": False, + "environment": job.environment, + "volumes": self.volumes(job), + "resources": self.resources, + "imagePullPolicy": job.pull_policy, + "securityContext": sc, + } + LOGGER.warning(container) + + # Should the job always succeed? + # TODO: add other labels for query + labels = {} + if job.always_succeed: + labels["always-succeed"] = "1" + + # Add node affinity to ensure we schedule to subset of rabbit nodes + podspec = self.podspec + podspec["nodeAffinity"] = {"kubernetes.io/hostname": nodes} + + # The main spec needs the job container, sizes, and the podspec + LOGGER.warning(f"Generating spec for {nodes} nodes and {job.tasks} tasks.") + spec = { + "containers": [container], + "interactive": job.interactive, + "jobLabels": labels, + # TODO: can we allow autoscaling? 
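+            # size/maxSize both equal the number of selected rabbit nodes; tasks
+            # comes from the rabbit.mpi.tasks attribute (0 when unset)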
+ "maxSize": len(nodes), + "size": len(nodes), + "tasks": job.tasks, + "pod": podspec, + "flux": {}, + } + + # Flux container with required permissions + flux_container = {"securityContext": self.security_context} + + if not job.add_flux(): + flux_container["disable"] = True + spec["flux"]["container"] = flux_container + + # Ask for exclusive nodes + if job.exclusive: + spec["flux"]["optionFlags"] = "--exclusive" + + # Make this bad boi. + minicluster = { + "kind": "MiniCluster", + "metadata": metadata, + "apiVersion": "flux-framework.org/v1alpha2", + "spec": spec, + } + return self.create(minicluster) + + def create(self, minicluster): + """ + Wrapper to create the MiniCluster with the CustomObject API + """ + try: + return self.k8s_api.create_namespaced_custom_object( + **self.crd_info, body=minicluster + ) + except client.rest.ApiException as e: + name = minicluster["metadata"].name + if e.reason == "Conflict": + LOGGER.warning( + f"MiniCluster job for {name} exists, assuming resumed: {e.reason}" + ) + else: + LOGGER.warning( + f"There was a create MiniCluster error for {name}: {e.reason}, {e}" + ) + + def delete(self): + """ + Basic deletion function for an instance + """ + return delete_minicluster(self.k8s_api, self.name, self.namespace) + + @property + def crd_info(self): + """ + Shared MiniCluster information for requests + """ + return { + "group": defaults.group, + "version": defaults.version, + "namespace": self.namespace, + "plural": defaults.plural, + } + + def exists(self): + """ + Determine if a MiniCluster exists. + """ + try: + found = self.k8s_api.get_namespaced_custom_object( + name=self.name, **self.crd_info + ) + LOGGER.warning(found) + return True + except Exception as e: + LOGGER.warning(f"Exception: {e}") + return False + + def logs(self): + """ + Get the lead broker log. + """ + k8s_api = client.CoreV1Api( + config.new_client_from_config(self.handle.conf_get("rabbit.kubeconfig")) + ) + + # Get pods associated with the jobid + selector = f"batch.kubernetes.io/job-name={self.name}" + LOGGER.warning(selector) + pods = k8s_api.list_namespaced_pod( + label_selector=selector, namespace=self.namespace + ).items + + # Just save the lead broker for now - should be the first one + lead_broker = [x for x in pods if f"{self.name}-0-" in x.metadata.name] + if not lead_broker: + LOGGER.warning(f"Cannot find pods for {selector}") + return + lead_broker = lead_broker[0] + + try: + return k8s_api.read_namespaced_pod_log( + name=lead_broker.metadata.name, + namespace=self.namespace, + follow=False, + timestamps=True, + ) + except client.rest.ApiException as e: + LOGGER.warning(f"Error getting logs: {e}") + + def calculate_nodes(self, job): + """ + Calculate the desired number of nodes. + + We assume a size == the number of rabbits. + We allow the user to specify fewer. + """ + # This is the number of nodes, and rabbit node names + nnodes = job.nodes + LOGGER.warning(job.wabbits) + LOGGER.warning(type(job.wabbits)) + wabbits = copy.deepcopy(job.wabbits) + + # Not allowed to request more than the number of rabbits! + nnodes = len(wabbits) if not nnodes else min(len(wabbits), nnodes) + + # Let's assume we want random selection, unless the user requests differently + random.shuffle(wabbits) + + # Check if we have node names. Note that if there are fewer names + # requested than total rabbits, we only provision the subset. 
+ rabbit_names = job.rabbits + if rabbit_names is not None: + wabbits = list(set(wabbits).intersection(set(rabbit_names))) + + # To be more efficient, return the node names here + return wabbits[0:nnodes] + + +class RabbitMiniCluster(MiniCluster): + """ + Handle to interact with and generate Flux Operator MiniClusters. + + This MiniCluster is created with an official rabbit job and Flux. + We have a volume manager added to the standard MiniCluster class. + """ + + def volumes(self, job): + """ + Prepare volumes for job. + + A default set of MiniCluster volumes assumes wanting the network fabric, but not + rabbits. + + volumes: + devices: + hostPath: /sys/devices + path: /sys/devices + net: + hostPath: /sys/class/net + path: /sys/class/net + """ + # TODO this needs an index (and number that increments up with count) + # e.g., + volumes = defaults.volumes + rabbit_path = os.path.join("/mnt", "nnf", self.get_rabbit_volume_name()) + volumes["rabbit"] = {"hostPath": rabbit_path, "path": job.rabbit_mount} + return volumes + + def get_rabbit_volume_name(self, count=0): + """ + Get the rabbit volume name, which is the UID associated with the workflow object. + TODO: how would this represent multiple rabbits? + """ + workflow = self.k8s_api.get_namespaced_custom_object( + name=self.name, + namespace=self.namespace, + group=WORKFLOW_CRD.group, + version=WORKFLOW_CRD.version, + plural=WORKFLOW_CRD.plural, + ) + LOGGER.warning(workflow) + return workflow["metadata"]["uid"] + f"-{count}" + + def generate(self, jobspec, wabbits): + """ + Submit a minicluster job to Kubernetes on the rabbits. + + We receive the Workflow CRD to retrieve metadata for. If necessary, + we should get and interact with other CRD abstractions related to the + rabbits. We need to inspect what the workflow gives us (and what is missing) + to decide. + """ + # If we don't have a jobid, we can't continue + if not self.jobid: + LOGGER.warning( + "A jobid is required for a RabbitMiniCluster to associate rabbit nodes." + ) + return + + # We need to get the rabbit nodes from the job + LOGGER.warning(f"handle: {self.handle}") + LOGGER.warning(f"jobid: {self.jobid}") + LOGGER.warning(f"wabbits: {wabbits}") + + # Generate the volumes first, oriented for the manager + k8s_api = client.CoreV1Api( + config.new_client_from_config(self.handle.conf_get("rabbit.kubeconfig")) + ) + manager = VolumeManager(k8s_api, jobid=self.jobid, namespace=self.namespace) + + client.CoreV1Api( + config.new_client_from_config(self.handle.conf_get("rabbit.kubeconfig")) + ) + + # A PV is a "persistent volume" and a pvc is a "persistent volume claim" + manager.create_persistent_volume(wabbits) + manager.create_persistent_volume_claim() + + # This serves as easy access to job metadata + job = RabbitMPI(jobspec, wabbits) + return self._generate(job) diff --git a/src/python/flux_k8s/operator/rabbits.py b/src/python/flux_k8s/operator/rabbits.py new file mode 100644 index 00000000..2d3596e0 --- /dev/null +++ b/src/python/flux_k8s/operator/rabbits.py @@ -0,0 +1,237 @@ +import logging + +import flux_k8s.operator.defaults as defaults + +LOGGER = logging.getLogger(__name__) + + +class RabbitMPI: + """ + A RabbitMPI Job is a Flux MiniCluster job running on the rabbits. + + This class serves as a wrapper around a Flux jobspec to easily get + rabbit MPI attributes for the MiniCluster. "Bwing in... the wabbits!" + It can be a real jobspec, or a faux one with attributes populated. 
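+
+    Illustrative shape of the attributes it reads (values are placeholders):
+
+        attributes:
+          system:
+            rabbit:
+              mpi:
+                image: ghcr.io/org/app:tag
+                command: lmp -x1 -x2 -x3
+                nodes: 2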
+ """ + + def __init__(self, jobspec, wabbits=None): + self.jobspec = jobspec + + # We must have the wabbit nodes. + self.set_wabbits(wabbits) + + def is_enabled(self): + """ + Having the MPI attribute (with anything) indicates being enabled. + """ + if not self.jobspec.get("attributes", {}).get("system", {}).get("rabbit"): + return False + return ( + self.jobspec.get("attributes", {}) + .get("system", {}) + .get("rabbit") + .get("mpi") + is not None + ) + + def is_false(self, value): + """ + Determine if a directive is false. + + We assume unset (None) defaults to whatever the field default is. + """ + # So many ways to say no! + if isinstance(value, str): + value = value.lower() + return value in ["false", False, "no", "0", 0] + + def get_attribute(self, name): + """ + Get an attribute from the jobspec for the rabbit.mpi + + This needs to return None if there is an empty string or similar. + I'd ideally like to put this under attributes (and not system) but likely + system is more correct :) + """ + rabbit_directive = ( + self.jobspec.get("attributes", {}).get("system", {}).get("rabbit") + ) + if not rabbit_directive or not isinstance(rabbit_directive, dict): + return None + + mpi_directive = rabbit_directive.get("mpi") + if not mpi_directive or not isinstance(mpi_directive, dict): + return None + + return ( + self.jobspec.get("attributes", {}) + .get("system", {}) + .get("rabbit") + .get("mpi", {}) + .get(name) + or None + ) + + @property + def container(self): + """ + User specified container image, or use flux slingshot (cxi) base + + Example: + --setattr=rabbit.mpi.image="ghcr.io/converged-computing/lammps-reax:ubuntu2404-cxi" + """ + return self.get_attribute("image") or defaults.container + + @property + def workdir(self): + """ + User specified working directory, or honor what container has set. + + Example: + --setattr=rabbit.mpi.workdir="/opt/lammps/examples/reaxff/HNS" + """ + return self.get_attribute("workdir") + + @property + def pull_policy(self): + """ + User specified pull policy (defaults to IfNotPresent) + + Example: + --setattr=rabbit.mpi.pull_policy=Always + """ + return self.get_attribute("pull_policy") + + @property + def command(self): + """ + User specified command (tested, this works)! + + Example: + --setattr=rabbit.mpi.command='lmp -x1 -x2 -x3' + """ + return self.get_attribute("command") + + @property + def exclusive(self): + """ + User requested exclusive nodes + + Example: + --setattr=rabbit.mpi.exclusive=true + """ + return self.get_attribute("exclusive") is not None + + def add_flux(self): + """ + A directive that says "I already have Flux in my container" + We really only care if this is set to some variant of NO. + + Example: + --setattr=rabbit.mpi.add_flux=false + """ + # If we are using default container, we don't add flux + if self.container == defaults.container: + return False + + # Nothing set, we assume adding the flux view + value = self.get_attribute("add_flux") + if not value: + return True + + # Otherwise, we only care if it's set to False + return not self.is_false(value) + + def set_wabbits(self, wabbits): + """ + This is a hard requirement to have a list of rabbit nodes. + """ + self.wabbits = wabbits + + @property + def interactive(self): + """ + Determine if the minicluster should be interactive. + This is based on a command being set or not. + """ + return self.command is None + + @property + def always_succeed(self): + """ + Always succeed the job. 
+ + Example: + --setattr=rabbit.mpi.succeed=true + """ + return self.get_attribute("succeed") is not None + + @property + def tasks(self): + """ + Number of tasks to request for the job. + Assume for now the user knows what they are doing. The job + will not be satisfiable if not, and that is user error. + + Example: + --setattr=rabbit.mpi.tasks=96 + """ + return self.get_attribute("tasks") or 0 + + @property + def environment(self): + """ + Derive the default environment plus whatever extra the user has asked for. + + If these are bad/wrong, we might not want to add by default. + + Example: + --setattr=rabbit.mpi.env.one=ketchup --setattr=rabbit.mpi.env.two=mustard + """ + environ = defaults.environment + + # User preferences override + environ.update(self.get_attribute("env") or {}) + return environ + + @property + def rabbits(self): + """ + The user can request specific rabbit names, e.g., a subset. + + Example: + --setattr=rabbit.mpi.rabbits=hetchy201,hetchy202 + """ + return self.get_attribute("rabbits") + + @property + def rabbit_mount(self): + """ + The user can request specific rabbit mount for inside the container + + Example: + --setattr=rabbit.mpi.rabbit_mount=/mnt/wabbit + """ + return self.get_attribute("rabbit_mount") or defaults.rabbit_mount + + @property + def nodes(self): + """ + Number of nodes for the job + Request MUST be less than or equal to number of rabbits. + + Example: + --setattr=rabbit.mpi.nodes=4 + """ + nnodes = self.get_attribute("nodes") + + # This must parse, otherwise we fall back to the number of rabbits requested + if nnodes is not None: + try: + nnodes = int(nnodes) + except ValueError: + LOGGER.warning( + f"Cannot parse user directive for rabbit.mpi.nodes: {nnodes}" + ) + nnodes = None + return nnodes diff --git a/src/python/flux_k8s/operator/volumes.py b/src/python/flux_k8s/operator/volumes.py new file mode 100644 index 00000000..cb570e06 --- /dev/null +++ b/src/python/flux_k8s/operator/volumes.py @@ -0,0 +1,175 @@ +from kubernetes import client +from kubernetes.client.rest import ApiException +import flux_k8s.operator.defaults as defaults +import logging + +LOGGER = logging.getLogger(__name__) + + +def teardown_rabbit_volumes(k8s_api, jobid, namespace): + """ + Common function to teardown rabbits. + + We need to do this after a minicluster is deleted. If a MiniCluster is + not found, assume something might have gone wrong and try to cleanup + anyway. No harm done with 404 response. + """ + # Cleanup PV/PVC last. The order needs to be: + # pods using it + # persistent volume claim + # persistent volume + manager = VolumeManager(k8s_api, jobid=jobid, namespace=namespace) + manager.delete_persistent_volume() + manager.delete_persistent_volume_claim() + + +class VolumeManager: + """ + A class to manage Kubernetes volumes for specific jobs. + """ + + def __init__(self, k8s_api, jobid, namespace="default"): + """ + Initializes the manager with a job ID and namespace. + """ + self.jobid = jobid + self.namespace = namespace + self.k8s_api = k8s_api + + @property + def name(self): + return str(self.jobid) + + def create_persistent_volume(self, rabbits, storage_capacity=None): + """ + Creates a PersistentVolume specifically for a Lustre CSI driver. + + This function converts the provided YAML manifest into Python objects + using the Kubernetes client library. + """ + # TODO can we allow customizing storage capacity? + storage_capacity = storage_capacity or defaults.storage_capacity + + # Claim Reference + # This reserves the PV for a specific PVC. 
TODO: will we have >1 for a job? + # If so, we can't name based on just the jobid, we need more. + claim_ref = client.V1ObjectReference( + kind="PersistentVolumeClaim", name=self.name, namespace=self.namespace + ) + + # CSI (Container Storage Interface) source + csi_source = client.V1CSIPersistentVolumeSource( + driver=defaults.csi_driver, + volume_handle=defaults.volume_handle, + fs_type=defaults.fs_type, + ) + + # Ensure we bind to the right rabbit nodes + node_affinity = client.V1VolumeNodeAffinity( + required=client.V1NodeSelector( + node_selector_terms=[ + client.V1NodeSelectorTerm( + match_expressions=[ + client.V1NodeSelectorRequirement( + key="kubernetes.io/hostname", + operator="In", + values=rabbits, + ) + ] + ) + ] + ) + ) + + # PersistentVolume Spec + pv_spec = client.V1PersistentVolumeSpec( + capacity={"storage": storage_capacity}, + volume_mode="Filesystem", + access_modes=["ReadWriteMany"], + storage_class_name=defaults.storage_class_name, + persistent_volume_reclaim_policy=defaults.volume_reclaim_policy, + claim_ref=claim_ref, + csi=csi_source, + node_affinity=node_affinity, + ) + + # Final PersistentVolume object + persistent_volume = client.V1PersistentVolume( + metadata=client.V1ObjectMeta(name=self.name), + api_version="v1", + kind="PersistentVolume", + spec=pv_spec, + ) + + # 6. Call the Kubernetes API to create the PersistentVolume + try: + return self.k8s_api.create_persistent_volume(body=persistent_volume) + except ApiException as e: + if e.reason == "AlreadyExists": + print("PV already exists. No action taken.") + else: + print(f"Error creating PersistentVolume {self.name}': {e}") + raise + + def create_persistent_volume_claim(self, storage_capacity=None): + """ + Creates a PersistentVolumeClaim to bind to a specific PersistentVolume. + """ + storage_capacity = storage_capacity or defaults.storage_capacity + + # TODO this assumes 1 rabbit per jobid, which is unlikely. + metadata = client.V1ObjectMeta(name=self.name, namespace=self.namespace) + + # Define the resource requests + resources = client.V1ResourceRequirements( + requests={"storage": defaults.storage_capacity} + ) + + # Define the PVC spec. The key is setting `volume_name` to bind to our specific PV. + pvc_spec = client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteMany"], + storage_class_name=defaults.storage_class_name, + volume_name=self.name, + resources=resources, + ) + + persistent_volume_claim = client.V1PersistentVolumeClaim( + api_version="v1", + kind="PersistentVolumeClaim", + metadata=metadata, + spec=pvc_spec, + ) + + try: + return self.k8s_api.create_namespaced_persistent_volume_claim( + namespace=self.namespace, body=persistent_volume_claim + ) + except ApiException as e: + if e.reason != "AlreadyExists": + print(f"Error creating PersistentVolumeClaim '{self.name}': {e}") + raise + + def delete_persistent_volume_claim(self): + """ + Delete the PersistentVolumeClaim associated with the job. + """ + try: + self.k8s_api.delete_namespaced_persistent_volume_claim( + name=self.name, namespace=self.namespace + ) + except ApiException as e: + if e.status != 404: + print(f"Error deleting PersistentVolumeClaim '{self.name}': {e}") + raise + + def delete_persistent_volume(self): + """ + Deletes the PersistentVolume associated with the job. + Note: The associated PVC should be deleted first. 
+ """ + try: + self.k8s_api.delete_persistent_volume(name=self.name) + except ApiException as e: + if e.status != 404: + print(f"Error deleting PersistentVolume '{self.name}': {e}") + raise diff --git a/src/python/flux_k8s/workflow.py b/src/python/flux_k8s/workflow.py index 930f971f..845ab037 100644 --- a/src/python/flux_k8s/workflow.py +++ b/src/python/flux_k8s/workflow.py @@ -9,7 +9,7 @@ from flux.hostlist import Hostlist from flux_k8s import cleanup, crd - +import flux_k8s.operator.minicluster as flux_operator LOGGER = logging.getLogger(__name__) @@ -102,6 +102,7 @@ def move_to_teardown(self, handle, k8s_api, workflow=None): datamovements = self._get_datamovements(k8s_api) save_workflow_to_kvs(handle, self.jobid, workflow, datamovements) cleanup.teardown_workflow(workflow) + flux_operator.teardown_minicluster(handle, self) self.toredown = True def _get_datamovements(self, k8s_api):