python prototype
detection for - zip - ambarella - flatten device tree - squashfs
This commit is contained in:
13
python/matcher/__init__.py
Normal file
13
python/matcher/__init__.py
Normal file
@ -0,0 +1,13 @@
|
||||
# common archive formats
|
||||
from .zip import Zip
|
||||
|
||||
# special firmware formats
|
||||
from .ambarella import Ambarella
|
||||
from .esp32 import Esp32
|
||||
|
||||
# idk, common formats?
|
||||
from .flatten_device_tree import FlattenDeviceTree
|
||||
|
||||
# file system formats
|
||||
from .squashfs import SquashFS
|
||||
from .ubifs import UbiFS
|
38
python/matcher/ambarella.py
Normal file
38
python/matcher/ambarella.py
Normal file
@ -0,0 +1,38 @@
|
||||
import io
|
||||
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class Ambarella(SignatureMatcher):
|
||||
def __init__(self, file):
|
||||
self.name = "Ambarella Firmware Section"
|
||||
self.signature = b'\x90\xeb\x24\xa3'
|
||||
super().__init__(file)
|
||||
|
||||
def is_valid(self):
|
||||
for match in self.search():
|
||||
# walk back for the firmware section header
|
||||
start = match - 4*5
|
||||
header = io.BytesIO(self.file[start:start+228+4*6])
|
||||
crc = header.read(4)
|
||||
version_major = header.read(2)
|
||||
version_minor = header.read(2)
|
||||
filesize = header.read(4)
|
||||
memory = header.read(4)
|
||||
flag = header.read(4)
|
||||
magic = header.read(4)
|
||||
zeros = header.read()
|
||||
|
||||
# for ambarella firmware, the magic is placed at offset 20
|
||||
# and after the magic, 228 bytes of \x00 must be placed
|
||||
# then crc value must match the CRC(<filesize> bytes after header)
|
||||
|
||||
|
||||
is_matched = magic == self.signature
|
||||
is_matched &= zeros == bytes(228)
|
||||
# is_matched &= crc == self.crc(self.file[start + 228+4*6:start + filesize + 228+4*6])
|
||||
if is_matched:
|
||||
# add the header offset to list of matches
|
||||
filesize = int.from_bytes(filesize, 'little')
|
||||
self.matches += [Match(start, 228 + 4*6 + filesize)]
|
||||
|
||||
return len(self.matches) != 0
|
4
python/matcher/esp32.py
Normal file
4
python/matcher/esp32.py
Normal file
@ -0,0 +1,4 @@
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class Esp32(SignatureMatcher):
|
||||
pass
|
28
python/matcher/flatten_device_tree.py
Normal file
28
python/matcher/flatten_device_tree.py
Normal file
@ -0,0 +1,28 @@
|
||||
import io
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class FlattenDeviceTree(SignatureMatcher):
|
||||
def __init__(self, file):
|
||||
self.name = "Flatten Device Tree"
|
||||
self.signature = b'\xd0\x0d\xfe\xed'
|
||||
super().__init__(file)
|
||||
|
||||
def is_valid(self):
|
||||
for match in self.search():
|
||||
start = match
|
||||
header = io.BytesIO(self.file[start:start+4*10])
|
||||
magic = header.read(4)
|
||||
totalsize = header.read(4)
|
||||
off_dt_struct = header.read(4)
|
||||
off_dt_strings = header.read(4)
|
||||
off_mem_rsvmap = header.read(4)
|
||||
version = header.read(4)
|
||||
last_comp_version = header.read(4)
|
||||
boot_cpuid_phys = header.read(4)
|
||||
size_dt_strings = header.read(4)
|
||||
size_dt_struct = header.read(4)
|
||||
|
||||
totalsize = int.from_bytes(totalsize, 'little')
|
||||
self.matches += [Match(start, totalsize)]
|
||||
|
||||
return len(self.matches) != 0
|
28
python/matcher/matcher.py
Normal file
28
python/matcher/matcher.py
Normal file
@ -0,0 +1,28 @@
|
||||
class Match:
|
||||
# store match data, whatever it is
|
||||
def __init__(self, offset, length, data = {}):
|
||||
self.offset = offset
|
||||
self.length = length
|
||||
self.data = data
|
||||
|
||||
def __repr__(self):
|
||||
return f"offset:{hex(self.offset)} size:{hex(self.length)} data:{self.data}"
|
||||
|
||||
class SignatureMatcher:
|
||||
__slot__ = ['name', 'signature', 'file', 'matches']
|
||||
def __init__(self, file):
|
||||
self.file = open(file, 'rb').read()
|
||||
self.matches = []
|
||||
|
||||
# util function
|
||||
def search(self):
|
||||
i = 0
|
||||
while True:
|
||||
idx = self.file.find(self.signature, i)
|
||||
if idx == -1:
|
||||
break
|
||||
i = idx + 1
|
||||
yield idx
|
||||
|
||||
def is_valid(self):
|
||||
return False
|
50
python/matcher/squashfs.py
Normal file
50
python/matcher/squashfs.py
Normal file
@ -0,0 +1,50 @@
|
||||
import io
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class SquashFS(SignatureMatcher):
|
||||
"""
|
||||
Finding a Squash file system
|
||||
https://dr-emann.github.io/squashfs/
|
||||
|
||||
superblock
|
||||
-> compression options
|
||||
-> data blocks & fragments
|
||||
-> inode table
|
||||
-> directory table
|
||||
-> fragment table
|
||||
-> export table
|
||||
-> uid/gid lookup table
|
||||
-> xattr table
|
||||
"""
|
||||
def __init__(self, file):
|
||||
self.name = "SquashFS"
|
||||
self.signature = b'hsqs'
|
||||
super().__init__(file)
|
||||
|
||||
def is_valid(self):
|
||||
for match in self.search():
|
||||
start = match
|
||||
header = io.BytesIO(self.file[start:start+ 4*5 + 2*6 + 8*8])
|
||||
magic = header.read(4)
|
||||
inode = header.read(4)
|
||||
modification_time = header.read(4)
|
||||
block_size = header.read(4)
|
||||
fragment_entry_count = header.read(4)
|
||||
compression_id = header.read(2)
|
||||
block_log = header.read(2)
|
||||
flags = header.read(2)
|
||||
id_count = header.read(2)
|
||||
version_major = header.read(2)
|
||||
version_minor = header.read(2)
|
||||
root_inode_ref = header.read(8)
|
||||
bytes_used = header.read(8)
|
||||
id_table_start = header.read(8)
|
||||
xattr_id_table_start = header.read(8)
|
||||
inode_table_start = header.read(8)
|
||||
directory_table_start = header.read(8)
|
||||
fragment_table_start = header.read(8)
|
||||
export_table_start = header.read(8)
|
||||
|
||||
# size how to get?
|
||||
self.matches += [Match(start, 0)]
|
||||
return len(self.matches) != 0
|
5
python/matcher/ubifs.py
Normal file
5
python/matcher/ubifs.py
Normal file
@ -0,0 +1,5 @@
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class UbiFS(SignatureMatcher):
|
||||
pass
|
||||
|
44
python/matcher/zip.py
Normal file
44
python/matcher/zip.py
Normal file
@ -0,0 +1,44 @@
|
||||
import io
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class Zip(SignatureMatcher):
|
||||
"""
|
||||
Zip files are read from the bottom
|
||||
The signature PK is the local file header
|
||||
|
||||
https://medium.com/@felixstridsberg/the-zip-file-format-6c8a160d1c34
|
||||
"""
|
||||
def __init__(self, file):
|
||||
self.name = "Zip"
|
||||
self.signature = b'PK\x03\x04'
|
||||
super().__init__(file)
|
||||
|
||||
def is_valid(self):
|
||||
for match in self.search():
|
||||
start = match
|
||||
header = io.BytesIO(self.file[start:start+4*4 + 2*7])
|
||||
magic = header.read(4)
|
||||
min_version = header.read(2)
|
||||
bitflag = header.read(2)
|
||||
compression_method = header.read(2)
|
||||
last_modification_time = header.read(2)
|
||||
last_modification_data = header.read(2)
|
||||
crc = header.read(4)
|
||||
compressed_size = header.read(4)
|
||||
uncompressed_size = header.read(4)
|
||||
file_name_length = header.read(2)
|
||||
extra_field_length = header.read(2)
|
||||
|
||||
file_name_length = int.from_bytes(file_name_length, 'little')
|
||||
extra_field_length = int.from_bytes(extra_field_length, 'little')
|
||||
compressed_size = int.from_bytes(compressed_size, 'little')
|
||||
|
||||
header_size = 4*4 + 2*7
|
||||
data = {
|
||||
'name': self.file[start+header_size:start+header_size+file_name_length]
|
||||
}
|
||||
|
||||
size = 4*4 + 2*7 + file_name_length + extra_field_length + compressed_size
|
||||
self.matches += [Match(start, size, data)]
|
||||
|
||||
return len(self.matches) != 0
|
Reference in New Issue
Block a user