diff --git a/config/logger.conf b/config/logger.conf
index 41702768c..d242a84e1 100644
--- a/config/logger.conf
+++ b/config/logger.conf
@@ -18,7 +18,8 @@ keys=root,
      timeout,
      timeout-closing,
      timeout-holdoff,
-     trigger
+     trigger,
+     job_retry
 
 [handlers]
 keys=consoleHandler, timedRotatingHandler
@@ -135,3 +136,9 @@ level=DEBUG
 handlers=consoleHandler, timedRotatingHandler
 qualname=trigger
 propagate=0
+
+[logger_job_retry]
+level=DEBUG
+handlers=consoleHandler, timedRotatingHandler
+qualname=job_retry
+propagate=0
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 94e096058..fae991e71 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -201,3 +201,16 @@ services:
       - './data/src:/home/kernelci/data/src'
       - './data/output:/home/kernelci/data/output'
       - './logs:/home/kernelci/pipeline/logs'
+
+  job-retry:
+    <<: *base-service
+    container_name: 'kernelci-pipeline-job-retry'
+    command:
+      - './src/job_retry.py'
+      - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
+      - 'run'
+    volumes:
+      - './config:/home/kernelci/config'
+      - './logs:/home/kernelci/pipeline/logs'
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
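The `[logger_job_retry]` section mirrors the existing per-service loggers: its `qualname` has to match the logger name the new service uses. A minimal standalone sketch of how such a config section is resolved with the standard library (illustration only; the pipeline services actually obtain their logger through the shared `Service` base class):

```python
import logging
import logging.config

# Load the handlers, formatters and per-service loggers from the config file.
logging.config.fileConfig('config/logger.conf',
                          disable_existing_loggers=False)

# 'job_retry' matches the qualname added above, so this logger emits DEBUG
# records to both consoleHandler and timedRotatingHandler.
log = logging.getLogger('job_retry')
log.debug("job_retry logger configured")
```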
diff --git a/src/job_retry.py b/src/job_retry.py
new file mode 100755
index 000000000..ed08c7356
--- /dev/null
+++ b/src/job_retry.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python3
+#
+# SPDX-License-Identifier: LGPL-2.1-or-later
+#
+# Copyright (C) 2025 Collabora Limited
+# Author: Jeny Sadadia
+
+import sys
+import kernelci.config
+from kernelci.legacy.cli import Args, Command, parse_opts
+
+from base import Service
+
+
+class JobRetry(Service):
+
+    def __init__(self, configs, args):
+        super().__init__(configs, args, 'job_retry')
+
+    def _setup(self, args):
+        return self._api_helper.subscribe_filters({
+            "state": "done",
+            "result": ("fail", "incomplete"),
+            "kind": "job"
+            # ToDo: Retry for build jobs
+            # "kind": ("kbuild", "job")
+        })
+
+    def _stop(self, sub_id):
+        if sub_id:
+            self._api_helper.unsubscribe_filters(sub_id)
+        sys.stdout.flush()
+
+    def _find_parent_kind(self, node, api_helper, kind):
+        parent_id = node.get('parent')
+        if not parent_id:
+            return None
+        parent_node = api_helper.api.node.get(parent_id)
+        if not parent_node:
+            return None
+        if parent_node.get('kind') == kind:
+            return parent_node
+        return self._find_parent_kind(parent_node, api_helper, kind)
+
+    def _run(self, sub_id):
+        self.log.info("Job retry: Listening for events... ")
+        self.log.info("Press Ctrl-C to stop.")
+
+        while True:
+            try:
+                node, _ = self._api_helper.receive_event_node(sub_id)
+                self.log.debug(f"Event received: {node['id']}")
+            except Exception as e:
+                self.log.error(f"Error receiving event: {e}")
+                continue
+
+            # Check retry count before submitting a retry
+            retry_counter = node.get("retry_counter", 0)
+            self.log.debug(f"{node['id']}: Node current retry_counter: {retry_counter}")
+            if retry_counter >= 3:
+                self.log.info(f"{node['id']} Job has already retried 3 times. \
+Not submitting a retry.")
+                continue
+
+            parent_kind = None
+            if node.get('kind') == 'job':
+                parent_kind = 'kbuild'
+            # ToDo: retry build jobs
+            # if node.get("kind") == "kbuild":
+            #     parent_kind = "checkout"
+            if parent_kind:
+                event_data = self._find_parent_kind(node, self._api_helper, parent_kind)
+                if not event_data:
+                    self.log.error(f"Not able to find parent node for {node['id']}")
+                    continue
+                event_data["jobfilter"] = [node["name"]]
+                event_data["platform_filter"] = [node["data"].get("platform")]
+                event_data["retry_counter"] = retry_counter + 1
+                event_data["debug"] = {"retry_by": str(node["id"])}
+                self.log.debug(f"{node['id']}: Event data retry_counter: {event_data['retry_counter']}")
+                event = {'data': event_data}
+                self._api_helper.api.send_event('retry', event)
+                self.log.info(f"Job retry for node {node['id']} submitted. Parent node: {event_data['id']}")
+                self.log.debug(f"Event: {event}")
+            else:
+                self.log.error(f"Not able to retry the job as parent kind is unknown: {node['id']}")
+        return True
+
+
+class cmd_run(Command):
+    help = "Retry failed/incomplete builds and tests"
+    args = [Args.api_config]
+
+    def __call__(self, configs, args):
+        return JobRetry(configs, args).run(args)
+
+
+if __name__ == '__main__':
+    opts = parse_opts('job_retry', globals())
+    yaml_configs = opts.get_yaml_configs() or 'config'
+    configs = kernelci.config.load(yaml_configs)
+    status = opts.command(configs, opts)
+    sys.exit(0 if status is True else 1)
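JobRetry listens for `job` nodes that finished as `fail` or `incomplete`, walks up to the `kbuild` parent, and republishes that node on the `retry` channel with extra filter fields the scheduler reads back. As a sketch of the contract between the two services, the payload built in `_run()` looks roughly like this (field names come from the code above; the id values are made up for illustration):

```python
# Illustrative only: the real event_data is the parent kbuild node fetched
# from the API, extended by JobRetry._run() with the keys below.
event_data = {
    "id": "65f1c0de...",                      # parent kbuild node id (made-up)
    "kind": "kbuild",
    "jobfilter": ["baseline-arm64"],          # retry only the failed job, by name
    "platform_filter": ["bcm2711-rpi-4-b"],   # and only on the platform it ran on
    "retry_counter": 1,                       # bumped so the 3-attempt cap holds
    "debug": {"retry_by": "65f1d2aa..."},     # id of the failed job node
}
event = {"data": event_data}
# send_event('retry', event) publishes this on the 'retry' channel, which the
# scheduler subscribes to below.
```

Note that `retry_counter` travels on the event; the scheduler passes it to `create_job_node()`, which presumably persists it on the new job node so the next failure can read it back.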
diff --git a/src/scheduler.py b/src/scheduler.py
index 099b45b1d..9845faebf 100755
--- a/src/scheduler.py
+++ b/src/scheduler.py
@@ -85,6 +85,12 @@ def __init__(self, configs, args):
             self._storage_config, storage_cred
         )
         self._job_tmp_dirs = {}
+        self._threads = []
+        self._api_helper_lock = threading.Lock()
+        self._stop_thread_lock = threading.Lock()
+        self._context_lock = threading.Lock()
+        self._context = {}
+        self._stop_thread = False
 
     def _get_runtimes_configs(self, configs, runtimes):
         runtimes_configs = {}
@@ -105,11 +111,19 @@ def _cleanup_paths(self):
         # ToDo: if stat != 0 then report error to API?
 
     def _setup(self, args):
-        return self._api.subscribe('node')
-
-    def _stop(self, sub_id):
-        if sub_id:
-            self._api_helper.unsubscribe_filters(sub_id)
+        node_sub_id = self._api.subscribe('node')
+        self.log.debug(f"Node channel sub id: {node_sub_id}")
+        retry_sub_id = self._api.subscribe('retry')
+        self.log.debug(f"Retry channel sub id: {retry_sub_id}")
+        self._context = {"node": node_sub_id, "retry": retry_sub_id}
+        return {"node": node_sub_id, "retry": retry_sub_id}
+
+    def _stop(self, context):
+        self._stop_thread = True
+        for _, sub_id in self._context.items():
+            if sub_id:
+                self.log.info(f"Unsubscribing: {sub_id}")
+                self._api_helper.unsubscribe_filters(sub_id)
         self._cleanup_paths()
 
     def backup_cleanup(self):
@@ -144,11 +158,11 @@ def backup_job(self, filename, nodeid):
         except Exception as e:
             self.log.error(f"Failed to backup {filename} to {new_filename}: {e}")
 
-    def _run_job(self, job_config, runtime, platform, input_node):
+    def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
         try:
             node = self._api_helper.create_job_node(job_config,
                                                     input_node,
-                                                    runtime, platform)
+                                                    runtime, platform, retry_counter)
         except KeyError as e:
             self.log.error(' '.join([
                 input_node['id'],
@@ -162,6 +176,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
 
         if not node:
             return
+        self.log.debug(f"Job node created: {node['id']}. Parent: {node['parent']}")
         # Most of the time, the artifacts we need originate from the parent
         # node. Import those into the current node, working on a copy so the
         # original node doesn't get "polluted" with useless artifacts when we
@@ -371,28 +386,45 @@ def _verify_architecture_filter(self, job, node):
                 return False
         return True
 
-    def _run(self, sub_id):
+    def _run(self, context):
+        for channel, sub_id in self._context.items():
+            thread = threading.Thread(target=self._run_scheduler, args=(channel, sub_id,))
+            self._threads.append(thread)
+            thread.start()
+
+        for thread in self._threads:
+            thread.join()
+
+    def _run_scheduler(self, channel, sub_id):
         self.log.info("Listening for available checkout events")
         self.log.info("Press Ctrl-C to stop.")
         subscribe_retries = 0
 
         while True:
+            with self._stop_thread_lock:
+                if self._stop_thread:
+                    break
             last_heartbeat['time'] = time.time()
             event = None
             try:
                 event = self._api_helper.receive_event_data(sub_id, block=False)
+                if not event:
+                    # If we received a keep-alive event, just continue
+                    continue
             except Exception as e:
-                self.log.error(f"Error receiving event: {e}, re-subscribing in 10 seconds")
-                time.sleep(10)
-                sub_id = self._api.subscribe('node')
+                with self._stop_thread_lock:
+                    if self._stop_thread:
+                        break
+                self.log.error(f"Error receiving event: {e}")
+                self.log.debug(f"Re-subscribing to channel: {channel}")
+                sub_id = self._api.subscribe(channel)
+                with self._context_lock:
+                    self._context[channel] = sub_id
                 subscribe_retries += 1
                 if subscribe_retries > 3:
-                    self.log.error("Failed to re-subscribe to node events")
+                    self.log.error(f"Failed to re-subscribe to channel: {channel}")
                     return False
                 continue
-            if not event:
-                # If we received a keep-alive event, just continue
-                continue
             subscribe_retries = 0
             for job, runtime, platform, rules in self._sched.get_schedule(event):
                 input_node = self._api.node.get(event['id'])
@@ -400,14 +432,20 @@ def _run(self, sub_id):
                 # Add to node data the jobfilter if it exists in event
                 if jobfilter and isinstance(jobfilter, list):
                     input_node['jobfilter'] = jobfilter
+                platform_filter = event.get('platform_filter')
+                if platform_filter and isinstance(platform_filter, list):
+                    input_node['platform_filter'] = platform_filter
                 # we cannot use rules, as we need to have info about job too
                 if job.params.get('frequency', None):
                     if not self._verify_frequency(job, input_node, platform):
                         continue
                 if not self._verify_architecture_filter(job, input_node):
                     continue
-                if self._api_helper.should_create_node(rules, input_node):
-                    self._run_job(job, runtime, platform, input_node)
+                with self._api_helper_lock:
+                    flag = self._api_helper.should_create_node(rules, input_node)
+                if flag:
+                    retry_counter = event.get('retry_counter', 0)
+                    self._run_job(job, runtime, platform, input_node, retry_counter)
         return True
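The scheduler now runs one consumer thread per subscribed channel ('node' and 'retry'), all sharing a stop flag plus a lock around `should_create_node()`. A minimal standalone sketch of that pattern, with a fake receive function standing in for the KernelCI API (the names here are made up for illustration):

```python
import threading
import time

stop_flag = False
stop_lock = threading.Lock()

def fake_receive(channel):
    # Stand-in for receive_event_data(sub_id, block=False): returning None
    # plays the role of a keep-alive with nothing to schedule.
    time.sleep(0.1)
    return None

def worker(channel):
    while True:
        with stop_lock:
            if stop_flag:
                break
        event = fake_receive(channel)
        if not event:
            continue
        # ... resolve the schedule for this event and create job nodes ...

threads = [threading.Thread(target=worker, args=(ch,)) for ch in ("node", "retry")]
for t in threads:
    t.start()

time.sleep(0.5)           # let the workers poll a few times
with stop_lock:
    stop_flag = True      # what Scheduler._stop() does via self._stop_thread
for t in threads:
    t.join()
```

Because the flag is only checked at the top of each iteration, shutdown waits for the current receive call to return; the non-blocking `receive_event_data(..., block=False)` keeps that window short.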