@@ -85,6 +85,12 @@ def __init__(self, configs, args):
             self._storage_config, storage_cred
         )
         self._job_tmp_dirs = {}
+        self._threads = []
+        self._api_helper_lock = threading.Lock()
+        self._stop_thread_lock = threading.Lock()
+        self._context_lock = threading.Lock()
+        self._context = {}
+        self._stop_thread = False

     def _get_runtimes_configs(self, configs, runtimes):
         runtimes_configs = {}
@@ -105,11 +111,19 @@ def _cleanup_paths(self):
         # ToDo: if stat != 0 then report error to API?

     def _setup(self, args):
-        return self._api.subscribe('node')
-
-    def _stop(self, sub_id):
-        if sub_id:
-            self._api_helper.unsubscribe_filters(sub_id)
+        node_sub_id = self._api.subscribe('node')
+        self.log.debug(f"Node channel sub id: {node_sub_id}")
+        retry_sub_id = self._api.subscribe('retry')
+        self.log.debug(f"Retry channel sub id: {retry_sub_id}")
+        self._context = {"node": node_sub_id, "retry": retry_sub_id}
+        return {"node": node_sub_id, "retry": retry_sub_id}
+
+    def _stop(self, context):
+        self._stop_thread = True
+        for _, sub_id in self._context.items():
+            if sub_id:
+                self.log.info(f"Unsubscribing: {sub_id}")
+                self._api_helper.unsubscribe_filters(sub_id)
         self._cleanup_paths()

     def backup_cleanup(self):
@@ -144,11 +158,11 @@ def backup_job(self, filename, nodeid):
         except Exception as e:
             self.log.error(f"Failed to backup {filename} to {new_filename}: {e}")

-    def _run_job(self, job_config, runtime, platform, input_node):
+    def _run_job(self, job_config, runtime, platform, input_node, retry_counter):
         try:
             node = self._api_helper.create_job_node(job_config,
                                                     input_node,
-                                                    runtime, platform)
+                                                    runtime, platform, retry_counter)
         except KeyError as e:
             self.log.error(' '.join([
                 input_node['id'],
@@ -162,6 +176,7 @@ def _run_job(self, job_config, runtime, platform, input_node):

         if not node:
             return
+        self.log.debug(f"Job node created: {node['id']}. Parent: {node['parent']}")
         # Most of the time, the artifacts we need originate from the parent
         # node. Import those into the current node, working on a copy so the
         # original node doesn't get "polluted" with useless artifacts when we
@@ -371,43 +386,66 @@ def _verify_architecture_filter(self, job, node):
                 return False
         return True

-    def _run(self, sub_id):
+    def _run(self, context):
+        for channel, sub_id in self._context.items():
+            thread = threading.Thread(target=self._run_scheduler, args=(channel, sub_id,))
+            self._threads.append(thread)
+            thread.start()
+
+        for thread in self._threads:
+            thread.join()
+
+    def _run_scheduler(self, channel, sub_id):
         self.log.info("Listening for available checkout events")
         self.log.info("Press Ctrl-C to stop.")
         subscribe_retries = 0

         while True:
+            with self._stop_thread_lock:
+                if self._stop_thread:
+                    break
             last_heartbeat['time'] = time.time()
             event = None
             try:
                 event = self._api_helper.receive_event_data(sub_id, block=False)
+                if not event:
+                    # If we received a keep-alive event, just continue
+                    continue
             except Exception as e:
-                self.log.error(f"Error receiving event: {e}, re-subscribing in 10 seconds")
-                time.sleep(10)
-                sub_id = self._api.subscribe('node')
+                with self._stop_thread_lock:
+                    if self._stop_thread:
+                        break
+                self.log.error(f"Error receiving event: {e}")
+                self.log.debug(f"Re-subscribing to channel: {channel}")
+                sub_id = self._api.subscribe(channel)
+                with self._context_lock:
+                    self._context[channel] = sub_id
                 subscribe_retries += 1
                 if subscribe_retries > 3:
-                    self.log.error("Failed to re-subscribe to node events")
+                    self.log.error(f"Failed to re-subscribe to channel: {channel}")
                     return False
                 continue
-            if not event:
-                # If we received a keep-alive event, just continue
-                continue
             subscribe_retries = 0
             for job, runtime, platform, rules in self._sched.get_schedule(event):
                 input_node = self._api.node.get(event['id'])
                 jobfilter = event.get('jobfilter')
                 # Add to node data the jobfilter if it exists in event
                 if jobfilter and isinstance(jobfilter, list):
                     input_node['jobfilter'] = jobfilter
+                platform_filter = event.get('platform_filter')
+                if platform_filter and isinstance(platform_filter, list):
+                    input_node['platform_filter'] = platform_filter
                 # we cannot use rules, as we need to have info about job too
                 if job.params.get('frequency', None):
                     if not self._verify_frequency(job, input_node, platform):
                         continue
                 if not self._verify_architecture_filter(job, input_node):
                     continue
-                if self._api_helper.should_create_node(rules, input_node):
-                    self._run_job(job, runtime, platform, input_node)
+                with self._api_helper_lock:
+                    flag = self._api_helper.should_create_node(rules, input_node)
+                if flag:
+                    retry_counter = event.get('retry_counter', 0)
+                    self._run_job(job, runtime, platform, input_node, retry_counter)

         return True
