69
69
# This is hard cap of 4 given the maximum preemption chain length of 4
70
70
MAX_CONCURRENT_VMSS_OPERATIONS = 4
71
71
72
+ # These global lists will be used for maintaining lists of machines that failed during reimaging
73
+ vms_absent_from_vmss = []
74
+ vms_timed_out_being_reimaged = []
75
+
72
76
# These global lists will be used for maintaining lists of ongoing operations on specific machines
73
77
vms_currently_being_reimaged = []
74
78
vms_currently_being_deleted = []
@@ -104,10 +108,6 @@ class Azure(Machinery):
104
108
WINDOWS_PLATFORM = "windows"
105
109
LINUX_PLATFORM = "linux"
106
110
107
- # Statuses for machines
108
- ABSENT = "absent_vm"
109
- REIMAGE_FAILING = "failed_reimaging"
110
-
111
111
def set_options (self , options : dict ) -> None :
112
112
"""Set machine manager options.
113
113
@param options: machine manager options dict.
@@ -118,11 +118,20 @@ def set_options(self, options: dict) -> None:
118
118
if not isinstance (mmanager_opts ["scale_sets" ], list ):
119
119
mmanager_opts ["scale_sets" ] = str (mmanager_opts ["scale_sets" ]).strip ().split ("," )
120
120
121
+ def initialize (self ):
122
+ """
123
+ Overloading abstracts.py:_initialize()
124
+ """
125
+ # Load.
126
+ self ._initialize ()
127
+
128
+ # Run initialization checks.
129
+ self ._initialize_check ()
130
+
121
131
def _initialize (self ):
122
132
"""
123
133
Overloading abstracts.py:_initialize()
124
134
Read configuration.
125
- @param module_name: module name
126
135
@raise CuckooDependencyError: if there is a problem with the dependencies call
127
136
"""
128
137
mmanager_opts = self .options .get (self .module_name )
@@ -285,7 +294,8 @@ def _set_vmss_stage(self):
285
294
286
295
self ._process_pre_existing_vmsss ()
287
296
self ._check_cpu_cores ()
288
- self ._get_or_upsert_vmsss (self .required_vmsss )
297
+ self ._update_or_create_vmsss (self .required_vmsss )
298
+ self ._check_locked_machines ()
289
299
self ._create_batch_threads ()
290
300
291
301
def _process_pre_existing_vmsss (self ):
@@ -403,7 +413,7 @@ def _check_cpu_cores(self):
403
413
else :
404
414
self .instance_type_cpus = self .options .az .instance_type_cores
405
415
406
- def _get_or_upsert_vmsss (self , vmsss_dict ):
416
+ def _update_or_create_vmsss (self , vmsss_dict ):
407
417
"""
408
418
Reimage or scale up existing VMSSs. Create non-existant required VMSSs.
409
419
"""
@@ -432,6 +442,17 @@ def _get_or_upsert_vmsss(self, vmsss_dict):
432
442
for thr in vmss_reimage_threads + vmss_creation_threads :
433
443
thr .join ()
434
444
445
+ def _check_locked_machines (self ):
446
+ """
447
+ In the case of CAPE unexpectedly restarting, release any locked machines.
448
+ They will have been reimaged and their tasks rescheduled before reaching this code.
449
+ """
450
+ running = self .running ()
451
+ if len (running ) > 0 :
452
+ log .info ("%d machines found locked on initialize, unlocking." , len (running ))
453
+ for machine in running :
454
+ self .db .unlock_machine (machine )
455
+
435
456
def _create_batch_threads (self ):
436
457
"""
437
458
Create batch reimage and delete threads.
@@ -491,9 +512,13 @@ def release(self, machine: Machine):
491
512
@param label: machine label.
492
513
"""
493
514
vmss_name = machine .label .split ("_" )[0 ]
494
- if machine .status == Azure . ABSENT :
515
+ if machine .label in vms_absent_from_vmss :
495
516
self .delete_machine (machine .label , delete_from_vmss = False )
496
- elif machine .status == Azure .REIMAGE_FAILING or machine_pools [vmss_name ]["is_scaling_down" ]:
517
+ vms_absent_from_vmss .remove (machine .label )
518
+ elif machine .label in vms_timed_out_being_reimaged :
519
+ self .delete_machine (machine .label )
520
+ vms_timed_out_being_reimaged .remove (machine .label )
521
+ elif machine_pools [vmss_name ]["is_scaling_down" ]:
497
522
self .delete_machine (machine .label )
498
523
else :
499
524
_ = super (Azure , self ).release (machine )
@@ -640,7 +665,7 @@ def _add_machines_to_db(self, vmss_name):
640
665
):
641
666
# VMs not deleted from VMSS yet.
642
667
continue
643
- self ._get_or_upsert_vmsss (vmsss_dict = {vmss_name : self .required_vmsss [vmss_name ]})
668
+ self ._update_or_create_vmsss (vmsss_dict = {vmss_name : self .required_vmsss [vmss_name ]})
644
669
return
645
670
log .debug (f"{ vmss_name } initialize retry failed. Timed out waiting for VMs to be deleted." )
646
671
@@ -1288,7 +1313,7 @@ def _thr_reimage_list_reader(self):
1288
1313
vms_currently_being_deleted .append (f"{ vmss_to_reimage } _{ instance_id } " )
1289
1314
with delete_lock :
1290
1315
delete_vm_list .append ({"vmss" : vmss_to_reimage , "id" : instance_id , "time_added" : time .time ()})
1291
- self . set_status (f"{ vmss_to_reimage } _{ instance_id } " , Azure . ABSENT )
1316
+ vms_absent_from_vmss . append (f"{ vmss_to_reimage } _{ instance_id } " )
1292
1317
vms_currently_being_reimaged .remove (f"{ vmss_to_reimage } _{ instance_id } " )
1293
1318
instance_ids .remove (instance_id )
1294
1319
@@ -1309,7 +1334,7 @@ def _thr_reimage_list_reader(self):
1309
1334
)
1310
1335
# That sucks, now we have mark each one for deletion
1311
1336
for instance_id in instance_ids :
1312
- self . set_status (f"{ vmss_to_reimage } _{ instance_id } " , Azure . REIMAGE_FAILING )
1337
+ vms_timed_out_being_reimaged . append (f"{ vmss_to_reimage } _{ instance_id } " )
1313
1338
break
1314
1339
time .sleep (2 )
1315
1340
0 commit comments