From e4072c7e69a2e6c323ff315e2800888e0f99c6f4 Mon Sep 17 00:00:00 2001
From: eve
Date: Fri, 25 Oct 2024 16:46:38 +0100
Subject: [PATCH 1/3] Generic: Add first attempt at pgdscan plugin

---
 volatility3/framework/plugins/pgdscan.py | 376 +++++++++++++++++++++++
 1 file changed, 376 insertions(+)
 create mode 100644 volatility3/framework/plugins/pgdscan.py

diff --git a/volatility3/framework/plugins/pgdscan.py b/volatility3/framework/plugins/pgdscan.py
new file mode 100644
index 0000000000..ec0d3048e5
--- /dev/null
+++ b/volatility3/framework/plugins/pgdscan.py
@@ -0,0 +1,376 @@
+# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
+# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
+#
+
+import enum
+import logging
+import struct
+import os
+import json
+import math
+import struct
+import hashlib
+from typing import Type, Optional, List
+
+
+from volatility3.framework import interfaces, renderers
+from volatility3.framework.configuration import requirements
+from volatility3.framework.interfaces import plugins
+from volatility3.framework.renderers import format_hints
+from volatility3.framework.layers import intel
+
+vollog = logging.getLogger(__name__)
+
+
+class PageGlobalDirectoryScanner(interfaces.layers.ScannerInterface):
+
+    def __init__(
+        self,
+        memory_size: int,
+        intel_class=intel.Intel32e,
+    ):
+        """Init the PageGlobalDirectoryScanner.
+
+        Args:
+            memory_size: The total size in bytes of the physical memory layer to be scanned
+            intel_class: The layer class (e.g. intel.Intel32e) used to determine page size, table structure, etc.
+        """
+        super().__init__()
+
+        if intel_class != intel.Intel32e:
+            raise NotImplementedError(
+                "Only intel.Intel32e is currently supported in PageGlobalDirectoryScanner"
+            )
+        self._intel_class = intel_class
+        self._memory_size = memory_size
+
+        # This is needed to correctly mask the lower bits of an entry, normally only
+        # calculated in the __init__ for an intel layer, but we have not yet constructed
+        # an intel layer.
+        self._index_shift = int(
+            math.ceil(math.log2(struct.calcsize(self._intel_class._entry_format)))
+        )
+
+        # calculate the total number of entries that will exist per page given the
+        # size of the entry.
+        self._number_of_pointers_per_page = (
+            self._intel_class.page_size
+            // struct.calcsize(self._intel_class._entry_format)
+        )
+
+        # TODO: reformat this, requires that all layers use a pack format like ' 0]
+        if len(non_zero_pointers) != len(set(non_zero_pointers)):
+            return None
+
+        # all tests passed
+        return khash
+
+    def __call__(self, data: bytes, data_offset: int):
+        """Scans every page to see whether it may be a valid PGD.
+
+        Args:
+            data: the actual data to be scanned
+            data_offset: the offset to where this data begins in relation to the layer being scanned
+
+        Yields:
+            offset: The offset of the match
+            page_data: The full page data of the match
+            khash: A SHA1 of the high half of the match
+        """
+
+        page_size = self._intel_class.page_size
+
+        for page_start in range(
+            data_offset % page_size,
+            len(data),
+            page_size,
+        ):
+            page_data = data[page_start : page_start + page_size]
+
+            # validate page as being a likely pgd
+            khash = self._validate_page_table(page_data)
+
+            # if a likely valid PGD was located, and therefore a khash calculated, yield the results
+            if khash:
+                if page_start + data_offset < self._memory_size:
+                    yield (
+                        page_start + data_offset,
+                        data[page_start : page_start + self._intel_class.page_size],
+                        khash,
+                    )
+
+
+class PGDScan(plugins.PluginInterface):
+    """Heuristically scans for Page Global Directories and generates Volatility configs for them;
+    it can also dump the memory for the PGDs that have been located. It is not designed to
+    recover PGDs for virtual machines - please use the vmscan plugin for that.
+
+    Currently only supports 64-bit Intel32e architectures.
+
+    This plugin allows analysis of virtual memory when an ISF is unavailable."""
+
+    _required_framework_version = (2, 2, 0)
+    MAXSIZE_DEFAULT = 1024 * 1024 * 1024  # 1 GiB
+
+    @classmethod
+    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
+        # TODO: perhaps allow user to provide a needle, e.g. "/bin/bash", and only return
+        # the layers where that needle hits?
+        return [
+            requirements.TranslationLayerRequirement(
+                name="primary", description="Physical base memory layer"
+            ),
+            requirements.ListRequirement(
+                name="offset",
+                description="Only scan these selected pages. Useful for dumping out only a single PGD",
+                element_type=int,
+                optional=True,
+            ),
+            requirements.BooleanRequirement(
+                name="save-configs",
+                description="Save a configuration JSON file for each recovered PGD",
+                optional=True,
+                default=False,
+            ),
+            requirements.BooleanRequirement(
+                name="dump",
+                description="Extract private memory regions for recovered PGDs",
+                optional=True,
+                default=False,
+            ),
+            requirements.IntRequirement(
+                name="maxsize",
+                description="Maximum size for dumped memory regions "
+                "(larger sections will be ignored)",
+                default=cls.MAXSIZE_DEFAULT,
+                optional=True,
+            ),
+        ]
+
+    def _dump(
+        self,
+        context: interfaces.context.ContextInterface,
+        layer_name: str,
+        start: int,
+        size: int,
+        open_method: Type[interfaces.plugins.FileHandlerInterface],
+        maxsize: int = MAXSIZE_DEFAULT,
+    ) -> Optional[interfaces.plugins.FileHandlerInterface]:
+        """Extracts the complete data for a mapping as a FileInterface.
+
+        Args:
+            context: The context to retrieve required elements from
+            layer_name: the name of the layer to dump from
+            start: The start virtual address from the layer to dump
+            size: The size of data within the layer to dump
+            open_method: class to provide context manager for opening the file
+            maxsize: Max size of section (default MAXSIZE_DEFAULT)
+
+        Returns:
+            An open FileInterface object containing the complete data for the mapping, or None in the case of failure
+        """
+
+        layer = context.layers[layer_name]
+
+        # check if vm_size is larger than the maxsize limit, and therefore is not saved out.
+        if maxsize <= size:
+            vollog.warning(
+                f"Skip virtual memory dump for {start:#x} as {size} is larger than maxsize limit of {maxsize}"
+            )
+            return None
+
+        file_name = f"pgd.{layer._page_map_offset:#x}.start.{start:#x}.dmp"
+        try:
+            file_handle = open_method(file_name)
+            chunk_size = 1024 * 1024 * 10
+            offset = start
+            while offset < start + size:
+                to_read = min(chunk_size, start + size - offset)
+                data = layer.read(offset, to_read, pad=True)
+                file_handle.write(data)
+                offset += to_read
+        except Exception as excp:
+            vollog.debug(f"Unable to dump virtual memory {file_name}: {excp}")
+            return None
+        return file_handle
+
+    def _generator(self):
+        # get primary layer
+        layer = self.context.layers[self.config["primary"]]
+
+        # Try to move down to the highest physical layer
+        if layer.config.get("memory_layer"):
+            layer = self.context.layers[layer.config["memory_layer"]]
+
+        # TODO: test and support other intel layers, either automatically
+        # detecting the likely type or allowing the user to provide it as
+        # a requirement option.
+        intel_class = intel.Intel32e
+
+        # get the max layer address; this is used to validate possible PGDs as they
+        # cannot have pointers beyond the end of physical memory
+        maximum_address = layer.maximum_address
+
+        offsets = self.config.get("offset")
+        if offsets:
+            sections = [(offset, intel_class.page_size) for offset in offsets]
+        else:
+            sections = None
+
+        # store results of the scanning in a lookup so that the most frequent result
+        # can then be shown to the user.
+        khash_lookup = {}
+
+        # Run the scan
+        for pgd_offset, _pgd_data, khash in layer.scan(
+            self.context,
+            PageGlobalDirectoryScanner(maximum_address, intel_class=intel_class),
+            self._progress_callback,
+            sections=sections,
+        ):
+            if khash not in khash_lookup:
+                khash_lookup[khash] = []
+            khash_lookup[khash].append(pgd_offset)
+
+        # join is used a lot when building temp layers; this is simply
+        # here to make the code a little easier to read
+        join = interfaces.configuration.path_join
+
+        # find the most common khash; given that all user processes
+        # share the same kernel, it is the most common khash that will
+        # locate the likely pgds
+
+        max_pgd_count = 0
+        most_common_khash = ""
+        for khash, pgds in khash_lookup.items():
+            if len(pgds) > max_pgd_count:
+                max_pgd_count = len(pgds)
+                most_common_khash = khash
+
+        for pgd_offset in khash_lookup[most_common_khash]:
+
+            # build a new layer for this likely pgd
+            temp_context = self.context.clone()
+            temp_layer_name = self.context.layers.free_layer_name("IntelLayer")
+            # temp_layer_name = "primary"  # I would like to use the name primary but not sure how?
+            config_path = join("IntelHelper", temp_layer_name)
+            temp_context.config[join(config_path, "memory_layer")] = "memory_layer"
+            temp_context.config[join(config_path, "page_map_offset")] = pgd_offset
+            temp_layer = intel_class(
+                temp_context,
+                config_path=config_path,
+                name=temp_layer_name,
+            )
+            temp_context.add_layer(temp_layer)
+
+            config_fname = "-"
+            if self.config.get("save-configs"):
+                # TODO: Fix this. It seems like an ugly hack and must be the wrong way
+                # to make a new config with a new primary layer?
+                conf = {}
+                for key, value in dict(temp_layer.build_configuration()).items():
+                    conf[f"primary.{key}"] = value
+                # finished hacking config
+
+                # save config to disk
+                config_fname = f"pgd.{pgd_offset:#x}.json"
+                with open(config_fname, "w") as f:
+                    json.dump(
+                        conf,
+                        f,
+                        sort_keys=True,
+                        indent=2,
+                    )
+                    f.write("\n")
+
+            # calculate the maximum address for the user half of virtual memory
+            user_max_addr = 1 << (temp_layer._maxvirtaddr - 1)
+
+            # get mapping for this temp layer
+            temp_layer_mapping = [
+                (offset, sublength)
+                for (
+                    offset,
+                    sublength,
+                    _mapped_offset,
+                    _mapped_length,
+                    _layer,
+                ) in temp_layer.mapping(0, user_max_addr, ignore_errors=True)
+            ]
+
+            # calculate the total size in bytes for the user part of the layer
+            total_user_size = sum(
+                [sublength for _offset, sublength in temp_layer_mapping]
+            )
+
+            # display result to user
+            yield (0, (format_hints.Hex(pgd_offset), total_user_size, config_fname))
+
+            # dump out memory if requested
+            # TODO: perhaps merge regions that are quite close together, it might be more useful to
+            # have fewer files with a few extra blank pages than to have the highly accurate result
+            # of 100s of tiny regions saved to their own files.
+            if self.config.get("dump"):
+                for offset, sublength in temp_layer_mapping:
+                    self._dump(
+                        temp_context, temp_layer.name, offset, sublength, self.open
+                    )
+
+    def run(self):
+        # TODO: Implement scanning for 32bit PGDs!
+
+        return renderers.TreeGrid(
+            [("PGD offset", format_hints.Hex), ("size", int), ("config", str)],
+            self._generator(),
+        )

From 6195eeb64b8746202924d723e659e71dd8830648 Mon Sep 17 00:00:00 2001
From: eve
Date: Tue, 12 Nov 2024 06:34:53 +0000
Subject: [PATCH 2/3] Add _merge_mappings_with_gap to pgdscan

---
 volatility3/framework/plugins/pgdscan.py | 61 +++++++++++++++++++++---
 1 file changed, 55 insertions(+), 6 deletions(-)

diff --git a/volatility3/framework/plugins/pgdscan.py b/volatility3/framework/plugins/pgdscan.py
index ec0d3048e5..96d1a90b87 100644
--- a/volatility3/framework/plugins/pgdscan.py
+++ b/volatility3/framework/plugins/pgdscan.py
@@ -10,7 +10,7 @@
 import math
 import struct
 import hashlib
-from typing import Type, Optional, List
+from typing import Type, Optional, List, Tuple


 from volatility3.framework import interfaces, renderers
@@ -161,7 +161,12 @@ class PGDScan(plugins.PluginInterface):
     This plugin allows analysis of virtual memory when an ISF is unavailable."""

     _required_framework_version = (2, 2, 0)
-    MAXSIZE_DEFAULT = 1024 * 1024 * 1024  # 1 GiB
+    MAXSIZE_DEFAULT = (
+        1024 * 1024 * 1024
+    )  # 1 GiB, the largest region to be saved when using --dump
+    MAX_GAP = (
+        4096 * 8
+    )  # the max gap between mapped pages for them to be considered as one contiguous block

     @classmethod
     def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
@@ -223,7 +228,7 @@ def _dump(

         layer = context.layers[layer_name]

-        # check if vm_size is larger than the maxsize limit, and therefore is not saved out.
+        # check if size is larger than the maxsize limit, and therefore is not saved out.
         if maxsize <= size:
             vollog.warning(
                 f"Skip virtual memory dump for {start:#x} as {size} is larger than maxsize limit of {maxsize}"
@@ -245,6 +250,47 @@ def _dump(
             return None
         return file_handle

+    def _merge_mappings_with_gap(self, mappings: List[Tuple[int, int]], gap: int):
+        """
+        Merge overlapping or consecutive ranges based on a specified gap.
+
+        Args:
+            mappings (list of tuples): List of tuples where each tuple is (start, length).
+            gap (int): The gap that determines if two ranges should be merged.
+
+        Returns:
+            list of tuples: The merged mappings, where each tuple is (start, length)
+        """
+
+        # Sort ranges by the start value; mappings should already be in order,
+        # but for this to work they MUST be in order, so sort anyway
+        sorted_mappings = sorted(mappings, key=lambda x: x[0])
+
+        merged_mappings = []
+
+        # Start with the first range
+        current_start, current_length = sorted_mappings[0]
+        current_end = current_start + current_length
+
+        for start, length in sorted_mappings[1:]:
+            next_end = start + length
+
+            # If the current range and the next range are within the gap, merge them
+            if start <= current_end + gap:
+                # Extend the current range to include the next one
+                current_end = max(current_end, next_end)
+            else:
+                # If not, add the current range and start a new one
+                merged_mappings.append((current_start, current_end - current_start))
+                current_start, current_length = start, length
+                current_end = next_end
+
+        # Add the last range
+        merged_mappings.append((current_start, current_end - current_start))
+
+        # Return the merged ranges with the gap considered
+        return merged_mappings
+
     def _generator(self):
         # get primary layer
         layer = self.context.layers[self.config["primary"]]
@@ -358,10 +404,13 @@ def _generator(self):
             yield (0, (format_hints.Hex(pgd_offset), total_user_size, config_fname))

             # dump out memory if requested
-            # TODO: perhaps merge regions that are quite close together, it might be more useful to
-            # have fewer files with a few extra blank pages than to have the highly accurate result
-            # of 100s of tiny regions saved to their own files.
             if self.config.get("dump"):
+
+                # merge mappings for this temp layer so that contiguous blocks are saved to a single file
+                temp_layer_mapping = self._merge_mappings_with_gap(
+                    temp_layer_mapping, self.MAX_GAP
+                )
+
                 for offset, sublength in temp_layer_mapping:
                     self._dump(
                         temp_context, temp_layer.name, offset, sublength, self.open

From 18ecbc722fa16f8352de1ae4a068ad5aec9797a7 Mon Sep 17 00:00:00 2001
From: eve
Date: Tue, 12 Nov 2024 06:38:17 +0000
Subject: [PATCH 3/3] Fix unused and duplicate imports in pgdscan

---
 volatility3/framework/plugins/pgdscan.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/volatility3/framework/plugins/pgdscan.py b/volatility3/framework/plugins/pgdscan.py
index 96d1a90b87..eab19e27bf 100644
--- a/volatility3/framework/plugins/pgdscan.py
+++ b/volatility3/framework/plugins/pgdscan.py
@@ -2,13 +2,10 @@
 # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
 #

-import enum
 import logging
 import struct
-import os
 import json
 import math
-import struct
 import hashlib
 from typing import Type, Optional, List, Tuple
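
Note on the gap-merge behaviour added in PATCH 2/3: below is a small standalone sketch (not part of the patches above) of how _merge_mappings_with_gap folds nearby mappings together. The merge_with_gap name and the mapping offsets are made up for illustration; the logic mirrors the method in the patch, using the same MAX_GAP of 4096 * 8 bytes.

    from typing import List, Tuple


    def merge_with_gap(mappings: List[Tuple[int, int]], gap: int) -> List[Tuple[int, int]]:
        """Merge (start, length) ranges whose separation is no more than `gap` bytes."""
        sorted_mappings = sorted(mappings, key=lambda x: x[0])
        merged = []
        current_start, current_end = sorted_mappings[0][0], sum(sorted_mappings[0])
        for start, length in sorted_mappings[1:]:
            if start <= current_end + gap:
                # close enough: extend the current block
                current_end = max(current_end, start + length)
            else:
                # too far away: close off the current block and start a new one
                merged.append((current_start, current_end - current_start))
                current_start, current_end = start, start + length
        merged.append((current_start, current_end - current_start))
        return merged


    # made-up example: two nearby pages merge into one block, a distant region stays separate
    mappings = [(0x1000, 0x1000), (0x3000, 0x1000), (0x100000, 0x2000)]
    print(merge_with_gap(mappings, gap=4096 * 8))
    # [(4096, 12288), (1048576, 8192)], i.e. [(0x1000, 0x3000), (0x100000, 0x2000)]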