Skip to content

Commit 9ce7f0d

Browse files
author
Jeny Sadadia
committed
src/scheduler: schedule a job retry
Implement threading to listen to two channels simultaneously: `node` and `retry`. Schedule a job retry by passing `retry_counter` and `platform_filter` when creating a job node. Signed-off-by: Jeny Sadadia <[email protected]>
1 parent 528a548 commit 9ce7f0d

File tree

1 file changed

+33
-13
lines changed

1 file changed

+33
-13
lines changed

src/scheduler.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
# SPDX-License-Identifier: LGPL-2.1-or-later
44
#
5-
# Copyright (C) 2021, 2022, 2023 Collabora Limited
5+
# Copyright (C) 2021-2025 Collabora Limited
66
# Author: Guillaume Tucker <[email protected]>
77
# Author: Jeny Sadadia <[email protected]>
88

@@ -105,11 +105,16 @@ def _cleanup_paths(self):
105105
# ToDo: if stat != 0 then report error to API?
106106

107107
def _setup(self, args):
108-
return self._api.subscribe('node')
109-
110-
def _stop(self, sub_id):
111-
if sub_id:
112-
self._api_helper.unsubscribe_filters(sub_id)
108+
node_sub_id = self._api.subscribe('node')
109+
self.log.debug(f"Node channel sub id: {node_sub_id}")
110+
retry_sub_id = self._api.subscribe('retry')
111+
self.log.debug(f"Retry channel sub id: {retry_sub_id}")
112+
return [node_sub_id, retry_sub_id]
113+
114+
def _stop(self, sub_ids):
115+
for sub_id in sub_ids:
116+
if sub_id:
117+
self._api_helper.unsubscribe_filters(sub_id)
113118
self._cleanup_paths()
114119

115120
def backup_cleanup(self):
@@ -144,11 +149,11 @@ def backup_job(self, filename, nodeid):
144149
except Exception as e:
145150
self.log.error(f"Failed to backup {filename} to {new_filename}: {e}")
146151

147-
def _run_job(self, job_config, runtime, platform, input_node):
152+
def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
148153
try:
149154
node = self._api_helper.create_job_node(job_config,
150155
input_node,
151-
runtime, platform)
156+
runtime, platform, retry_counter)
152157
except KeyError as e:
153158
self.log.error(' '.join([
154159
input_node['id'],
@@ -162,6 +167,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
162167

163168
if not node:
164169
return
170+
self.log.debug(f"Job node created: {node['id']}. Parent: f{node['parent']}")
165171
# Most of the time, the artifacts we need originate from the parent
166172
# node. Import those into the current node, working on a copy so the
167173
# original node doesn't get "polluted" with useless artifacts when we
@@ -371,7 +377,17 @@ def _verify_architecture_filter(self, job, node):
371377
return False
372378
return True
373379

374-
def _run(self, sub_id):
380+
def _run(self, sub_ids):
381+
threads = []
382+
for sub_id in sub_ids:
383+
thread = threading.Thread(target=self._run_scheduler, args=(sub_id,))
384+
threads.append(thread)
385+
thread.start()
386+
387+
for thread in threads:
388+
thread.join()
389+
390+
def _run_scheduler(self, sub_id):
375391
self.log.info("Listening for available checkout events")
376392
self.log.info("Press Ctrl-C to stop.")
377393

@@ -380,26 +396,30 @@ def _run(self, sub_id):
380396
event = None
381397
try:
382398
event = self._api_helper.receive_event_data(sub_id, block=False)
399+
if not event:
400+
# If we received a keep-alive event, just continue
401+
continue
383402
except Exception as e:
384403
self.log.error(f"Error receiving event: {e}")
385404
continue
386-
if not event:
387-
# If we received a keep-alive event, just continue
388-
continue
389405
for job, runtime, platform, rules in self._sched.get_schedule(event):
390406
input_node = self._api.node.get(event['id'])
391407
jobfilter = event.get('jobfilter')
392408
# Add to node data the jobfilter if it exists in event
393409
if jobfilter and isinstance(jobfilter, list):
394410
input_node['jobfilter'] = jobfilter
411+
platform_filter = event.get('platform_filter')
412+
if platform_filter and isinstance(platform_filter, list):
413+
input_node['platform_filter'] = platform_filter
395414
# we cannot use rules, as we need to have info about job too
396415
if job.params.get('frequency', None):
397416
if not self._verify_frequency(job, input_node, platform):
398417
continue
399418
if not self._verify_architecture_filter(job, input_node):
400419
continue
401420
if self._api_helper.should_create_node(rules, input_node):
402-
self._run_job(job, runtime, platform, input_node)
421+
retry_counter = event.get('retry_counter', 0)
422+
self._run_job(job, runtime, platform, input_node, retry_counter)
403423

404424
return True
405425

0 commit comments

Comments
 (0)