Compare commits
2 Commits
d50ec846c2
...
main
Author | SHA1 | Date | |
---|---|---|---|
ad4857317f | |||
558deed4c8 |
@ -2,13 +2,18 @@ import argparse
|
|||||||
import os
|
import os
|
||||||
import io
|
import io
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import matcher
|
import matcher
|
||||||
|
|
||||||
signatures = [
|
signatures = [
|
||||||
matcher.Zip,
|
matcher.Zip,
|
||||||
matcher.Ambarella,
|
matcher.Ambarella,
|
||||||
matcher.SquashFS,
|
matcher.SquashFS,
|
||||||
matcher.FlattenDeviceTree
|
matcher.RomFS,
|
||||||
|
matcher.CromFS,
|
||||||
|
matcher.FlattenDeviceTree,
|
||||||
|
matcher.ELF,
|
||||||
]
|
]
|
||||||
|
|
||||||
def detect(args):
|
def detect(args):
|
||||||
@ -25,13 +30,22 @@ def detect(args):
|
|||||||
print("detected", filetype.name)
|
print("detected", filetype.name)
|
||||||
for m in filetype.matches:
|
for m in filetype.matches:
|
||||||
print(">", m)
|
print(">", m)
|
||||||
filetype.view(m)
|
filetype.view(m, None)
|
||||||
|
|
||||||
return matches
|
return matches
|
||||||
|
|
||||||
|
|
||||||
def extract(args):
|
def extract(args):
|
||||||
pass
|
file = Path(args.file)
|
||||||
|
folder = file.parent / (file.name + "_extracted")
|
||||||
|
folder.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
matches = detect(args)
|
||||||
|
for filetype in matches:
|
||||||
|
print("detected", filetype.name)
|
||||||
|
for m in filetype.matches:
|
||||||
|
print(">", m)
|
||||||
|
filetype.view(m, folder)
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Program for detecting or extracting data.')
|
parser = argparse.ArgumentParser(description='Program for detecting or extracting data.')
|
||||||
@ -65,19 +79,8 @@ def main():
|
|||||||
|
|
||||||
if args.command == 'detect':
|
if args.command == 'detect':
|
||||||
detect(args)
|
detect(args)
|
||||||
# if args.isa:
|
|
||||||
# # Perform ISA detection on the file
|
|
||||||
# print('Performing ISA detection on:', args.file)
|
|
||||||
# else:
|
|
||||||
# parser.print_help()
|
|
||||||
elif args.command == 'extract':
|
elif args.command == 'extract':
|
||||||
extract(args)
|
extract(args)
|
||||||
# if args.dry:
|
|
||||||
# # Perform a dry run without extracting
|
|
||||||
# print('Dry run extraction from:', args.file)
|
|
||||||
# else:
|
|
||||||
# # Extract data from the file
|
|
||||||
# print('Extracting data from:', args.file)
|
|
||||||
else:
|
else:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
|
||||||
|
@ -11,3 +11,8 @@ from .flatten_device_tree import FlattenDeviceTree
|
|||||||
# file system formats
|
# file system formats
|
||||||
from .squashfs import SquashFS
|
from .squashfs import SquashFS
|
||||||
from .ubifs import UbiFS
|
from .ubifs import UbiFS
|
||||||
|
from .romfs import RomFS
|
||||||
|
from .cromfs import CromFS
|
||||||
|
|
||||||
|
# common executable formats
|
||||||
|
from .elf import ELF
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
import os
|
import os
|
||||||
import io
|
import io
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import lzf
|
||||||
|
|
||||||
from .matcher import SignatureMatcher, Match
|
from .matcher import SignatureMatcher, Match
|
||||||
|
|
||||||
class CromFS(SignatureMatcher):
|
class CromFS(SignatureMatcher):
|
||||||
@ -50,7 +54,19 @@ class CromFS(SignatureMatcher):
|
|||||||
|
|
||||||
return len(self.matches) != 0
|
return len(self.matches) != 0
|
||||||
|
|
||||||
def view(self, match):
|
def view(self, match, root_folder):
|
||||||
|
def extract(path, is_folder):
|
||||||
|
if root_folder is None:
|
||||||
|
return
|
||||||
|
p = root_folder / ('_' + hex(match.offset))
|
||||||
|
p.mkdir(exist_ok=True)
|
||||||
|
p = p / path
|
||||||
|
if is_folder:
|
||||||
|
p.mkdir(exist_ok=True)
|
||||||
|
else:
|
||||||
|
p.touch(exist_ok=True)
|
||||||
|
return open(p, 'wb')
|
||||||
|
|
||||||
as_num = lambda x: int.from_bytes(x, 'little')
|
as_num = lambda x: int.from_bytes(x, 'little')
|
||||||
|
|
||||||
root = match.data['root']
|
root = match.data['root']
|
||||||
@ -77,6 +93,31 @@ class CromFS(SignatureMatcher):
|
|||||||
buffer.seek(old, os.SEEK_SET)
|
buffer.seek(old, os.SEEK_SET)
|
||||||
return s.decode()
|
return s.decode()
|
||||||
|
|
||||||
|
def read_file_contents(buffer, at=None, recover=False, file=None, total_size=0):
|
||||||
|
old = buffer.tell()
|
||||||
|
if at:
|
||||||
|
buffer.seek(at, os.SEEK_SET)
|
||||||
|
|
||||||
|
file_contents = b''
|
||||||
|
while total_size > 0:
|
||||||
|
magic = buffer.read(2)
|
||||||
|
typ = as_num(buffer.read(1))
|
||||||
|
if typ == 0:
|
||||||
|
len = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||||
|
uncompress = buffer.read(len)
|
||||||
|
file_contents += uncompress
|
||||||
|
total_size -= len
|
||||||
|
if typ == 1:
|
||||||
|
clen = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||||
|
ulen = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||||
|
compressed = buffer.read(clen)
|
||||||
|
file_contents += lzf.decompress(compressed, ulen)
|
||||||
|
total_size -= ulen
|
||||||
|
if file:
|
||||||
|
file.write(file_contents)
|
||||||
|
if recover:
|
||||||
|
buffer.seek(old, os.SEEK_SET)
|
||||||
|
|
||||||
def read_nodes(buffer, current):
|
def read_nodes(buffer, current):
|
||||||
mode = as_num(buffer.read(2))
|
mode = as_num(buffer.read(2))
|
||||||
buffer.read(2)
|
buffer.read(2)
|
||||||
@ -89,23 +130,24 @@ class CromFS(SignatureMatcher):
|
|||||||
is_reg = lambda mode: mode & (8 << 12) != 0
|
is_reg = lambda mode: mode & (8 << 12) != 0
|
||||||
is_link = lambda mode: mode & (10 << 12) != 0
|
is_link = lambda mode: mode & (10 << 12) != 0
|
||||||
|
|
||||||
path = current + '/' + name
|
path = current / name
|
||||||
print(path)
|
|
||||||
|
|
||||||
if is_link(mode):
|
if is_link(mode):
|
||||||
pass
|
pass
|
||||||
if is_dir(mode):
|
if is_dir(mode):
|
||||||
|
extract(path, True)
|
||||||
# traverse the directory children
|
# traverse the directory children
|
||||||
buffer.seek(extra, os.SEEK_SET)
|
buffer.seek(extra, os.SEEK_SET)
|
||||||
read_nodes(buffer, current + '/' + name)
|
read_nodes(buffer, path)
|
||||||
if is_reg(mode):
|
if is_reg(mode) and name != '.' and name != '..':
|
||||||
for block in range(extra):
|
f = extract(path, False)
|
||||||
# decrypt each block
|
read_file_contents(buffer, at=extra, recover=True, total_size=size, file=f)
|
||||||
pass
|
if f:
|
||||||
|
f.close()
|
||||||
|
|
||||||
# traverse its peer
|
# traverse its peer
|
||||||
if peer != 0:
|
if peer != 0:
|
||||||
buffer.seek(peer, os.SEEK_SET)
|
buffer.seek(peer, os.SEEK_SET)
|
||||||
read_nodes(buffer, current)
|
read_nodes(buffer, current)
|
||||||
|
|
||||||
read_nodes(region, '')
|
read_nodes(region, Path(''))
|
||||||
|
17
python/matcher/elf.py
Normal file
17
python/matcher/elf.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import io
|
||||||
|
|
||||||
|
from .matcher import SignatureMatcher, Match
|
||||||
|
|
||||||
|
class ELF(SignatureMatcher):
|
||||||
|
def __init__(self, file):
|
||||||
|
self.name = "ELF"
|
||||||
|
self.signature = b'\x7f\x45\x4c\x46\x02\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
super().__init__(file)
|
||||||
|
|
||||||
|
def is_valid(self):
|
||||||
|
for match in self.search():
|
||||||
|
# walk back for the firmware section header
|
||||||
|
start = match
|
||||||
|
self.matches += [Match(start, 0)]
|
||||||
|
|
||||||
|
return len(self.matches) != 0
|
@ -27,5 +27,5 @@ class SignatureMatcher:
|
|||||||
def is_valid(self):
|
def is_valid(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def view(self, match):
|
def view(self, match, root_folder=None):
|
||||||
pass
|
pass
|
||||||
|
38
python/matcher/romfs.py
Normal file
38
python/matcher/romfs.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
import io
|
||||||
|
|
||||||
|
from PyRomfsImage import *
|
||||||
|
|
||||||
|
from .matcher import SignatureMatcher, Match
|
||||||
|
|
||||||
|
class RomFS(SignatureMatcher):
|
||||||
|
"""
|
||||||
|
RomFS a readonly file system
|
||||||
|
big-endian
|
||||||
|
https://www.kernel.org/doc/Documentation/filesystems/romfs.txt
|
||||||
|
"""
|
||||||
|
def __init__(self, file):
|
||||||
|
self.name = "RomFS"
|
||||||
|
self.signature = b'-rom1fs-'
|
||||||
|
super().__init__(file)
|
||||||
|
|
||||||
|
def is_valid(self):
|
||||||
|
for match in self.search():
|
||||||
|
start = match
|
||||||
|
header = io.BytesIO(self.file[start:start+14])
|
||||||
|
magic = header.read(8)
|
||||||
|
fullsize = header.read(4)
|
||||||
|
checksum = header.read(4)
|
||||||
|
fullsize = int.from_bytes(fullsize, 'big')
|
||||||
|
self.matches += [Match(start, fullsize)]
|
||||||
|
|
||||||
|
return len(self.matches) != 0
|
||||||
|
|
||||||
|
def view(self, match, root_folder):
|
||||||
|
start = match.offset
|
||||||
|
length = match.length
|
||||||
|
raw = io.BytesIO(self.file[start:start+length])
|
||||||
|
rom = Romfs(raw)
|
||||||
|
root = rom.getRoot()
|
||||||
|
print(root.name)
|
||||||
|
for ch in root.children:
|
||||||
|
print(ch.name)
|
@ -29,9 +29,11 @@ class Zip(SignatureMatcher):
|
|||||||
file_name_length = header.read(2)
|
file_name_length = header.read(2)
|
||||||
extra_field_length = header.read(2)
|
extra_field_length = header.read(2)
|
||||||
|
|
||||||
file_name_length = int.from_bytes(file_name_length, 'little')
|
as_num = lambda x: int.from_bytes(x, 'little')
|
||||||
extra_field_length = int.from_bytes(extra_field_length, 'little')
|
|
||||||
compressed_size = int.from_bytes(compressed_size, 'little')
|
file_name_length = as_num(file_name_length)
|
||||||
|
extra_field_length = as_num(extra_field_length)
|
||||||
|
compressed_size = as_num(compressed_size)
|
||||||
|
|
||||||
header_size = 4*4 + 2*7
|
header_size = 4*4 + 2*7
|
||||||
data = {
|
data = {
|
||||||
|
Reference in New Issue
Block a user