diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..61f2dc9 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1 @@ +**/__pycache__/ diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..156a3d7 --- /dev/null +++ b/python/README.md @@ -0,0 +1,25 @@ +This python project is the script versions for Firmex, serves as early development. I want to write the tool in a better structured language then Python, but Python is better for prototype phase. + +# Firmware Extraction + +The main goal for this project is to detect and extract a firmware file. The firmware is often a compressed file with an OS and a filesystem. Although the format of the firmware might be vary across different vendors. + +## How firmware works + +Usually the firmware is stored in a flash device, basically a storage device with very limited capacity. This storage device can be writable or not writable, depending on the type of device. But in modern days, writable flash are more common, as it gives the ability to update the firmware. + +When the microcontroller boots up, the CPU processor loads the flash data in and executes following the CPU specification. For example, the CPU specifies that the flash data is segmented into several regions, and the execution starts at a specific region. + +Usually the microcontroller is also equipped with a MCU (memory controller unit). When the CPU access the memory, either by store or load, the CPU goes through the MCU and the MCU decides which memory device it and where in the device it should use. This allows for virtual memory. + +## Firmware contents + +The typical firmware usually contains an Operating System, and a (compressed) file system. There are firmware without an OS, especially those that are for very small devices performing a certain task, thus not needing a fully working OS. The file system is often compressed and provides the Linux OS with binary files. If the system is not using Linux, then a Real Time OS (RTOS) might be used. There are several RTOS out there, most notable FreeRTOS. + +If these typical firmware are met, then trying to recognize their file system and extract (uncompress) the file system gives binaries files inside. However, some firmware are not designed like that. Some common file system are ... + +Some vendors might build (package) their firmware differently, and some might use a different technique to update the firmware, then a full firmware file is not used, rather it could be some weird format that the currently running system (bootloader?) can detect, extract, and replace. + +## Firmware Analysis + +The most common way to analyze a firmware is by using Binwalk. The fork of Binwalk to use is OSPG, which is still being maintained. Binwalk searches for magic signatures. These signatures can be static byte sequences, are logical byte sequences. The resulting detection of Binwalk provides where and length of the file found. Binwalk also supports extraction of found detections. However, Binwalk detections sometimes come out wrong. Because it only reports what signatures are matched, without checking if they are valid. diff --git a/python/main.py b/python/main.py new file mode 100644 index 0000000..3673159 --- /dev/null +++ b/python/main.py @@ -0,0 +1,84 @@ +import argparse +import os +import io + +import matcher + +signatures = [ + matcher.Zip, + matcher.Ambarella, + matcher.SquashFS, + matcher.FlattenDeviceTree +] + +def detect(args): + print('detecting', args.file) + matches = [] + + # recursive? + for matcher in signatures: + m = matcher(args.file) + if m.is_valid(): + matches += [m] + + for filetype in matches: + print("detected", filetype.name) + for m in filetype.matches: + print(">", m) + + return matches + + +def extract(args): + pass + +def main(): + parser = argparse.ArgumentParser(description='Program for detecting or extracting data.') + + subparsers = parser.add_subparsers(dest='command') + + # Subparser for the 'detect' command + detect_parser = subparsers.add_parser('detect', help='Detect data in a file.') + detect_parser.add_argument('file', help='Input file') + detect_parser.add_argument('--isa', action='store_true', help='Perform ISA detection') + + # Subparser for the 'extract' command + extract_parser = subparsers.add_parser('extract', help='Extract data from a file.') + extract_parser.add_argument('file', help='Input file') + extract_parser.add_argument('--dry', action='store_true', help='Perform a dry run without extracting') + + args = parser.parse_args() + + if args.command is None: + print('no command given') + exit(1) + + if args.file is None: + print('require file') + exit(1) + + if not os.path.exists(args.file) or os.path.isdir(args.file): + print('please provide an existing file') + exit(1) + + + if args.command == 'detect': + detect(args) + # if args.isa: + # # Perform ISA detection on the file + # print('Performing ISA detection on:', args.file) + # else: + # parser.print_help() + elif args.command == 'extract': + extract(args) + # if args.dry: + # # Perform a dry run without extracting + # print('Dry run extraction from:', args.file) + # else: + # # Extract data from the file + # print('Extracting data from:', args.file) + else: + parser.print_help() + +if __name__ == '__main__': + main() diff --git a/python/matcher/__init__.py b/python/matcher/__init__.py new file mode 100644 index 0000000..f4c98f7 --- /dev/null +++ b/python/matcher/__init__.py @@ -0,0 +1,13 @@ +# common archive formats +from .zip import Zip + +# special firmware formats +from .ambarella import Ambarella +from .esp32 import Esp32 + +# idk, common formats? +from .flatten_device_tree import FlattenDeviceTree + +# file system formats +from .squashfs import SquashFS +from .ubifs import UbiFS diff --git a/python/matcher/ambarella.py b/python/matcher/ambarella.py new file mode 100644 index 0000000..27347d1 --- /dev/null +++ b/python/matcher/ambarella.py @@ -0,0 +1,38 @@ +import io + +from .matcher import SignatureMatcher, Match + +class Ambarella(SignatureMatcher): + def __init__(self, file): + self.name = "Ambarella Firmware Section" + self.signature = b'\x90\xeb\x24\xa3' + super().__init__(file) + + def is_valid(self): + for match in self.search(): + # walk back for the firmware section header + start = match - 4*5 + header = io.BytesIO(self.file[start:start+228+4*6]) + crc = header.read(4) + version_major = header.read(2) + version_minor = header.read(2) + filesize = header.read(4) + memory = header.read(4) + flag = header.read(4) + magic = header.read(4) + zeros = header.read() + + # for ambarella firmware, the magic is placed at offset 20 + # and after the magic, 228 bytes of \x00 must be placed + # then crc value must match the CRC( bytes after header) + + + is_matched = magic == self.signature + is_matched &= zeros == bytes(228) + # is_matched &= crc == self.crc(self.file[start + 228+4*6:start + filesize + 228+4*6]) + if is_matched: + # add the header offset to list of matches + filesize = int.from_bytes(filesize, 'little') + self.matches += [Match(start, 228 + 4*6 + filesize)] + + return len(self.matches) != 0 diff --git a/python/matcher/esp32.py b/python/matcher/esp32.py new file mode 100644 index 0000000..564712f --- /dev/null +++ b/python/matcher/esp32.py @@ -0,0 +1,4 @@ +from .matcher import SignatureMatcher, Match + +class Esp32(SignatureMatcher): + pass diff --git a/python/matcher/flatten_device_tree.py b/python/matcher/flatten_device_tree.py new file mode 100644 index 0000000..47c92a3 --- /dev/null +++ b/python/matcher/flatten_device_tree.py @@ -0,0 +1,28 @@ +import io +from .matcher import SignatureMatcher, Match + +class FlattenDeviceTree(SignatureMatcher): + def __init__(self, file): + self.name = "Flatten Device Tree" + self.signature = b'\xd0\x0d\xfe\xed' + super().__init__(file) + + def is_valid(self): + for match in self.search(): + start = match + header = io.BytesIO(self.file[start:start+4*10]) + magic = header.read(4) + totalsize = header.read(4) + off_dt_struct = header.read(4) + off_dt_strings = header.read(4) + off_mem_rsvmap = header.read(4) + version = header.read(4) + last_comp_version = header.read(4) + boot_cpuid_phys = header.read(4) + size_dt_strings = header.read(4) + size_dt_struct = header.read(4) + + totalsize = int.from_bytes(totalsize, 'little') + self.matches += [Match(start, totalsize)] + + return len(self.matches) != 0 diff --git a/python/matcher/matcher.py b/python/matcher/matcher.py new file mode 100644 index 0000000..7b287c2 --- /dev/null +++ b/python/matcher/matcher.py @@ -0,0 +1,28 @@ +class Match: + # store match data, whatever it is + def __init__(self, offset, length, data = {}): + self.offset = offset + self.length = length + self.data = data + + def __repr__(self): + return f"offset:{hex(self.offset)} size:{hex(self.length)} data:{self.data}" + +class SignatureMatcher: + __slot__ = ['name', 'signature', 'file', 'matches'] + def __init__(self, file): + self.file = open(file, 'rb').read() + self.matches = [] + + # util function + def search(self): + i = 0 + while True: + idx = self.file.find(self.signature, i) + if idx == -1: + break + i = idx + 1 + yield idx + + def is_valid(self): + return False diff --git a/python/matcher/squashfs.py b/python/matcher/squashfs.py new file mode 100644 index 0000000..ad35fb1 --- /dev/null +++ b/python/matcher/squashfs.py @@ -0,0 +1,50 @@ +import io +from .matcher import SignatureMatcher, Match + +class SquashFS(SignatureMatcher): + """ + Finding a Squash file system + https://dr-emann.github.io/squashfs/ + + superblock + -> compression options + -> data blocks & fragments + -> inode table + -> directory table + -> fragment table + -> export table + -> uid/gid lookup table + -> xattr table + """ + def __init__(self, file): + self.name = "SquashFS" + self.signature = b'hsqs' + super().__init__(file) + + def is_valid(self): + for match in self.search(): + start = match + header = io.BytesIO(self.file[start:start+ 4*5 + 2*6 + 8*8]) + magic = header.read(4) + inode = header.read(4) + modification_time = header.read(4) + block_size = header.read(4) + fragment_entry_count = header.read(4) + compression_id = header.read(2) + block_log = header.read(2) + flags = header.read(2) + id_count = header.read(2) + version_major = header.read(2) + version_minor = header.read(2) + root_inode_ref = header.read(8) + bytes_used = header.read(8) + id_table_start = header.read(8) + xattr_id_table_start = header.read(8) + inode_table_start = header.read(8) + directory_table_start = header.read(8) + fragment_table_start = header.read(8) + export_table_start = header.read(8) + + # size how to get? + self.matches += [Match(start, 0)] + return len(self.matches) != 0 diff --git a/python/matcher/ubifs.py b/python/matcher/ubifs.py new file mode 100644 index 0000000..5ed8f0f --- /dev/null +++ b/python/matcher/ubifs.py @@ -0,0 +1,5 @@ +from .matcher import SignatureMatcher, Match + +class UbiFS(SignatureMatcher): + pass + diff --git a/python/matcher/zip.py b/python/matcher/zip.py new file mode 100644 index 0000000..b561c96 --- /dev/null +++ b/python/matcher/zip.py @@ -0,0 +1,44 @@ +import io +from .matcher import SignatureMatcher, Match + +class Zip(SignatureMatcher): + """ + Zip files are read from the bottom + The signature PK is the local file header + + https://medium.com/@felixstridsberg/the-zip-file-format-6c8a160d1c34 + """ + def __init__(self, file): + self.name = "Zip" + self.signature = b'PK\x03\x04' + super().__init__(file) + + def is_valid(self): + for match in self.search(): + start = match + header = io.BytesIO(self.file[start:start+4*4 + 2*7]) + magic = header.read(4) + min_version = header.read(2) + bitflag = header.read(2) + compression_method = header.read(2) + last_modification_time = header.read(2) + last_modification_data = header.read(2) + crc = header.read(4) + compressed_size = header.read(4) + uncompressed_size = header.read(4) + file_name_length = header.read(2) + extra_field_length = header.read(2) + + file_name_length = int.from_bytes(file_name_length, 'little') + extra_field_length = int.from_bytes(extra_field_length, 'little') + compressed_size = int.from_bytes(compressed_size, 'little') + + header_size = 4*4 + 2*7 + data = { + 'name': self.file[start+header_size:start+header_size+file_name_length] + } + + size = 4*4 + 2*7 + file_name_length + extra_field_length + compressed_size + self.matches += [Match(start, size, data)] + + return len(self.matches) != 0