[python] support cromfs extraction
This commit is contained in:
parent
558deed4c8
commit
ad4857317f
@ -2,6 +2,8 @@ import argparse
|
||||
import os
|
||||
import io
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import matcher
|
||||
|
||||
signatures = [
|
||||
@ -28,13 +30,22 @@ def detect(args):
|
||||
print("detected", filetype.name)
|
||||
for m in filetype.matches:
|
||||
print(">", m)
|
||||
filetype.view(m)
|
||||
filetype.view(m, None)
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def extract(args):
|
||||
pass
|
||||
file = Path(args.file)
|
||||
folder = file.parent / (file.name + "_extracted")
|
||||
folder.mkdir(exist_ok=True)
|
||||
|
||||
matches = detect(args)
|
||||
for filetype in matches:
|
||||
print("detected", filetype.name)
|
||||
for m in filetype.matches:
|
||||
print(">", m)
|
||||
filetype.view(m, folder)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Program for detecting or extracting data.')
|
||||
@ -68,19 +79,8 @@ def main():
|
||||
|
||||
if args.command == 'detect':
|
||||
detect(args)
|
||||
# if args.isa:
|
||||
# # Perform ISA detection on the file
|
||||
# print('Performing ISA detection on:', args.file)
|
||||
# else:
|
||||
# parser.print_help()
|
||||
elif args.command == 'extract':
|
||||
extract(args)
|
||||
# if args.dry:
|
||||
# # Perform a dry run without extracting
|
||||
# print('Dry run extraction from:', args.file)
|
||||
# else:
|
||||
# # Extract data from the file
|
||||
# print('Extracting data from:', args.file)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
@ -1,6 +1,10 @@
|
||||
import os
|
||||
import io
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import lzf
|
||||
|
||||
from .matcher import SignatureMatcher, Match
|
||||
|
||||
class CromFS(SignatureMatcher):
|
||||
@ -50,7 +54,19 @@ class CromFS(SignatureMatcher):
|
||||
|
||||
return len(self.matches) != 0
|
||||
|
||||
def view(self, match):
|
||||
def view(self, match, root_folder):
|
||||
def extract(path, is_folder):
|
||||
if root_folder is None:
|
||||
return
|
||||
p = root_folder / ('_' + hex(match.offset))
|
||||
p.mkdir(exist_ok=True)
|
||||
p = p / path
|
||||
if is_folder:
|
||||
p.mkdir(exist_ok=True)
|
||||
else:
|
||||
p.touch(exist_ok=True)
|
||||
return open(p, 'wb')
|
||||
|
||||
as_num = lambda x: int.from_bytes(x, 'little')
|
||||
|
||||
root = match.data['root']
|
||||
@ -77,6 +93,31 @@ class CromFS(SignatureMatcher):
|
||||
buffer.seek(old, os.SEEK_SET)
|
||||
return s.decode()
|
||||
|
||||
def read_file_contents(buffer, at=None, recover=False, file=None, total_size=0):
|
||||
old = buffer.tell()
|
||||
if at:
|
||||
buffer.seek(at, os.SEEK_SET)
|
||||
|
||||
file_contents = b''
|
||||
while total_size > 0:
|
||||
magic = buffer.read(2)
|
||||
typ = as_num(buffer.read(1))
|
||||
if typ == 0:
|
||||
len = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||
uncompress = buffer.read(len)
|
||||
file_contents += uncompress
|
||||
total_size -= len
|
||||
if typ == 1:
|
||||
clen = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||
ulen = as_num(buffer.read(1)) << 8 | as_num(buffer.read(1))
|
||||
compressed = buffer.read(clen)
|
||||
file_contents += lzf.decompress(compressed, ulen)
|
||||
total_size -= ulen
|
||||
if file:
|
||||
file.write(file_contents)
|
||||
if recover:
|
||||
buffer.seek(old, os.SEEK_SET)
|
||||
|
||||
def read_nodes(buffer, current):
|
||||
mode = as_num(buffer.read(2))
|
||||
buffer.read(2)
|
||||
@ -89,23 +130,24 @@ class CromFS(SignatureMatcher):
|
||||
is_reg = lambda mode: mode & (8 << 12) != 0
|
||||
is_link = lambda mode: mode & (10 << 12) != 0
|
||||
|
||||
path = current + '/' + name
|
||||
print(path)
|
||||
path = current / name
|
||||
|
||||
if is_link(mode):
|
||||
pass
|
||||
if is_dir(mode):
|
||||
extract(path, True)
|
||||
# traverse the directory children
|
||||
buffer.seek(extra, os.SEEK_SET)
|
||||
read_nodes(buffer, current + '/' + name)
|
||||
if is_reg(mode):
|
||||
for block in range(extra):
|
||||
# decrypt each block
|
||||
pass
|
||||
read_nodes(buffer, path)
|
||||
if is_reg(mode) and name != '.' and name != '..':
|
||||
f = extract(path, False)
|
||||
read_file_contents(buffer, at=extra, recover=True, total_size=size, file=f)
|
||||
if f:
|
||||
f.close()
|
||||
|
||||
# traverse its peer
|
||||
if peer != 0:
|
||||
buffer.seek(peer, os.SEEK_SET)
|
||||
read_nodes(buffer, current)
|
||||
|
||||
read_nodes(region, '')
|
||||
read_nodes(region, Path(''))
|
||||
|
@ -27,5 +27,5 @@ class SignatureMatcher:
|
||||
def is_valid(self):
|
||||
return False
|
||||
|
||||
def view(self, match):
|
||||
def view(self, match, root_folder=None):
|
||||
pass
|
||||
|
@ -27,8 +27,7 @@ class RomFS(SignatureMatcher):
|
||||
|
||||
return len(self.matches) != 0
|
||||
|
||||
def view(self, match):
|
||||
|
||||
def view(self, match, root_folder):
|
||||
start = match.offset
|
||||
length = match.length
|
||||
raw = io.BytesIO(self.file[start:start+length])
|
||||
|
Loading…
Reference in New Issue
Block a user