Skip to content

Commit 2283cfb

Browse files
committed
style: Automatic code formatting
1 parent 30f7fb9 commit 2283cfb

File tree

10 files changed

+51
-152
lines changed

10 files changed

+51
-152
lines changed

lib/parsers_aux/ratking/__init__.py

Lines changed: 20 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,14 @@
3434
from re import DOTALL, compile, search
3535
from typing import Any, Tuple
3636

37-
# from yara import Rules
38-
3937
from .config_parser_exception import ConfigParserException
4038
from .utils import config_item
41-
from .utils.decryptors import (
42-
SUPPORTED_DECRYPTORS,
43-
ConfigDecryptor,
44-
IncompatibleDecryptorException,
45-
)
39+
from .utils.decryptors import SUPPORTED_DECRYPTORS, ConfigDecryptor, IncompatibleDecryptorException
4640
from .utils.dotnetpe_payload import DotNetPEPayload
4741

42+
# from yara import Rules
43+
44+
4845
logger = getLogger(__name__)
4946

5047

@@ -54,9 +51,7 @@ class RATConfigParser:
5451
_MIN_CONFIG_LEN_CEILING = 9
5552

5653
# Pattern to find the VerifyHash() method
57-
_PATTERN_VERIFY_HASH = compile(
58-
rb"\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01", DOTALL
59-
)
54+
_PATTERN_VERIFY_HASH = compile(rb"\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01", DOTALL)
6055

6156
# def __init__(self, file_path: str, yara_rule: Rules = None) -> None:
6257
def __init__(self, file_data: bytes = None) -> None:
@@ -77,33 +72,24 @@ def __init__(self, file_data: bytes = None) -> None:
7772
self._decryptor: ConfigDecryptor = None
7873
self.report["config"] = self._get_config()
7974
self.report["key"] = (
80-
self._decryptor.key.hex()
81-
if self._decryptor is not None and self._decryptor.key is not None
82-
else "None"
75+
self._decryptor.key.hex() if self._decryptor is not None and self._decryptor.key is not None else "None"
8376
)
8477
self.report["salt"] = (
85-
self._decryptor.salt.hex()
86-
if self._decryptor is not None and self._decryptor.salt is not None
87-
else "None"
78+
self._decryptor.salt.hex() if self._decryptor is not None and self._decryptor.salt is not None else "None"
8879
)
8980
except Exception as e:
9081
# self.report["config"] = f"Exception encountered for {file_path}: {e}"
9182
self.report["config"] = f"Exception encountered: {e}"
9283

9384
# Decrypts/decodes values from an encrypted config and returns the
9485
# decrypted/decoded config
95-
def _decrypt_and_decode_config(
96-
self, encrypted_config: bytes, min_config_len: int
97-
) -> dict[str, Any]:
86+
def _decrypt_and_decode_config(self, encrypted_config: bytes, min_config_len: int) -> dict[str, Any]:
9887
decoded_config = {}
9988

10089
for item_class in config_item.SUPPORTED_CONFIG_ITEMS:
10190
item = item_class()
10291
# Translate config Field RVAs to Field names
103-
item_data = {
104-
self._dnpp.field_name_from_rva(k): v
105-
for k, v in item.parse_from(encrypted_config).items()
106-
}
92+
item_data = {self._dnpp.field_name_from_rva(k): v for k, v in item.parse_from(encrypted_config).items()}
10793

10894
if len(item_data) > 0:
10995
if type(item) is config_item.EncryptedStringConfigItem:
@@ -122,22 +108,16 @@ def _decrypt_and_decode_config(
122108
try:
123109
self._decryptor = decryptor(self._dnpp)
124110
except IncompatibleDecryptorException as ide:
125-
logger.debug(
126-
f"Decryptor incompatible {decryptor} : {ide}"
127-
)
111+
logger.debug(f"Decryptor incompatible {decryptor} : {ide}")
128112
self._incompatible_decryptors.append(decryptor)
129113
continue
130114
try:
131115
# Try to decrypt the encrypted strings
132116
# Continue to next compatible decryptor on failure
133-
item_data = self._decryptor.decrypt_encrypted_strings(
134-
item_data
135-
)
117+
item_data = self._decryptor.decrypt_encrypted_strings(item_data)
136118
break
137119
except Exception as e:
138-
logger.debug(
139-
f"Decryption failed with decryptor {decryptor} : {e}"
140-
)
120+
logger.debug(f"Decryption failed with decryptor {decryptor} : {e}")
141121
self._decryptor = None
142122

143123
if self._decryptor is None:
@@ -146,16 +126,12 @@ def _decrypt_and_decode_config(
146126
elif type(item) is config_item.ByteArrayConfigItem:
147127
for k in item_data:
148128
arr_size, arr_rva = item_data[k]
149-
item_data[k] = self._dnpp.byte_array_from_size_and_rva(
150-
arr_size, arr_rva
151-
).hex()
129+
item_data[k] = self._dnpp.byte_array_from_size_and_rva(arr_size, arr_rva).hex()
152130

153131
decoded_config.update(item_data)
154132

155133
if len(decoded_config) < min_config_len:
156-
raise ConfigParserException(
157-
f"Minimum threshold of config items not met: {len(decoded_config)}/{min_config_len}"
158-
)
134+
raise ConfigParserException(f"Minimum threshold of config items not met: {len(decoded_config)}/{min_config_len}")
159135
return decoded_config
160136

161137
# Searches for the RAT configuration section, using the VerifyHash() marker
@@ -185,38 +161,29 @@ def _get_config_cctor_brute_force(self) -> Tuple[int, dict[str, Any]]:
185161
raise ConfigParserException("No .cctor method could be found")
186162

187163
# For each .cctor method, map its RVA and body (in raw bytes)
188-
candidate_cctor_data = {
189-
method.rva: self._dnpp.method_body_from_method(method)
190-
for method in candidates
191-
}
164+
candidate_cctor_data = {method.rva: self._dnpp.method_body_from_method(method) for method in candidates}
192165

193166
config_start, decrypted_config = None, None
194167
# Start at our ceiling value for number of config items
195168
min_config_len = self._MIN_CONFIG_LEN_CEILING
196169

197170
while decrypted_config is None and min_config_len >= self._MIN_CONFIG_LEN_FLOOR:
198171
for method_rva, method_body in candidate_cctor_data.items():
199-
logger.debug(
200-
f"Attempting brute force at .cctor method at {hex(method_rva)}"
201-
)
172+
logger.debug(f"Attempting brute force at .cctor method at {hex(method_rva)}")
202173
try:
203174
config_start, decrypted_config = (
204175
method_rva,
205176
self._decrypt_and_decode_config(method_body, min_config_len),
206177
)
207178
break
208179
except Exception as e:
209-
logger.debug(
210-
f"Brute force failed for method at {hex(method_rva)}: {e}"
211-
)
180+
logger.debug(f"Brute force failed for method at {hex(method_rva)}: {e}")
212181
continue
213182
# Reduce the minimum config length until we reach our floor
214183
min_config_len -= 1
215184

216185
if decrypted_config is None:
217-
raise ConfigParserException(
218-
"No valid configuration could be parsed from any .cctor methods"
219-
)
186+
raise ConfigParserException("No valid configuration could be parsed from any .cctor methods")
220187
return config_start, decrypted_config
221188

222189
# Attempts to retrieve the config via looking for a config section preceded
@@ -230,16 +197,12 @@ def _get_config_verify_hash_method(self) -> Tuple[int, dict[str, Any]]:
230197

231198
# Reverse the hit to find the VerifyHash() method, then grab the
232199
# subsequent function
233-
config_method = self._dnpp.method_from_instruction_offset(
234-
verify_hash_hit.start(), 1
235-
)
200+
config_method = self._dnpp.method_from_instruction_offset(verify_hash_hit.start(), 1)
236201
encrypted_config = self._dnpp.method_body_from_method(config_method)
237202
min_config_len = self._MIN_CONFIG_LEN_CEILING
238203
while True:
239204
try:
240-
decrypted_config = self._decrypt_and_decode_config(
241-
encrypted_config, min_config_len
242-
)
205+
decrypted_config = self._decrypt_and_decode_config(encrypted_config, min_config_len)
243206
return config_method.rva, decrypted_config
244207
except Exception as e:
245208
# Reduce the minimum config length until we reach our floor

lib/parsers_aux/ratking/utils/config_item.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ def parse_from(self, data: bytes) -> dict[int, Any]:
6868
fields[field_rva] = field_value
6969
found_items += 1
7070
else:
71-
logger.debug(
72-
f"Overlapping Field RVAs detected in config at {hex(field_rva)}"
73-
)
71+
logger.debug(f"Overlapping Field RVAs detected in config at {hex(field_rva)}")
7472
logger.debug(f"Parsed {found_items} {self._label} values")
7573
return fields
7674

lib/parsers_aux/ratking/utils/data_utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@ def decode_bytes(byte_str: bytes | str) -> str:
5050
else:
5151
result = byte_str.decode("utf-8")
5252
except Exception:
53-
raise ConfigParserException(
54-
f"Error decoding bytes object to Unicode: {byte_str}"
55-
)
53+
raise ConfigParserException(f"Error decoding bytes object to Unicode: {byte_str}")
5654
return result
5755

5856

lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,5 @@ def __init__(self, payload: DotNetPEPayload) -> None:
4949
# Abstract method to take in a map representing a configuration of config
5050
# Field names and values and return a decoded/decrypted configuration
5151
@abstractmethod
52-
def decrypt_encrypted_strings(
53-
self, encrypted_strings: dict[str, str]
54-
) -> dict[str, list[str] | str]:
52+
def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, list[str] | str]:
5553
pass

lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ class ConfigDecryptorAESCBC(ConfigDecryptor):
5656
_MIN_CIPHERTEXT_LEN = 48
5757

5858
# Patterns for identifying AES metadata
59-
_PATTERN_AES_KEY_AND_BLOCK_SIZE = compile(
60-
b"[\x06-\x09]\x20(.{4})\x6f.{4}[\x06-\x09]\x20(.{4})", DOTALL
61-
)
59+
_PATTERN_AES_KEY_AND_BLOCK_SIZE = compile(b"[\x06-\x09]\x20(.{4})\x6f.{4}[\x06-\x09]\x20(.{4})", DOTALL)
6260
# Do not compile in-line replacement patterns
6361
_PATTERN_AES_KEY_BASE = b"(.{3}\x04).%b"
6462
_PATTERN_AES_SALT_INIT = b"\x80%b\x2a"
@@ -79,9 +77,7 @@ def __init__(self, payload: DotNetPEPayload) -> None:
7977
# Given an initialization vector and ciphertext, creates a Cipher
8078
# object with the AES key and specified IV and decrypts the ciphertext
8179
def _decrypt(self, iv: bytes, ciphertext: bytes) -> bytes:
82-
logger.debug(
83-
f"Decrypting {ciphertext} with key {self.key.hex()} and IV {iv.hex()}..."
84-
)
80+
logger.debug(f"Decrypting {ciphertext} with key {self.key.hex()} and IV {iv.hex()}...")
8581
aes_cipher = Cipher(AES(self.key), CBC(iv), backend=default_backend())
8682
decryptor = aes_cipher.decryptor()
8783
# Use a PKCS7 unpadder to remove padding from decrypted value
@@ -113,9 +109,7 @@ def _derive_aes_passphrase_candidates(self, key_val: str) -> list[bytes]:
113109
return passphrase_candidates
114110

115111
# Decrypts encrypted config values with the provided cipher data
116-
def decrypt_encrypted_strings(
117-
self, encrypted_strings: dict[str, str]
118-
) -> dict[str, str]:
112+
def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]:
119113
logger.debug("Decrypting encrypted strings...")
120114
if self._key_candidates is None:
121115
self._key_candidates = self._get_aes_key_candidates(encrypted_strings)
@@ -157,9 +151,7 @@ def decrypt_encrypted_strings(
157151
last_exc = e
158152

159153
if result is None:
160-
logger.debug(
161-
f"Decryption failed for item {v}: {last_exc}; Leaving as original value..."
162-
)
154+
logger.debug(f"Decryption failed for item {v}: {last_exc}; Leaving as original value...")
163155
result = v
164156

165157
logger.debug(f"Key: {k}, Value: {result}")
@@ -174,9 +166,7 @@ def _get_aes_key_candidates(self, encrypted_strings: dict[str, str]) -> list[byt
174166
keys = []
175167

176168
# Use the key Field name to index into our existing config
177-
key_raw_value = encrypted_strings[
178-
self._payload.field_name_from_rva(self._key_rva)
179-
]
169+
key_raw_value = encrypted_strings[self._payload.field_name_from_rva(self._key_rva)]
180170
passphrase_candidates = self._derive_aes_passphrase_candidates(key_raw_value)
181171

182172
for candidate in passphrase_candidates:
@@ -196,9 +186,7 @@ def _get_aes_key_candidates(self, encrypted_strings: dict[str, str]) -> list[byt
196186
continue
197187

198188
if len(keys) == 0:
199-
raise ConfigParserException(
200-
f"Could not derive key from passphrase candidates: {passphrase_candidates}"
201-
)
189+
raise ConfigParserException(f"Could not derive key from passphrase candidates: {passphrase_candidates}")
202190
return keys
203191

204192
# Extracts the AES key and block size from the payload
@@ -222,9 +210,7 @@ def _get_aes_key_rva(self, metadata_ins_offset: int) -> int:
222210
logger.debug("Extracting AES key RVA...")
223211

224212
# Get the RVA of the method that sets up AES256 metadata
225-
metadata_method_token = self._payload.method_from_instruction_offset(
226-
metadata_ins_offset, by_token=True
227-
).token
213+
metadata_method_token = self._payload.method_from_instruction_offset(metadata_ins_offset, by_token=True).token
228214

229215
# Insert this RVA into the KEY_BASE pattern to find where the AES key
230216
# is initialized
@@ -269,9 +255,7 @@ def _get_aes_salt(self, salt_rva: int) -> bytes:
269255
#
270256
# stsfld uint8[] Client.Algorithm.Aes256::Salt
271257
# ret
272-
aes_salt_initialization = self._payload.data.find(
273-
self._PATTERN_AES_SALT_INIT % escape(salt_rva)
274-
)
258+
aes_salt_initialization = self._payload.data.find(self._PATTERN_AES_SALT_INIT % escape(salt_rva))
275259
if aes_salt_initialization == -1:
276260
raise ConfigParserException("Could not identify AES salt initialization")
277261

@@ -283,9 +267,7 @@ def _get_aes_salt(self, salt_rva: int) -> bytes:
283267
salt_op = bytes([self._payload.data[salt_op_offset]])
284268

285269
# Get the salt RVA from the 4 bytes following the initialization op
286-
salt_strings_rva_packed = self._payload.data[
287-
salt_op_offset + 1 : salt_op_offset + 5
288-
]
270+
salt_strings_rva_packed = self._payload.data[salt_op_offset + 1 : salt_op_offset + 5]
289271
salt_strings_rva = bytes_to_int(salt_strings_rva_packed)
290272

291273
# If the op is a ldstr op, just get the bytes value of the string being
@@ -300,12 +282,9 @@ def _get_aes_salt(self, salt_rva: int) -> bytes:
300282
# byte array value from the FieldRVA table
301283
elif salt_op == OPCODE_LDTOKEN:
302284
salt_size = self._payload.data[salt_op_offset - 7]
303-
salt = self._payload.byte_array_from_size_and_rva(
304-
salt_size, salt_strings_rva
305-
)
285+
salt = self._payload.byte_array_from_size_and_rva(salt_size, salt_strings_rva)
306286
else:
307287
raise ConfigParserException(f"Unknown salt opcode found: {salt_op.hex()}")
308288

309289
logger.debug(f"Found salt value: {salt.hex()}")
310290
return salt
311-

lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,13 @@ def _decrypt(self, ciphertext: bytes) -> bytes:
7474
padded_text = decryptor.update(ciphertext) + decryptor.finalize()
7575
unpadded_text = unpadder.update(padded_text) + unpadder.finalize()
7676
except Exception as e:
77-
raise ConfigParserException(
78-
f"Error decrypting ciphertext {ciphertext} with key {self.key.hex()}: {e}"
79-
)
77+
raise ConfigParserException(f"Error decrypting ciphertext {ciphertext} with key {self.key.hex()}: {e}")
8078

8179
logger.debug(f"Decryption result: {unpadded_text}")
8280
return unpadded_text
8381

8482
# Decrypts encrypted config values with the provided cipher data
85-
def decrypt_encrypted_strings(
86-
self, encrypted_strings: dict[str, str]
87-
) -> dict[str, str]:
83+
def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]:
8884
logger.debug("Decrypting encrypted strings...")
8985

9086
if self.key is None:

lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,7 @@ def _decode_encoded_strings(self) -> list[str]:
8383

8484
# Parses the config, adds decoded XOR strings, and returns the decoded
8585
# config
86-
def decrypt_encrypted_strings(
87-
self, encrypted_strings: dict[str, str]
88-
) -> dict[str, list[str] | str]:
86+
def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, list[str] | str]:
8987
config = {}
9088
# Pass off plaintext config to a ConfigDecryptorPlaintext
9189
ptcd = ConfigDecryptorPlaintext(self._payload)
@@ -105,18 +103,13 @@ def _get_xor_metadata(self):
105103
self._xor_strings = list(
106104
filter(
107105
None,
108-
[
109-
self._payload.user_string_from_rva(bytes_to_int(rva))
110-
for rva in xor_string_rvas
111-
],
106+
[self._payload.user_string_from_rva(bytes_to_int(rva)) for rva in xor_string_rvas],
112107
)
113108
)
114109
logger.debug(f"{len(self._xor_strings)} XOR strings found")
115110

116111
# Get the static constructor containing the XOR key
117-
xor_key_cctor = self._payload.method_from_instruction_offset(
118-
dxor_block.start(), step=1, by_token=True
119-
)
112+
xor_key_cctor = self._payload.method_from_instruction_offset(dxor_block.start(), step=1, by_token=True)
120113
xor_key_cctor_body = self._payload.method_body_from_method(xor_key_cctor)
121114

122115
# Derive the XOR key RVA and value

lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,9 @@ def __init__(self, payload: DotNetPEPayload) -> None:
116116

117117
# Calculates whether the config meets the minimum threshold for known Field
118118
# Names and returns it if it does
119-
def decrypt_encrypted_strings(
120-
self, encrypted_strings: dict[str, str]
121-
) -> dict[str, str]:
119+
def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]:
122120
field_names = set(encrypted_strings.keys())
123121
num_overlapping_field_names = len(KNOWN_CONFIG_FIELD_NAMES & field_names)
124122
if num_overlapping_field_names < self.MIN_THRESHOLD_MATCH:
125-
raise ConfigParserException(
126-
"Plaintext threshold of known config items not met"
127-
)
123+
raise ConfigParserException("Plaintext threshold of known config items not met")
128124
return encrypted_strings

0 commit comments

Comments
 (0)