#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2021-2025 Collabora Limited
# Author: Guillaume Tucker <[email protected]>
# Author: Jeny Sadadia <[email protected]>
@@ -105,11 +105,17 @@ def _cleanup_paths(self):
105105 # ToDo: if stat != 0 then report error to API?
106106
107107 def _setup (self , args ):
108- return self ._api .subscribe ('node' )
109-
110- def _stop (self , sub_id ):
111- if sub_id :
112- self ._api_helper .unsubscribe_filters (sub_id )
108+ # return self._api.subscribe('node')
109+ node_sub_id = self ._api .subscribe ('node' )
110+ self .log .debug (f"Node channel sub id: { node_sub_id } " )
111+ retry_sub_id = self ._api .subscribe ('retry' )
112+ self .log .debug (f"Retry channel sub id: { retry_sub_id } " )
113+ return [node_sub_id , retry_sub_id ]
114+
115+ def _stop (self , sub_ids ):
116+ for sub_id in sub_ids :
117+ if sub_id :
118+ self ._api_helper .unsubscribe_filters (sub_id )
113119 self ._cleanup_paths ()
114120
115121 def backup_cleanup (self ):
@@ -162,6 +168,7 @@ def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
162168
163169 if not node :
164170 return
171+ self .log .debug (f"Job node created: { node ['id' ]} . Parent: f{ node ['parent' ]} " )
165172 # Most of the time, the artifacts we need originate from the parent
166173 # node. Import those into the current node, working on a copy so the
167174 # original node doesn't get "polluted" with useless artifacts when we
@@ -371,7 +378,17 @@ def _verify_architecture_filter(self, job, node):
371378 return False
372379 return True
373380
374- def _run (self , sub_id ):
381+ def _run (self , sub_ids ):
382+ threads = []
383+ for sub_id in sub_ids :
384+ thread = threading .Thread (target = self ._run_scheduler , args = (sub_id ,))
385+ threads .append (thread )
386+ thread .start ()
387+
388+ for thread in threads :
389+ thread .join ()
390+
391+ def _run_scheduler (self , sub_id ):
375392 self .log .info ("Listening for available checkout events" )
376393 self .log .info ("Press Ctrl-C to stop." )
377394 subscribe_retries = 0
@@ -381,25 +398,29 @@ def _run(self, sub_id):
381398 event = None
382399 try :
383400 event = self ._api_helper .receive_event_data (sub_id , block = False )
401+ if not event :
402+ # If we received a keep-alive event, just continue
403+ continue
384404 except Exception as e :
385405 self .log .error (f"Error receiving event: { e } , re-subscribing in 10 seconds" )
386- time .sleep (10 )
387- sub_id = self ._api .subscribe ('node' )
388- subscribe_retries += 1
389- if subscribe_retries > 3 :
390- self .log .error ("Failed to re-subscribe to node events" )
391- return False
392- continue
393- if not event :
394- # If we received a keep-alive event, just continue
406+ # time.sleep(10)
407+ # sub_id = self._api.subscribe('node')
408+ # subscribe_retries += 1
409+ # if subscribe_retries > 3:
410+ # self.log.error("Failed to re-subscribe to node events")
411+ # return False
395412 continue
396- subscribe_retries = 0
413+ # subscribe_retries = 0
414+ self .log .debug (f"Event received: { sub_id } :{ event ['id' ]} :{ event .get ('debug' )} :{ event .get ('retry_counter' )} " )
397415 for job , runtime , platform , rules in self ._sched .get_schedule (event ):
398416 input_node = self ._api .node .get (event ['id' ])
399417 jobfilter = event .get ('jobfilter' )
400418 # Add to node data the jobfilter if it exists in event
401419 if jobfilter and isinstance (jobfilter , list ):
402420 input_node ['jobfilter' ] = jobfilter
421+ platform_filter = event .get ('platform_filter' )
422+ if platform_filter and isinstance (platform_filter , list ):
423+ input_node ['platform_filter' ] = platform_filter
403424 # we cannot use rules, as we need to have info about job too
404425 if job .params .get ('frequency' , None ):
405426 if not self ._verify_frequency (job , input_node , platform ):
@@ -448,5 +469,11 @@ def __call__(self, configs, args):
448469 opts = parse_opts ('scheduler' , globals ())
449470 yaml_configs = opts .get_yaml_configs () or 'config'
450471 configs = kernelci .config .load (yaml_configs )
472+ # sub_ids = sch.setup(opts)
473+ # threads = []
474+ # for sub_id in sub_ids:
475+ # thread = threading.Thread(target=sch._run_scheduler, args=(sub_id,))
476+ # threads.append(thread)
477+ # thread.start()
451478 status = opts .command (configs , opts )
452479 sys .exit (0 if status is True else 1 )
0 commit comments