     CuckooDependencyError,
     CuckooGuestCriticalTimeout,
     CuckooMachineError,
-    CuckooOperationalError,
 )
 from lib.cuckoo.core.database import TASK_PENDING, Machine
@@ -110,6 +109,10 @@ def set_options(self, options: dict) -> None:
         @param options: machine manager options dict.
         """
         self.options = options
+        # Using "scale_sets" here instead of "machines" to avoid KeyError
+        mmanager_opts = self.options.get(self.module_name)
+        if not isinstance(mmanager_opts["scale_sets"], list):
+            mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")
 
     def _initialize(self):
         """
@@ -118,11 +121,7 @@ def _initialize(self):
         @param module_name: module name
         @raise CuckooDependencyError: if there is a problem with the dependencies call
         """
-        # Using "scale_sets" here instead of "machines" to avoid KeyError
         mmanager_opts = self.options.get(self.module_name)
-        if not isinstance(mmanager_opts["scale_sets"], list):
-            mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")
-
         # Replace a list of IDs with dictionary representations
         scale_sets = mmanager_opts.pop("scale_sets")
         mmanager_opts["scale_sets"] = {}
@@ -150,7 +149,7 @@ def _initialize(self):
                 # Insert the scale_set_opts into the module.scale_sets attribute
                 mmanager_opts["scale_sets"][scale_set_id] = scale_set_opts
 
-            except (AttributeError, CuckooOperationalError) as e:
+            except (AttributeError, CuckooCriticalError) as e:
                 log.warning(f"Configuration details about scale set {scale_set_id.strip()} are missing: {e}")
                 continue
@@ -168,19 +167,21 @@ def _initialize_check(self):
 
         # We will be using this as a source of truth for the VMSS configs
         self.required_vmsss = {
-            vmss_name: {"exists": False, "image": None, "platform": None, "tag": None, "initial_pool_size": None}
+            vmss_name: {
+                "exists": False, "image": None, "platform": None, "tag": None, "initial_pool_size": None, "retries": self.options.az.init_retries
+            }
             for vmss_name in self.options.az.scale_sets
         }
 
         # Starting the thread that sets API clients periodically
         self._thr_refresh_clients()
 
-        # Starting the thread that scales the machine pools periodically
-        self._thr_machine_pool_monitor()
-
         # Initialize the VMSSs that we will be using and not using
         self._set_vmss_stage()
 
+        # Starting the thread that scales the machine pools periodically
+        self._thr_machine_pool_monitor()
+
         # Set the flag that indicates that the system is not initializing
         self.initializing = False
@@ -189,7 +190,6 @@ def _get_credentials(self):
         Used to instantiate the Azure ClientSecretCredential object.
         @return: an Azure ClientSecretCredential object
         """
-
         credentials = None
         if self.options.az.secret and self.options.az.secret != "<secret>":
             # Instantiates the ClientSecretCredential object using
@@ -279,6 +279,19 @@ def _set_vmss_stage(self):
             elif required_vmss_values["initial_pool_size"] is None:
                 raise CuckooCriticalError(f"The VMSS '{required_vmss_name}' does not have an initial pool size.")
 
+        self._process_pre_existing_vmsss()
+        self._get_or_upsert_vmsss(self.required_vmsss)
+        self._create_batch_threads()
+
+    def _process_pre_existing_vmsss(self):
+        """
+        Delete a VMSS if it does NOT have:
+        - the expected tag AND has one of the required names for a VMSS we plan to create
+        - one of the required names AND has the expected tag AND az.config's multiple_capes_in_sandbox_rg is 'false'
+        Update a VMSS if it:
+        - does not have the required image reference
+        - has a capacity (current size) different from its required 'initial_pool_size'
+        """
         # Get all VMSSs in Resource Group
         existing_vmsss = Azure._azure_api_call(
             self.options.az.sandbox_resource_group,
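Read as one predicate, the deletion rule in the docstring above amounts to the following sketch (all names are illustrative stand-ins, not CAPE helpers):

```python
# Hypothetical restatement of the docstring's delete rule; not CAPE code.
def should_delete_vmss(vmss, required_names, tag_key, tag_value, multiple_capes: bool) -> bool:
    has_tag = bool(vmss.tags) and vmss.tags.get(tag_key) == tag_value
    required = vmss.name in required_names
    # Delete an untagged VMSS squatting on a required name, or a tagged VMSS
    # that is not ours to keep when only one CAPE may own the resource group.
    return (not has_tag and required) or (has_tag and not required and not multiple_capes)
```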
@@ -292,7 +305,7 @@ def _set_vmss_stage(self):
             # Cuckoo (AUTO_SCALE_CAPE key-value pair), ignore
             if not vmss.tags or not vmss.tags.get(Azure.AUTO_SCALE_CAPE_KEY) == Azure.AUTO_SCALE_CAPE_VALUE:
 
-                # Unless! They have one of the required names of the VMSSs that we are going to create
+                # Ignoring them... unless they have one of the required names of the VMSSs that we are going to create
                 if vmss.name in self.required_vmsss.keys():
                     async_delete_vmss = Azure._azure_api_call(
                         self.options.az.sandbox_resource_group,
@@ -352,22 +365,9 @@ def _set_vmss_stage(self):
                         operation=self.compute_client.virtual_machine_scale_sets.begin_delete,
                     )
 
-        try:
-            self.subnet_id = Azure._azure_api_call(
-                self.options.az.vnet_resource_group,
-                self.options.az.vnet,
-                self.options.az.subnet,
-                operation=self.network_client.subnets.get,
-            ).id  # note the id attribute here
-        except CuckooMachineError:
-            raise CuckooCriticalError(
-                f"Subnet '{self.options.az.subnet}' does not exist in Virtual Network '{self.options.az.vnet}'"
-            )
-
         # Initialize the platform scaling state monitor
         is_platform_scaling.update({Azure.WINDOWS_PLATFORM: False, Azure.LINUX_PLATFORM: False})
 
-        # Let's get the number of CPUs associated with the SKU (instance_type)
         # If we want to programmatically determine the number of cores for the sku
         if self.options.az.find_number_of_cores_for_sku or self.options.az.instance_type_cores == 0:
             resource_skus = Azure._azure_api_call(
@@ -394,10 +394,14 @@ def _set_vmss_stage(self):
         else:
             self.instance_type_cpus = self.options.az.instance_type_cores
 
-        # Create required VMSSs that don't exist yet
+    def _get_or_upsert_vmsss(self, vmsss_dict):
+        """
+        Reimage or scale up existing VMSSs. Create required VMSSs that do not exist yet.
+        """
+
         vmss_creation_threads = []
         vmss_reimage_threads = []
-        for vmss, vals in self.required_vmsss.items():
+        for vmss, vals in vmsss_dict.items():
             if vals["exists"] and not self.options.az.just_start:
                 if machine_pools[vmss]["size"] == 0:
                     self._thr_scale_machine_pool(self.options.az.scale_sets[vmss].pool_tag, True if vals["platform"] else False),
@@ -419,6 +423,10 @@ def _set_vmss_stage(self):
         for thr in vmss_reimage_threads + vmss_creation_threads:
             thr.join()
 
+    def _create_batch_threads(self):
+        """
+        Create batch reimage and delete threads.
+        """
         # Initialize the batch reimage threads. We want at most 4 batch reimaging threads
         # so that if no VMSS scaling or batch deleting is taking place (aka we are receiving constant throughput of
         # tasks and have the appropriate number of VMs created) then we'll perform batch reimaging at an optimal rate.
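The "at most 4" sizing described in these comments is a fixed worker-pool pattern; a generic sketch under assumed names (not the module's actual queue or worker functions):

```python
import queue
import threading

work: "queue.Queue[list]" = queue.Queue()  # batches of instance IDs, illustratively

def worker() -> None:
    while True:
        batch = work.get()  # blocks until a batch is queued
        try:
            ...             # reimage or delete the batch here
        finally:
            work.task_done()

# Cap the pool at 4 so batch work proceeds at a steady rate without
# starving concurrent scaling operations of API throughput.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()
```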
@@ -583,8 +591,8 @@ def _add_machines_to_db(self, vmss_name):
                         resultserver_port=self.options.az.resultserver_port,
                         reserved=False,
                     )
-                    # When we aren't initializing the system, the machine will immediately become available in DB
-                    # When we are initializing, we're going to wait for the machine to be have the Cuckoo agent all set up
+                    # Wait for the Cuckoo agent to finish setting up if 'wait_for_agent_before_starting' is true or if we are initializing.
+                    # Otherwise, the machine becomes available in the DB immediately.
                     if self.initializing or self.options.az.wait_for_agent_before_starting:
                         thr = threading.Thread(
                             target=Azure._thr_wait_for_ready_machine,
@@ -607,6 +615,24 @@ def _add_machines_to_db(self, vmss_name):
         except Exception as e:
             log.error(repr(e), exc_info=True)
 
+        # If no machines on any VMSS are in the db when we leave this method, CAPE will crash.
+        if not self.machines() and self.required_vmsss[vmss_name]["retries"] > 0:
+            log.warning(f"No available VMs after initializing {vmss_name}. Attempting to reinitialize the VMSS.")
+            self.required_vmsss[vmss_name]["retries"] -= 1
+            start_time = timeit.default_timer()
+
+            while (timeit.default_timer() - start_time) < 120:
+                with vms_currently_being_deleted_lock:
+                    if any(
+                        failed_vm in vms_currently_being_deleted
+                        for failed_vm in ready_vmss_vm_threads
+                    ):
+                        # VMs have not been deleted from the VMSS yet.
+                        continue
+                self._get_or_upsert_vmsss(vmsss_dict={vmss_name: self.required_vmsss[vmss_name]})
+                return
+            log.debug(f"{vmss_name} initialize retry failed. Timed out waiting for VMs to be deleted.")
+
     def _delete_machines_from_db_if_missing(self, vmss_name):
         """
         Delete machine from database if it does not exist in the VMSS.
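The reinitialization above is a bounded wait: poll shared state under a lock until a deadline passes, then retry once or give up. A self-contained sketch of the pattern, with illustrative names in place of CAPE's module-level globals:

```python
import threading
import time
import timeit

pending = {"vmss-1_0"}          # stand-in for vms_currently_being_deleted
pending_lock = threading.Lock()

def wait_then_retry(action, deadline_s: float = 120.0) -> bool:
    """Poll until `pending` drains, then run `action`; return False on timeout."""
    start = timeit.default_timer()
    while (timeit.default_timer() - start) < deadline_s:
        with pending_lock:
            if not pending:      # condition cleared: safe to retry
                action()
                return True
        time.sleep(1)            # yield between polls instead of spinning
    return False
```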
@@ -636,8 +662,8 @@ def delete_machine(self, label, delete_from_vmss=True):
         super(Azure, self).delete_machine(label)
 
         if delete_from_vmss:
-            vmss_name, instance_id = label.split("_")
             # Only add vm to the lists if it isn't there already
+            vmss_name, instance_id = label.split("_")
             with vms_currently_being_deleted_lock:
                 if not label in vms_currently_being_deleted:
                     vms_currently_being_deleted.append(label)
@@ -665,13 +691,13 @@ def _thr_wait_for_ready_machine(machine_name, machine_ip):
                 log.debug(f"{machine_name}: Initializing...")
             except socket.error:
                 log.debug(f"{machine_name}: Initializing...")
-            time.sleep(10)
-
-            if timeit.default_timer() - start >= timeout:
-                # We didn't do it :(
-                raise CuckooGuestCriticalTimeout(
-                    f"Machine {machine_name}: the guest initialization hit the critical " "timeout, analysis aborted."
-                )
+            else:
+                if (timeit.default_timer() - start) >= timeout:
+                    # We didn't do it :(
+                    raise CuckooGuestCriticalTimeout(
+                        f"Machine {machine_name}: the guest initialization hit the critical timeout, analysis aborted."
+                    )
+                time.sleep(10)
         log.debug(f"Machine {machine_name} was created and available in {round(timeit.default_timer() - start)}s")
 
     @staticmethod
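The restructured loop polls the guest's agent port until a TCP connection succeeds or a deadline passes. A minimal standalone sketch of that bounded readiness poll (the port and timeout values here are assumptions, not CAPE's settings):

```python
import socket
import time
import timeit

def wait_for_agent(ip: str, port: int = 8000, timeout_s: float = 840.0) -> None:
    """Poll a guest agent port until it accepts a TCP connection or we time out."""
    start = timeit.default_timer()
    while True:
        try:
            socket.create_connection((ip, port), timeout=1).close()
            return  # agent is up
        except OSError:  # covers socket.timeout and socket.error
            if (timeit.default_timer() - start) >= timeout_s:
                raise TimeoutError(f"guest at {ip}:{port} never became ready")
            time.sleep(10)  # back off between probes
```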
@@ -723,6 +749,18 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
         @param vmss_tag: the tag used that represents the OS image
         """
 
+        try:
+            self.subnet_id = Azure._azure_api_call(
+                self.options.az.vnet_resource_group,
+                self.options.az.vnet,
+                self.options.az.subnet,
+                operation=self.network_client.subnets.get,
+            ).id  # note the id attribute here
+        except CuckooMachineError:
+            raise CuckooCriticalError(
+                f"Subnet '{self.options.az.subnet}' does not exist in Virtual Network '{self.options.az.vnet}'"
+            )
+
         vmss_managed_disk = models.VirtualMachineScaleSetManagedDiskParameters(
             storage_account_type=self.options.az.storage_account_type
         )
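The relocated lookup resolves the subnet's ARM resource ID at VMSS-creation time through CAPE's `_azure_api_call` wrapper. Stripped of the wrapper, the underlying SDK call looks roughly like this (credentials and resource names are placeholders):

```python
from azure.identity import ClientSecretCredential
from azure.mgmt.network import NetworkManagementClient

# Placeholder identifiers; in CAPE these come from az.conf.
credential = ClientSecretCredential(tenant_id="...", client_id="...", client_secret="...")
network_client = NetworkManagementClient(credential, subscription_id="...")

subnet = network_client.subnets.get("my-vnet-rg", "my-vnet", "my-subnet")
subnet_id = subnet.id  # full ARM resource ID, consumed by the VMSS network profile
```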
@@ -800,6 +838,7 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
             "is_scaling_down": False,
             "wait": False,
         }
+        self.required_vmsss[vmss_name]["exists"] = True
         with self.db.session.begin():
             self._add_machines_to_db(vmss_name)
@@ -986,7 +1025,7 @@ def _scale_machine_pool(self, tag, per_platform=False):
                 )
 
                 # We don't want to be stuck in this for longer than the timeout specified
-                if timeit.default_timer() - start_time > AZURE_TIMEOUT:
+                if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
                     log.debug(f"Breaking out of the while loop within the scale down section for {vmss_name}.")
                     break
                 # Get the updated number of relevant machines required
@@ -1250,9 +1289,12 @@ def _thr_reimage_list_reader(self):
                             vms_currently_being_reimaged.remove(f"{vmss_to_reimage}_{instance_id}")
                         continue
 
+                reimaged = True
                 # We wait because we want the machine to be fresh before another task is assigned to it
                 while not async_reimage_some_machines.done():
                     if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
+                        reimaged = False
+
                         log.debug(
                             f"Reimaging machines {instance_ids} in {vmss_to_reimage} took too long, deleting them from the DB and the VMSS."
                         )
@@ -1270,7 +1312,7 @@ def _thr_reimage_list_reader(self):
                 with current_operations_lock:
                     current_vmss_operations -= 1
                 timediff = timeit.default_timer() - start_time
-                log.debug(f"Reimaging instances {instance_ids} in {vmss_to_reimage} took {round(timediff)}s")
+                log.debug(f"{'Successfully' if reimaged else 'Unsuccessfully'} reimaged instances {instance_ids} in {vmss_to_reimage} after {round(timediff)}s")
             except Exception as e:
                 log.error(f"Exception occurred in the reimage thread: {e}. Trying again...")
@@ -1297,8 +1339,8 @@ def _thr_delete_list_reader(self):
                 for vmss_name, count in vmss_vm_delete_counts.items():
                     if count > max:
                         max = count
-                        vmss_to_delete = vmss_name
-                vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm["vmss"] == vmss_to_delete]
+                        vmss_to_delete_from = vmss_name
+                vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm["vmss"] == vmss_to_delete_from]
 
                 for vm in vms_to_delete_from_same_vmss:
                     delete_vm_list.remove(vm)
@@ -1309,7 +1351,7 @@ def _thr_delete_list_reader(self):
                 start_time = timeit.default_timer()
                 async_delete_some_machines = Azure._azure_api_call(
                     self.options.az.sandbox_resource_group,
-                    vmss_to_delete,
+                    vmss_to_delete_from,
                     models.VirtualMachineScaleSetVMInstanceIDs(instance_ids=instance_ids),
                     polling_interval=1,
                     operation=self.compute_client.virtual_machine_scale_sets.begin_delete_instances,
@@ -1320,24 +1362,30 @@ def _thr_delete_list_reader(self):
                         current_vmss_operations -= 1
                     with vms_currently_being_deleted_lock:
                         for instance_id in instance_ids:
-                            vms_currently_being_deleted.remove(f"{vmss_to_delete}_{instance_id}")
+                            vms_currently_being_deleted.remove(f"{vmss_to_delete_from}_{instance_id}")
                     continue
 
+                deleted = True
                 # We wait because we want the machine to be fresh before another task is assigned to it
                 while not async_delete_some_machines.done():
                     if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
-                        log.debug(f"Deleting machines {instance_ids} in {vmss_to_delete} took too long.")
+                        log.debug(f"Deleting machines {instance_ids} in {vmss_to_delete_from} took too long.")
+                        deleted = False
                         break
                     time.sleep(2)
 
+                if self.initializing and deleted:
+                    # All machines should have been removed from the db and the VMSS at this point.
+                    # To force the VMSS to scale to initial_pool_size, set the size to zero here.
+                    log.debug(f"Setting size to 0 for VMSS {vmss_to_delete_from} after successful deletion")
+                    machine_pools[vmss_to_delete_from]["size"] = 0
+
                 with vms_currently_being_deleted_lock:
                     for instance_id in instance_ids:
-                        vms_currently_being_deleted.remove(f"{vmss_to_delete}_{instance_id}")
+                        vms_currently_being_deleted.remove(f"{vmss_to_delete_from}_{instance_id}")
 
                 with current_operations_lock:
                     current_vmss_operations -= 1
-                log.debug(
-                    f"Deleting instances {instance_ids} in {vmss_to_delete} took {round(timeit.default_timer() - start_time)}s"
-                )
+                log.debug(f"{'Successfully' if deleted else 'Unsuccessfully'} deleted instances {instance_ids} in {vmss_to_delete_from} after {round(timeit.default_timer() - start_time)}s")
             except Exception as e:
                 log.error(f"Exception occurred in the delete thread: {e}. Trying again...")