#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2021-2025 Collabora Limited
# Author: Guillaume Tucker <[email protected]>
# Author: Jeny Sadadia <[email protected]>
@@ -105,11 +105,16 @@ def _cleanup_paths(self):
105105 # ToDo: if stat != 0 then report error to API?
106106
107107 def _setup (self , args ):
108- return self ._api .subscribe ('node' )
109-
110- def _stop (self , sub_id ):
111- if sub_id :
112- self ._api_helper .unsubscribe_filters (sub_id )
108+ node_sub_id = self ._api .subscribe ('node' )
109+ self .log .debug (f"Node channel sub id: { node_sub_id } " )
110+ retry_sub_id = self ._api .subscribe ('retry' )
111+ self .log .debug (f"Retry channel sub id: { retry_sub_id } " )
112+ return [node_sub_id , retry_sub_id ]
113+
114+ def _stop (self , sub_ids ):
115+ for sub_id in sub_ids :
116+ if sub_id :
117+ self ._api_helper .unsubscribe_filters (sub_id )
113118 self ._cleanup_paths ()
114119
115120 def backup_cleanup (self ):
@@ -144,11 +149,11 @@ def backup_job(self, filename, nodeid):
144149 except Exception as e :
145150 self .log .error (f"Failed to backup { filename } to { new_filename } : { e } " )
146151
147- def _run_job (self , job_config , runtime , platform , input_node ):
152+ def _run_job (self , job_config , runtime , platform , input_node , retry_counter ):
148153 try :
149154 node = self ._api_helper .create_job_node (job_config ,
150155 input_node ,
151- runtime , platform )
156+ runtime , platform , retry_counter )
152157 except KeyError as e :
153158 self .log .error (' ' .join ([
154159 input_node ['id' ],
@@ -162,6 +167,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
162167
163168 if not node :
164169 return
170+ self .log .debug (f"Job node created: { node ['id' ]} . Parent: f{ node ['parent' ]} " )
165171 # Most of the time, the artifacts we need originate from the parent
166172 # node. Import those into the current node, working on a copy so the
167173 # original node doesn't get "polluted" with useless artifacts when we
@@ -371,7 +377,17 @@ def _verify_architecture_filter(self, job, node):
371377 return False
372378 return True
373379
374- def _run (self , sub_id ):
380+ def _run (self , sub_ids ):
381+ threads = []
382+ for sub_id in sub_ids :
383+ thread = threading .Thread (target = self ._run_scheduler , args = (sub_id ,))
384+ threads .append (thread )
385+ thread .start ()
386+
387+ for thread in threads :
388+ thread .join ()
389+
390+ def _run_scheduler (self , sub_id ):
375391 self .log .info ("Listening for available checkout events" )
376392 self .log .info ("Press Ctrl-C to stop." )
377393 subscribe_retries = 0
@@ -381,33 +397,38 @@ def _run(self, sub_id):
381397 event = None
382398 try :
383399 event = self ._api_helper .receive_event_data (sub_id , block = False )
400+ if not event :
401+ # If we received a keep-alive event, just continue
402+ continue
384403 except Exception as e :
385404 self .log .error (f"Error receiving event: { e } , re-subscribing in 10 seconds" )
386- time .sleep (10 )
387- sub_id = self ._api .subscribe ('node' )
388- subscribe_retries += 1
389- if subscribe_retries > 3 :
390- self .log .error ("Failed to re-subscribe to node events" )
391- return False
392- continue
393- if not event :
394- # If we received a keep-alive event, just continue
405+ # time.sleep(10)
406+ # sub_id = self._api.subscribe('node')
407+ # subscribe_retries += 1
408+ # if subscribe_retries > 3:
409+ # self.log.error("Failed to re-subscribe to node events")
410+ # return False
395411 continue
396- subscribe_retries = 0
412+ # subscribe_retries = 0
413+ self .log .debug (f"Event received: { sub_id } :{ event ['id' ]} :{ event .get ('debug' )} :{ event .get ('retry_counter' )} " )
397414 for job , runtime , platform , rules in self ._sched .get_schedule (event ):
398415 input_node = self ._api .node .get (event ['id' ])
399416 jobfilter = event .get ('jobfilter' )
400417 # Add to node data the jobfilter if it exists in event
401418 if jobfilter and isinstance (jobfilter , list ):
402419 input_node ['jobfilter' ] = jobfilter
420+ platform_filter = event .get ('platform_filter' )
421+ if platform_filter and isinstance (platform_filter , list ):
422+ input_node ['platform_filter' ] = platform_filter
403423 # we cannot use rules, as we need to have info about job too
404424 if job .params .get ('frequency' , None ):
405425 if not self ._verify_frequency (job , input_node , platform ):
406426 continue
407427 if not self ._verify_architecture_filter (job , input_node ):
408428 continue
409429 if self ._api_helper .should_create_node (rules , input_node ):
410- self ._run_job (job , runtime , platform , input_node )
430+ retry_counter = event .get ('retry_counter' , 0 )
431+ self ._run_job (job , runtime , platform , input_node , retry_counter )
411432
412433 return True
413434
0 commit comments