Skip to content

Commit ef96787

Browse files
committed
use maps for compression and decompression functionality
1 parent c880117 commit ef96787

File tree

2 files changed

+140
-63
lines changed

2 files changed

+140
-63
lines changed

UnityPy/files/BundleFile.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# TODO: implement encryption for saving files
22
import re
33
from collections import namedtuple
4-
from typing import Optional, Tuple, Union
4+
from typing import Optional, Tuple, Union, cast
55

66
from .. import config
77
from ..enums import ArchiveFlags, ArchiveFlagsOld, CompressionFlags
@@ -351,12 +351,12 @@ def save_fs(self, writer: EndianBinaryWriter, data_flag: int, block_info_flag: i
351351
uncompressed_block_data_size = len(block_data)
352352

353353
switch = data_flag & 0x3F
354-
if switch == 1: # LZMA
355-
block_data = CompressionHelper.compress_lzma(block_data)
356-
elif switch in [2, 3]: # LZ4, LZ4HC
357-
block_data = CompressionHelper.compress_lz4(block_data)
358-
elif switch == 4: # LZHAM
359-
raise NotImplementedError
354+
if switch in CompressionHelper.COMPRESSION_MAP:
355+
block_data = CompressionHelper.COMPRESSION_MAP[switch](block_data)
356+
else:
357+
raise NotImplementedError(
358+
f"No compression function in the CompressionHelper.COMPRESSION_MAP for {switch}"
359+
)
360360

361361
compressed_block_data_size = len(block_data)
362362

@@ -525,16 +525,20 @@ def decompress_data(
525525
The decompressed data."""
526526
comp_flag = CompressionFlags(flags & ArchiveFlags.CompressionTypeMask)
527527

528-
if comp_flag == CompressionFlags.LZMA: # LZMA
529-
return CompressionHelper.decompress_lzma(compressed_data)
530-
elif comp_flag in [CompressionFlags.LZ4, CompressionFlags.LZ4HC]: # LZ4, LZ4HC
531-
if self.decryptor is not None and flags & 0x100:
532-
compressed_data = self.decryptor.decrypt_block(compressed_data, index)
533-
return CompressionHelper.decompress_lz4(compressed_data, uncompressed_size)
534-
elif comp_flag == CompressionFlags.LZHAM: # LZHAM
535-
raise NotImplementedError("LZHAM decompression not implemented")
528+
if self.decryptor is not None and flags & 0x100:
529+
compressed_data = self.decryptor.decrypt_block(compressed_data, index)
530+
531+
if comp_flag in CompressionHelper.DECOMPRESSION_MAP:
532+
return cast(
533+
bytes,
534+
CompressionHelper.DECOMPRESSION_MAP[comp_flag](
535+
compressed_data, uncompressed_size
536+
),
537+
)
536538
else:
537-
return compressed_data
539+
raise ValueError(
540+
f"Unknown compression! flag: {flags}, compression flag: {comp_flag.value}"
541+
)
538542

539543
def get_version_tuple(self) -> Tuple[int, int, int]:
540544
"""Returns the version as a tuple."""
UnityPy/helpers/CompressionHelper.py

Lines changed: 120 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
import gzip
22
import lzma
33
import struct
4-
from typing import Tuple
4+
from typing import Tuple, Union, Callable, Dict
55

66
import brotli
77
import lz4.block
88

9+
from ..enums.BundleFile import CompressionFlags
10+
11+
12+
ByteString = Union[bytes, bytearray, memoryview]
913
GZIP_MAGIC: bytes = b"\x1f\x8b"
1014
BROTLI_MAGIC: bytes = b"brotli"
1115

1216

1317
# LZMA
14-
def decompress_lzma(data: bytes, read_decompressed_size: bool = False) -> bytes:
18+
def decompress_lzma(data: ByteString, read_decompressed_size: bool = False) -> bytes:
1519
"""decompresses lzma-compressed data
1620
1721
:param data: compressed data
18-
:type data: bytes
22+
:type data: ByteString
1923
:raises _lzma.LZMAError: Compressed data ended before the end-of-stream marker was reached
2024
:return: uncompressed data
2125
:rtype: bytes
@@ -41,13 +45,13 @@ def decompress_lzma(data: bytes, read_decompressed_size: bool = False) -> bytes:
4145
return dec.decompress(data[data_offset:])
4246

4347

44-
def compress_lzma(data: bytes, write_decompressed_size: bool = False) -> bytes:
48+
def compress_lzma(data: ByteString, write_decompressed_size: bool = False) -> bytes:
4549
"""compresses data via lzma (unity specific)
4650
The current static settings may not be the best solution,
4751
but they are the most commonly used values and should therefore be enough for the time being.
4852
4953
:param data: uncompressed data
50-
:type data: bytes
54+
:type data: ByteString
5155
:return: compressed data
5256
:rtype: bytes
5357
"""
@@ -77,11 +81,11 @@ def compress_lzma(data: bytes, write_decompressed_size: bool = False) -> bytes:
7781

7882

7983
# LZ4
80-
def decompress_lz4(data: bytes, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
84+
def decompress_lz4(data: ByteString, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
8185
"""decompresses lz4-compressed data
8286
8387
:param data: compressed data
84-
:type data: bytes
88+
:type data: ByteString
8589
:param uncompressed_size: size of the uncompressed data
8690
:type uncompressed_size: int
8791
:raises _block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer.
@@ -91,11 +95,11 @@ def decompress_lz4(data: bytes, uncompressed_size: int) -> bytes: # LZ4M/LZ4HC
9195
return lz4.block.decompress(data, uncompressed_size)
9296

9397

94-
def compress_lz4(data: bytes) -> bytes: # LZ4M/LZ4HC
98+
def compress_lz4(data: ByteString) -> bytes: # LZ4M/LZ4HC
9599
"""compresses data via lz4.block
96100
97101
:param data: uncompressed data
98-
:type data: bytes
102+
:type data: ByteString
99103
:return: compressed data
100104
:rtype: bytes
101105
"""
@@ -105,77 +109,88 @@ def compress_lz4(data: bytes) -> bytes: # LZ4M/LZ4HC
105109

106110

107111
# Brotli
108-
def decompress_brotli(data: bytes) -> bytes:
112+
def decompress_brotli(data: ByteString) -> bytes:
109113
"""decompresses brotli-compressed data
110114
111115
:param data: compressed data
112-
:type data: bytes
116+
:type data: ByteString
113117
:raises brotli.error: BrotliDecompress failed
114118
:return: uncompressed data
115119
:rtype: bytes
116120
"""
117121
return brotli.decompress(data)
118122

119123

120-
def compress_brotli(data: bytes) -> bytes:
124+
def compress_brotli(data: ByteString) -> bytes:
121125
"""compresses data via brotli
122126
123127
:param data: uncompressed data
124-
:type data: bytes
128+
:type data: ByteString
125129
:return: compressed data
126130
:rtype: bytes
127131
"""
128132
return brotli.compress(data)
129133

130134

131135
# GZIP
132-
def decompress_gzip(data: bytes) -> bytes:
136+
def decompress_gzip(data: ByteString) -> bytes:
133137
"""decompresses gzip-compressed data
134138
135139
:param data: compressed data
136-
:type data: bytes
140+
:type data: ByteString
137141
:raises OSError: Not a gzipped file
138142
:return: uncompressed data
139143
:rtype: bytes
140144
"""
141145
return gzip.decompress(data)
142146

143147

144-
def compress_gzip(data: bytes) -> bytes:
148+
def compress_gzip(data: ByteString) -> bytes:
145149
"""compresses data via gzip
146150
The current static settings may not be the best solution,
147151
but they are the most commonly used values and should therefore be enough for the time being.
148152
149153
:param data: uncompressed data
150-
:type data: bytes
154+
:type data: ByteString
151155
:return: compressed data
152156
:rtype: bytes
153157
"""
154158
return gzip.compress(data)
155159

156160

157-
def chunk_based_compress(data: bytes, block_info_flag: int) -> Tuple[bytes, list]:
161+
def chunk_based_compress(
162+
data: ByteString, block_info_flag: int
163+
) -> Tuple[ByteString, list]:
158164
"""compresses AssetBundle data based on the block_info_flag
159165
LZ4/LZ4HC will be chunk-based compression
160166
161167
:param data: uncompressed data
162-
:type data: bytes
168+
:type data: ByteString
163169
:param block_info_flag: block info flag
164170
:type block_info_flag: int
165171
:return: compressed data and block info
166172
:rtype: tuple
167173
"""
168174
switch = block_info_flag & 0x3F
175+
chunk_size = None
176+
compress_func = None
169177
if switch == 0: # NONE
170178
return data, [(len(data), len(data), block_info_flag)]
171-
elif switch == 1: # LZMA
172-
chunk_size = 0xFFFFFFFF
173-
compress_func = compress_lzma
174-
elif switch in [2, 3]: # LZ4
175-
chunk_size = 0x00020000
176-
compress_func = compress_lz4
177-
elif switch == 4: # LZHAM
178-
raise NotImplementedError
179+
180+
if switch in COMPRESSION_MAP:
181+
compress_func = COMPRESSION_MAP[switch]
182+
else:
183+
raise NotImplementedError(
184+
f"No compression function in the CompressionHelper.COMPRESSION_MAP for {switch}"
185+
)
186+
187+
if switch in COMPRESSION_CHUNK_SIZE_MAP:
188+
chunk_size = COMPRESSION_CHUNK_SIZE_MAP[switch]
189+
else:
190+
raise NotImplementedError(
191+
f"No chunk size in the CompressionHelper.COMPRESSION_CHUNK_SIZE_MAP for {switch}"
192+
)
193+
179194
block_info = []
180195
uncompressed_data_size = len(data)
181196
compressed_file_data = bytearray()
@@ -184,34 +199,92 @@ def chunk_based_compress(data: bytes, block_info_flag: int) -> Tuple[bytes, list
184199
compressed_data = compress_func(data[p : p + chunk_size])
185200
if len(compressed_data) > chunk_size:
186201
compressed_file_data.extend(data[p : p + chunk_size])
187-
block_info.append((
188-
chunk_size,
189-
chunk_size,
190-
block_info_flag ^ switch,
191-
))
202+
block_info.append(
203+
(
204+
chunk_size,
205+
chunk_size,
206+
block_info_flag ^ switch,
207+
)
208+
)
192209
else:
193210
compressed_file_data.extend(compressed_data)
194-
block_info.append((
195-
chunk_size,
196-
len(compressed_data),
197-
block_info_flag,
198-
))
211+
block_info.append(
212+
(
213+
chunk_size,
214+
len(compressed_data),
215+
block_info_flag,
216+
)
217+
)
199218
p += chunk_size
200219
uncompressed_data_size -= chunk_size
201220
if uncompressed_data_size > 0:
202221
compressed_data = compress_func(data[p:])
203222
if len(compressed_data) > uncompressed_data_size:
204223
compressed_file_data.extend(data[p:])
205-
block_info.append((
206-
uncompressed_data_size,
207-
uncompressed_data_size,
208-
block_info_flag ^ switch,
209-
))
224+
block_info.append(
225+
(
226+
uncompressed_data_size,
227+
uncompressed_data_size,
228+
block_info_flag ^ switch,
229+
)
230+
)
210231
else:
211232
compressed_file_data.extend(compressed_data)
212-
block_info.append((
213-
uncompressed_data_size,
214-
len(compressed_data),
215-
block_info_flag,
216-
))
233+
block_info.append(
234+
(
235+
uncompressed_data_size,
236+
len(compressed_data),
237+
block_info_flag,
238+
)
239+
)
217240
return bytes(compressed_file_data), block_info
241+
242+
243+
def decompress_lzham(data: ByteString, uncompressed_size: int) -> bytes:
244+
raise NotImplementedError(
245+
"Custom compression or unimplemented LZHAM (removed by Unity) encountered!"
246+
)
247+
248+
249+
DECOMPRESSION_MAP: Dict[
250+
Union[int, CompressionFlags], Callable[[ByteString, int], ByteString]
251+
] = {
252+
CompressionFlags.NONE: lambda cd, _ucs: cd,
253+
CompressionFlags.LZMA: lambda cd, _ucs: decompress_lzma(cd),
254+
CompressionFlags.LZ4: lambda cd, ucs: decompress_lz4(cd, ucs),
255+
CompressionFlags.LZ4HC: lambda cd, ucs: decompress_lz4(cd, ucs),
256+
CompressionFlags.LZHAM: lambda cd, ucs: decompress_lzham(cd, ucs),
257+
}
258+
259+
COMPRESSION_MAP: Dict[
260+
Union[int, CompressionFlags], Callable[[ByteString], ByteString]
261+
] = {
262+
CompressionFlags.NONE: lambda cd: cd,
263+
CompressionFlags.LZMA: lambda cd: compress_lzma(cd),
264+
CompressionFlags.LZ4: lambda cd: compress_lz4(cd),
265+
CompressionFlags.LZ4HC: lambda cd: compress_lz4(cd),
266+
}
267+
268+
COMPRESSION_CHUNK_SIZE_MAP: Dict[Union[int, CompressionFlags], int] = {
269+
CompressionFlags.NONE: 0xFFFFFFFF,
270+
CompressionFlags.LZMA: 0xFFFFFFFF,
271+
CompressionFlags.LZ4: 0x00020000,
272+
CompressionFlags.LZ4HC: 0x00020000,
273+
}
274+
275+
276+
__all__ = (
277+
"compress_brotli",
278+
"compress_gzip",
279+
"compress_lz4",
280+
"compress_lzma",
281+
"decompress_brotli",
282+
"decompress_gzip",
283+
"decompress_lz4",
284+
"decompress_lzma",
285+
"decompress_lzham",
286+
"chunk_based_compress",
287+
"COMPRESSION_MAP",
288+
"DECOMPRESSION_MAP",
289+
"COMPRESSION_CHUNK_SIZE_MAP",
290+
)

0 commit comments

Comments
 (0)