116
116
BUFSIZE = int (cfg .processing .analysis_size_limit )
117
117
118
118
119
- def hash_file (method , path ) :
119
+ def hash_file (method , path : str ) -> str :
120
120
"""Calculates a hash on a file by path.
121
121
@param method: callable hashing method
122
122
@param path: file path
@@ -143,6 +143,17 @@ def convert(data):
143
143
144
144
145
145
def is_duplicated_binary (file_info : dict , cape_file : dict , append_file : bool ) -> bool :
146
+ """
147
+ Determines if a binary file is a duplicate based on various criteria.
148
+
149
+ Args:
150
+ file_info (dict): Information about the file being checked.
151
+ cape_file (dict): Information about the existing CAPE file.
152
+ append_file (bool): Flag indicating whether to append the file.
153
+
154
+ Returns:
155
+ bool: False if the file is determined to be a duplicate, otherwise returns the value of append_file.
156
+ """
146
157
if HAVE_PYDEEP :
147
158
ssdeep_grade = pydeep .compare (file_info ["ssdeep" ].encode (), cape_file ["ssdeep" ].encode ())
148
159
if ssdeep_grade >= ssdeep_threshold :
@@ -162,9 +173,25 @@ def is_duplicated_binary(file_info: dict, cape_file: dict, append_file: bool) ->
162
173
return append_file
163
174
164
175
165
- def static_config_parsers (cape_name , file_path , file_data ):
176
+ def static_config_parsers (cape_name : str , file_path : str , file_data : bytes ) -> dict :
177
+ """
178
+ Process CAPE Yara hits and extract configuration data using various parsers.
179
+
180
+ This function attempts to extract configuration data from a given file using different parsers
181
+ such as CAPE extractors, DC3-MWCP, and Malwareconfigs. The function returns a dictionary containing
182
+ the extracted configuration data.
183
+
184
+ Args:
185
+ cape_name (str): The name of the CAPE parser to use.
186
+ file_path (str): The path to the file being analyzed.
187
+ file_data (bytes): The binary data of the file being analyzed.
188
+
189
+ Returns:
190
+ dict: A dictionary containing the extracted configuration data. If no configuration data is
191
+ extracted, an empty dictionary is returned.
192
+ """
166
193
"""Process CAPE Yara hits"""
167
- cape_config = {cape_name : {} }
194
+ cape_config = {}
168
195
parser_loaded = False
169
196
# CAPE - pure python parsers
170
197
# MWCP
@@ -184,14 +211,14 @@ def static_config_parsers(cape_name, file_path, file_data):
184
211
# In Python 3, map() returns an iterator rather than a list, and an iterator is not JSON-serializable.
185
212
if isinstance (value , map ):
186
213
value = list (value )
187
- cape_config [ cape_name ] .update ({key : [value ]})
214
+ cape_config . setdefault ( cape_name , {}) .update ({key : [value ]})
188
215
parser_loaded = True
189
216
elif isinstance (cape_configraw , dict ):
190
217
for key , value in cape_configraw .items ():
191
218
# In Python 3, map() returns an iterator rather than a list, and an iterator is not JSON-serializable.
192
219
if isinstance (value , map ):
193
220
value = list (value )
194
- cape_config [ cape_name ] .update ({key : [value ]})
221
+ cape_config . setdefault ( cape_name , {}) .update ({key : [value ]})
195
222
parser_loaded = True
196
223
except Exception as e :
197
224
log .exception ("CAPE: parsing error on %s with %s: %s" , file_path , cape_name , e )
@@ -215,7 +242,7 @@ def static_config_parsers(cape_name, file_path, file_data):
215
242
del reportmeta ["other" ]
216
243
217
244
tmp_dict .update (reportmeta )
218
- cape_config [ cape_name ] = convert (tmp_dict )
245
+ cape_config . setdefault ( cape_name , {}). update ( convert (tmp_dict ) )
219
246
log .debug ("CAPE: DC3-MWCP parser for %s completed" , cape_name )
220
247
else :
221
248
error_lines = report .errors [0 ].split ("\n " )
@@ -252,10 +279,10 @@ def static_config_parsers(cape_name, file_path, file_data):
252
279
# ToDo remove
253
280
if isinstance (malwareconfig_config , list ):
254
281
for key , value in malwareconfig_config [0 ].items ():
255
- cape_config [ cape_name ] .update ({key : [value ]})
282
+ cape_config . setdefault ( cape_name , {}) .update ({key : [value ]})
256
283
elif isinstance (malwareconfig_config , dict ):
257
284
for key , value in malwareconfig_config .items ():
258
- cape_config [ cape_name ] .update ({key : [value ]})
285
+ cape_config . setdefault ( cape_name , {}) .update ({key : [value ]})
259
286
except Exception as e :
260
287
if "rules" in str (e ):
261
288
log .warning ("You probably need to compile yara-python with dotnet support" )
@@ -267,9 +294,6 @@ def static_config_parsers(cape_name, file_path, file_data):
267
294
cape_name ,
268
295
str (e ),
269
296
)
270
-
271
- if cape_config .get (cape_name ) == {}:
272
- return {}
273
297
"""
274
298
elif HAVE_MALDUCK and not parser_loaded and cape_name.lower() in malduck_modules_names:
275
299
log.debug("Running Malduck on %s", file_path)
@@ -290,14 +314,26 @@ def static_config_parsers(cape_name, file_path, file_data):
290
314
del ext
291
315
if tmp_config:
292
316
for key, value in tmp_config[0].items():
293
- cape_config[ cape_name] .update({key: [value]})
317
+ cape_config.setdefault( cape_name, {}) .update({key: [value]})
294
318
"""
295
- if not cape_config [cape_name ]:
296
- return {}
319
+
297
320
return cape_config
298
321
299
322
300
- def static_config_lookup (file_path , sha256 = False ):
323
+ def static_config_lookup (file_path : str , sha256 : str = False ) -> dict :
324
+ """
325
+ Look up static configuration information for a given file based on its SHA-256 hash.
326
+
327
+ This function calculates the SHA-256 hash of the file at the specified path if not provided,
328
+ and then queries either a MongoDB or Elasticsearch database to retrieve configuration information.
329
+
330
+ Args:
331
+ file_path (str): The path to the file for which to look up configuration information.
332
+ sha256 (str, optional): The SHA-256 hash of the file. If not provided, it will be calculated.
333
+
334
+ Returns:
335
+ dict or None: A dictionary containing the configuration information if found, otherwise None.
336
+ """
301
337
if not sha256 :
302
338
sha256 = hashlib .sha256 (open (file_path , "rb" ).read ()).hexdigest ()
303
339
@@ -327,13 +363,26 @@ def static_config_lookup(file_path, sha256=False):
327
363
named_static_extractors = []
328
364
329
365
330
- def static_extraction (path ):
331
- config = False
366
+ def static_extraction (path : str ) -> dict :
367
+ """
368
+ Extracts static configuration from a file using YARA rules and named static extractors.
369
+
370
+ Args:
371
+ path (str): The file path to be analyzed.
372
+
373
+ Returns:
374
+ dict: The extracted configuration as a dictionary if successful,
375
+ an empty dictionary if the file has no CAPE YARA hits and is not a named static extractor.
376
+
377
+ Raises:
378
+ Exception: Logs any exceptions that occur during the extraction process.
379
+ """
380
+ config = {}
332
381
try :
333
382
hits = File (path ).get_yara (category = "CAPE" )
334
383
path_name = Path (path ).name
335
384
if not hits and path_name not in named_static_extractors :
336
- return False
385
+ return config
337
386
file_data = path_read_file (path )
338
387
if path_name in named_static_extractors :
339
388
config = static_config_parsers (path_name , path , file_data )
@@ -349,7 +398,18 @@ def static_extraction(path):
349
398
return config
350
399
351
400
352
- def cape_name_from_yara (details , pid , results ):
401
+ def cape_name_from_yara (details : dict , pid : int , results : dict ) -> str :
402
+ """
403
+ Extracts the CAPE name from YARA hit details and associates it with a process ID (pid) in the results dictionary.
404
+
405
+ Args:
406
+ details (dict): A dictionary containing YARA hit details, expected to have a key "cape_yara" with a list of hits.
407
+ pid (int): The process ID to associate the CAPE name with.
408
+ results (dict): A dictionary to store the association between detections and process IDs.
409
+
410
+ Returns:
411
+ str: The CAPE name extracted from the YARA hit, or None if no CAPE name is found.
412
+ """
353
413
for hit in details .get ("cape_yara" , []) or []:
354
414
if File .yara_hit_provides_detection (hit ):
355
415
if "detections2pid" not in results :
0 commit comments