@@ -110,6 +110,10 @@ def set_options(self, options: dict) -> None:
110
110
@param options: machine manager options dict.
111
111
"""
112
112
self .options = options
113
+ # Using "scale_sets" here instead of "machines" to avoid KeyError
114
+ mmanager_opts = self .options .get (self .module_name )
115
+ if not isinstance (mmanager_opts ["scale_sets" ], list ):
116
+ mmanager_opts ["scale_sets" ] = str (mmanager_opts ["scale_sets" ]).strip ().split ("," )
113
117
114
118
def _initialize (self ):
115
119
"""
@@ -118,11 +122,7 @@ def _initialize(self):
118
122
@param module_name: module name
119
123
@raise CuckooDependencyError: if there is a problem with the dependencies call
120
124
"""
121
- # Using "scale_sets" here instead of "machines" to avoid KeyError
122
125
mmanager_opts = self .options .get (self .module_name )
123
- if not isinstance (mmanager_opts ["scale_sets" ], list ):
124
- mmanager_opts ["scale_sets" ] = str (mmanager_opts ["scale_sets" ]).strip ().split ("," )
125
-
126
126
# Replace a list of IDs with dictionary representations
127
127
scale_sets = mmanager_opts .pop ("scale_sets" )
128
128
mmanager_opts ["scale_sets" ] = {}
@@ -150,7 +150,7 @@ def _initialize(self):
150
150
# Insert the scale_set_opts into the module.scale_sets attribute
151
151
mmanager_opts ["scale_sets" ][scale_set_id ] = scale_set_opts
152
152
153
- except (AttributeError , CuckooOperationalError ) as e :
153
+ except (AttributeError , CuckooCriticalError ) as e :
154
154
log .warning (f"Configuration details about scale set { scale_set_id .strip ()} are missing: { e } " )
155
155
continue
156
156
@@ -168,19 +168,21 @@ def _initialize_check(self):
168
168
169
169
# We will be using this as a source of truth for the VMSS configs
170
170
self .required_vmsss = {
171
- vmss_name : {"exists" : False , "image" : None , "platform" : None , "tag" : None , "initial_pool_size" : None }
171
+ vmss_name : {
172
+ "exists" : False , "image" : None , "platform" : None , "tag" : None , "initial_pool_size" : None , "retries" : self .options .az .init_retries
173
+ }
172
174
for vmss_name in self .options .az .scale_sets
173
175
}
174
176
175
177
# Starting the thread that sets API clients periodically
176
178
self ._thr_refresh_clients ()
177
179
178
- # Starting the thread that scales the machine pools periodically
179
- self ._thr_machine_pool_monitor ()
180
-
181
180
# Initialize the VMSSs that we will be using and not using
182
181
self ._set_vmss_stage ()
183
182
183
+ # Starting the thread that scales the machine pools periodically
184
+ self ._thr_machine_pool_monitor ()
185
+
184
186
# Set the flag that indicates that the system is not initializing
185
187
self .initializing = False
186
188
@@ -189,7 +191,6 @@ def _get_credentials(self):
189
191
Used to instantiate the Azure ClientSecretCredential object.
190
192
@return: an Azure ClientSecretCredential object
191
193
"""
192
-
193
194
credentials = None
194
195
if self .options .az .secret and self .options .az .secret != "<secret>" :
195
196
# Instantiates the ClientSecretCredential object using
@@ -279,6 +280,19 @@ def _set_vmss_stage(self):
279
280
elif required_vmss_values ["initial_pool_size" ] is None :
280
281
raise CuckooCriticalError (f"The VMSS '{ required_vmss_name } ' does not have an initial pool size." )
281
282
283
+ self ._process_existing_vmsss ()
284
+ self ._process_pre_existing_vmsss (self .required_vmsss )
285
+ self ._create_batch_threads ()
286
+
287
+ def _process_pre_existing_vmsss (self ):
288
+ """
289
+ Delete a VMSS if it does NOT have:
290
+ - the expected tag AND has one of the required names for a VMSS we plan to create
291
+ - one of the required names AND has the expected tag AND az.config's multiple_capes_in_sandbox_rg is 'false'
292
+ Update a VMSS if it:
293
+ - does not have the required image reference
294
+ - has a capacity (current size) different from its required 'initial_pool_size'
295
+ """
282
296
# Get all VMSSs in Resource Group
283
297
existing_vmsss = Azure ._azure_api_call (
284
298
self .options .az .sandbox_resource_group ,
@@ -292,7 +306,7 @@ def _set_vmss_stage(self):
292
306
# Cuckoo (AUTO_SCALE_CAPE key-value pair), ignore
293
307
if not vmss .tags or not vmss .tags .get (Azure .AUTO_SCALE_CAPE_KEY ) == Azure .AUTO_SCALE_CAPE_VALUE :
294
308
295
- # Unless ! They have one of the required names of the VMSSs that we are going to create
309
+ # Ignoring... unless they have one of the required names of the VMSSs that we are going to create
296
310
if vmss .name in self .required_vmsss .keys ():
297
311
async_delete_vmss = Azure ._azure_api_call (
298
312
self .options .az .sandbox_resource_group ,
@@ -352,22 +366,9 @@ def _set_vmss_stage(self):
352
366
operation = self .compute_client .virtual_machine_scale_sets .begin_delete ,
353
367
)
354
368
355
- try :
356
- self .subnet_id = Azure ._azure_api_call (
357
- self .options .az .vnet_resource_group ,
358
- self .options .az .vnet ,
359
- self .options .az .subnet ,
360
- operation = self .network_client .subnets .get ,
361
- ).id # note the id attribute here
362
- except CuckooMachineError :
363
- raise CuckooCriticalError (
364
- f"Subnet '{ self .options .az .subnet } ' does not exist in Virtual Network '{ self .options .az .vnet } '"
365
- )
366
-
367
369
# Initialize the platform scaling state monitor
368
370
is_platform_scaling .update ({Azure .WINDOWS_PLATFORM : False , Azure .LINUX_PLATFORM : False })
369
371
370
- # Let's get the number of CPUs associated with the SKU (instance_type)
371
372
# If we want to programmatically determine the number of cores for the sku
372
373
if self .options .az .find_number_of_cores_for_sku or self .options .az .instance_type_cores == 0 :
373
374
resource_skus = Azure ._azure_api_call (
@@ -394,10 +395,14 @@ def _set_vmss_stage(self):
394
395
else :
395
396
self .instance_type_cpus = self .options .az .instance_type_cores
396
397
397
- # Create required VMSSs that don't exist yet
398
+ def _get_or_upsert_vmsss (self , vmsss_dict ):
399
+ """
400
+ Reimage or scale up existing VMSSs. Create non-existent required VMSSs.
401
+ """
402
+
398
403
vmss_creation_threads = []
399
404
vmss_reimage_threads = []
400
- for vmss , vals in self . required_vmsss .items ():
405
+ for vmss , vals in vmsss_dict .items ():
401
406
if vals ["exists" ] and not self .options .az .just_start :
402
407
if machine_pools [vmss ]["size" ] == 0 :
403
408
self ._thr_scale_machine_pool (self .options .az .scale_sets [vmss ].pool_tag , True if vals ["platform" ] else False ),
@@ -419,6 +424,10 @@ def _set_vmss_stage(self):
419
424
for thr in vmss_reimage_threads + vmss_creation_threads :
420
425
thr .join ()
421
426
427
+ def _create_batch_threads (self ):
428
+ """
429
+ Create batch reimage and delete threads.
430
+ """
422
431
# Initialize the batch reimage threads. We want at most 4 batch reimaging threads
423
432
# so that if no VMSS scaling or batch deleting is taking place (aka we are receiving constant throughput of
424
433
# tasks and have the appropriate number of VMs created) then we'll perform batch reimaging at an optimal rate.
@@ -583,8 +592,8 @@ def _add_machines_to_db(self, vmss_name):
583
592
resultserver_port = self .options .az .resultserver_port ,
584
593
reserved = False ,
585
594
)
586
- # When we aren't initializing the system, the machine will immediately become available in DB
587
- # When we are initializing, we're going to wait for the machine to be have the Cuckoo agent all set up
595
+ # We always wait for Cuckoo agent to finish setting up if 'wait_for_agent_before_starting' is true or if we are initializing.
596
+ # Else, the machine should become immediately available in DB.
588
597
if self .initializing or self .options .az .wait_for_agent_before_starting :
589
598
thr = threading .Thread (
590
599
target = Azure ._thr_wait_for_ready_machine ,
@@ -607,6 +616,24 @@ def _add_machines_to_db(self, vmss_name):
607
616
except Exception as e :
608
617
log .error (repr (e ), exc_info = True )
609
618
619
+ # If no machines on any VMSSs are in the db when we leave this method, CAPE will crash.
620
+ if not self .machines () and self .required_vmsss [vmss_name ]["retries" ] > 0 :
621
+ log .warning (f"No available VMs after initializing { vmss_name } . Attempting to reinitialize VMSS." )
622
+ self .required_vmsss [vmss_name ]["retries" ] -= 1
623
+ start_time = timeit .default_timer ()
624
+
625
+ while ((timeit .default_timer () - start_time ) < 120 ):
626
+ with vms_currently_being_deleted_lock :
627
+ if any (
628
+ failed_vm in vms_currently_being_deleted
629
+ for failed_vm in ready_vmss_vm_threads
630
+ ):
631
+ # VMs not deleted from VMSS yet.
632
+ continue
633
+ self ._get_or_upsert_vmsss (vmsss_dict = {vmss_name : self .required_vmsss [vmss_name ]})
634
+ return
635
+ log .debug (f"{ vmss_name } initialize retry failed. Timed out waiting for VMs to be deleted." )
636
+
610
637
def _delete_machines_from_db_if_missing (self , vmss_name ):
611
638
"""
612
639
Delete machine from database if it does not exist in the VMSS.
@@ -636,8 +663,8 @@ def delete_machine(self, label, delete_from_vmss=True):
636
663
super (Azure , self ).delete_machine (label )
637
664
638
665
if delete_from_vmss :
639
- vmss_name , instance_id = label .split ("_" )
640
666
# Only add vm to the lists if it isn't there already
667
+ vmss_name , instance_id = label .split ("_" )
641
668
with vms_currently_being_deleted_lock :
642
669
if not label in vms_currently_being_deleted :
643
670
vms_currently_being_deleted .append (label )
@@ -665,13 +692,13 @@ def _thr_wait_for_ready_machine(machine_name, machine_ip):
665
692
log .debug (f"{ machine_name } : Initializing..." )
666
693
except socket .error :
667
694
log .debug (f"{ machine_name } : Initializing..." )
668
- time . sleep ( 10 )
669
-
670
- if timeit . default_timer () - start >= timeout :
671
- # We didn't do it : (
672
- raise CuckooGuestCriticalTimeout (
673
- f"Machine { machine_name } : the guest initialization hit the critical " "timeout, analysis aborted."
674
- )
695
+ else :
696
+ if ( timeit . default_timer () - start ) >= timeout :
697
+ # We didn't do it :(
698
+ raise CuckooGuestCriticalTimeout (
699
+ f"Machine { machine_name } : the guest initialization hit the critical timeout, analysis aborted."
700
+ )
701
+ time . sleep ( 10 )
675
702
log .debug (f"Machine { machine_name } was created and available in { round (timeit .default_timer () - start )} s" )
676
703
677
704
@staticmethod
@@ -723,6 +750,18 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
723
750
@param vmss_tag: the tag used that represents the OS image
724
751
"""
725
752
753
+ try :
754
+ self .subnet_id = Azure ._azure_api_call (
755
+ self .options .az .vnet_resource_group ,
756
+ self .options .az .vnet ,
757
+ self .options .az .subnet ,
758
+ operation = self .network_client .subnets .get ,
759
+ ).id # note the id attribute here
760
+ except CuckooMachineError :
761
+ raise CuckooCriticalError (
762
+ f"Subnet '{ self .options .az .subnet } ' does not exist in Virtual Network '{ self .options .az .vnet } '"
763
+ )
764
+
726
765
vmss_managed_disk = models .VirtualMachineScaleSetManagedDiskParameters (
727
766
storage_account_type = self .options .az .storage_account_type
728
767
)
@@ -800,6 +839,7 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
800
839
"is_scaling_down" : False ,
801
840
"wait" : False ,
802
841
}
842
+ self .required_vmsss [vmss_name ]["exists" ] = True
803
843
with self .db .session .begin ():
804
844
self ._add_machines_to_db (vmss_name )
805
845
@@ -986,7 +1026,7 @@ def _scale_machine_pool(self, tag, per_platform=False):
986
1026
)
987
1027
988
1028
# We don't want to be stuck in this for longer than the timeout specified
989
- if timeit .default_timer () - start_time > AZURE_TIMEOUT :
1029
+ if ( timeit .default_timer () - start_time ) > AZURE_TIMEOUT :
990
1030
log .debug (f"Breaking out of the while loop within the scale down section for { vmss_name } ." )
991
1031
break
992
1032
# Get the updated number of relevant machines required
@@ -1250,9 +1290,12 @@ def _thr_reimage_list_reader(self):
1250
1290
vms_currently_being_reimaged .remove (f"{ vmss_to_reimage } _{ instance_id } " )
1251
1291
continue
1252
1292
1293
+ reimaged = True
1253
1294
# We wait because we want the machine to be fresh before another task is assigned to it
1254
1295
while not async_reimage_some_machines .done ():
1255
1296
if (timeit .default_timer () - start_time ) > AZURE_TIMEOUT :
1297
+ reimaged = False
1298
+
1256
1299
log .debug (
1257
1300
f"Reimaging machines { instance_ids } in { vmss_to_reimage } took too long, deleting them from the DB and the VMSS."
1258
1301
)
@@ -1270,7 +1313,7 @@ def _thr_reimage_list_reader(self):
1270
1313
with current_operations_lock :
1271
1314
current_vmss_operations -= 1
1272
1315
timediff = timeit .default_timer () - start_time
1273
- log .debug (f"Reimaging instances { instance_ids } in { vmss_to_reimage } took { round (timediff )} s" )
1316
+ log .debug (f"{ 'S' if reimaged else 'Uns' } uccessfully reimaging instances { instance_ids } in { vmss_to_reimage } took { round (timediff )} s" )
1274
1317
except Exception as e :
1275
1318
log .error (f"Exception occurred in the reimage thread: { e } . Trying again..." )
1276
1319
@@ -1297,8 +1340,8 @@ def _thr_delete_list_reader(self):
1297
1340
for vmss_name , count in vmss_vm_delete_counts .items ():
1298
1341
if count > max :
1299
1342
max = count
1300
- vmss_to_delete = vmss_name
1301
- vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm ["vmss" ] == vmss_to_delete ]
1343
+ vmss_to_delete_from = vmss_name
1344
+ vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm ["vmss" ] == vmss_to_delete_from ]
1302
1345
1303
1346
for vm in vms_to_delete_from_same_vmss :
1304
1347
delete_vm_list .remove (vm )
@@ -1309,7 +1352,7 @@ def _thr_delete_list_reader(self):
1309
1352
start_time = timeit .default_timer ()
1310
1353
async_delete_some_machines = Azure ._azure_api_call (
1311
1354
self .options .az .sandbox_resource_group ,
1312
- vmss_to_delete ,
1355
+ vmss_to_delete_from ,
1313
1356
models .VirtualMachineScaleSetVMInstanceIDs (instance_ids = instance_ids ),
1314
1357
polling_interval = 1 ,
1315
1358
operation = self .compute_client .virtual_machine_scale_sets .begin_delete_instances ,
@@ -1320,24 +1363,30 @@ def _thr_delete_list_reader(self):
1320
1363
current_vmss_operations -= 1
1321
1364
with vms_currently_being_deleted_lock :
1322
1365
for instance_id in instance_ids :
1323
- vms_currently_being_deleted .remove (f"{ vmss_to_delete } _{ instance_id } " )
1366
+ vms_currently_being_deleted .remove (f"{ vmss_to_delete_from } _{ instance_id } " )
1324
1367
continue
1325
1368
1326
1369
# We wait because we want the machine to be fresh before another task is assigned to it
1327
1370
while not async_delete_some_machines .done ():
1371
+ deleted = True
1328
1372
if (timeit .default_timer () - start_time ) > AZURE_TIMEOUT :
1329
- log .debug (f"Deleting machines { instance_ids } in { vmss_to_delete } took too long." )
1373
+ log .debug (f"Deleting machines { instance_ids } in { vmss_to_delete_from } took too long." )
1374
+ deleted = False
1330
1375
break
1331
1376
time .sleep (2 )
1332
1377
1378
+ if self .initializing and deleted :
1379
+ # All machines should have been removed from the db and the VMSS at this point.
1380
+ # To force the VMSS to scale to initial_pool_size, set the size to zero here.
1381
+ log .debug (f"Setting size to 0 for VMSS { vmss_to_delete_from } after successful deletion" )
1382
+ machine_pools [vmss_to_delete_from ]["size" ] = 0
1383
+
1333
1384
with vms_currently_being_deleted_lock :
1334
1385
for instance_id in instance_ids :
1335
- vms_currently_being_deleted .remove (f"{ vmss_to_delete } _{ instance_id } " )
1386
+ vms_currently_being_deleted .remove (f"{ vmss_to_delete_from } _{ instance_id } " )
1336
1387
1337
1388
with current_operations_lock :
1338
1389
current_vmss_operations -= 1
1339
- log .debug (
1340
- f"Deleting instances { instance_ids } in { vmss_to_delete } took { round (timeit .default_timer () - start_time )} s"
1341
- )
1390
+ log .debug (f"{ 'S' if deleted else 'Uns' } uccessfully deleting instances { instance_ids } in { vmss_to_delete_from } took { round (timeit .default_timer () - start_time )} s" )
1342
1391
except Exception as e :
1343
1392
log .error (f"Exception occurred in the delete thread: { e } . Trying again..." )
0 commit comments