154 lines
5.0 KiB
Python
154 lines
5.0 KiB
Python
import os
|
|
import io
|
|
|
|
from pathlib import Path
|
|
|
|
import lzf
|
|
|
|
from .matcher import SignatureMatcher, Match
|
|
|
|
class CromFS(SignatureMatcher):
    """
    Matcher and extractor for CromFS, a read-only file system.

    Every multi-byte field of the volume header and of the node records is
    decoded as little-endian.

    Layout reference:
    https://github.com/deadsy/nuttx/blob/master/tools/gencromfs.c

    A possibly more accurate reference:
    https://gitea.hedron.io/pixy2040/nuttx/src/branch/master/tools/gencromfs.c

    This CromFS might differ slightly from the CROMFS described at
    https://bisqwit.iki.fi/src/cromfs-format.txt
    -- that format uses the signature CROMFS03 (and probably CROMFS02 for
    v2) and is big-endian.
    """

    # Volume header: magic(4) nnodes(2) nblocks(2) root(4) fsize(4) bsize(4).
    _HEADER_SIZE = 20
    # Node record: mode(2) pad(2) name(4) size(4) peer(4) extra(4).
    _NODE_SIZE = 20

    # The node type is compared as a whole field rather than bit-tested:
    # a link (10 << 12) shares a bit with a regular file (8 << 12), so a
    # plain `mode & flag` test misclassifies links as files.
    # NOTE(review): the 4-bit field at bits 12-15 is taken from the NuttX
    # S_IFMT definition -- confirm against the target NuttX version.
    _MODE_TYPE_MASK = 15 << 12
    _MODE_DIR = 4 << 12
    _MODE_REG = 8 << 12

    def __init__(self, file):
        """Configure the matcher name and signature, then defer to the base."""
        # Set before super().__init__ in case the base class reads them.
        self.name = "CromFS"
        self.signature = b'CROM'
        super().__init__(file)

    def is_valid(self):
        """
        Inspect every signature hit and record those that look like a volume.

        A hit is accepted when a complete 20-byte header is available and its
        root-node offset equals the header size (the root node immediately
        follows the header).  Accepted hits are appended to ``self.matches``
        with the volume size as the match length.

        Returns:
            True when ``self.matches`` is non-empty, False otherwise.
        """
        for start in self.search():
            header = bytes(self.file[start:start + self._HEADER_SIZE])
            if len(header) < self._HEADER_SIZE:
                # Truncated header: short fields would decode to bogus values.
                continue

            nnodes = self._le(header[4:6])
            nblocks = self._le(header[6:8])
            root = self._le(header[8:12])
            fsize = self._le(header[12:16])
            bsize = self._le(header[16:20])

            if root != self._HEADER_SIZE:
                continue

            data = {
                'nnodes': nnodes,
                'nblocks': nblocks,
                'root': root,
                'bsize': bsize,
            }
            self.matches += [Match(start, fsize, data)]

        return bool(self.matches)

    def view(self, match, root_folder):
        """
        Walk the node tree of *match* and optionally extract it to disk.

        Args:
            match: a match produced by ``is_valid``; ``match.offset``,
                ``match.length`` and ``match.data['root']`` are used.
            root_folder: path-like base directory, or None.  When given,
                directories and regular files are written beneath
                ``root_folder / ('_' + hex(match.offset))``.  When None the
                tree is still walked and file data decoded, but nothing is
                written.
        """
        out_dir = None
        if root_folder is not None:
            out_dir = root_folder / ('_' + hex(match.offset))

        # All offsets stored in the image are relative to the volume start.
        region = io.BytesIO(
            self.file[match.offset:match.offset + match.length]
        )
        self._walk(region, match.data['root'], Path(''), out_dir, set())

    @staticmethod
    def _le(raw):
        """Decode *raw* as an unsigned little-endian integer."""
        return int.from_bytes(raw, 'little')

    @staticmethod
    def _is_safe_name(name):
        """Return True when *name* cannot step outside the output folder."""
        if name == '..':
            return False
        return not any(sep in name for sep in {'/', os.sep})

    @staticmethod
    def _read_str_at(buffer, at):
        """
        Read the NUL-terminated string stored at offset *at* of *buffer*.

        Returns '' when *at* is 0 or None.  Reading stops at the first NUL or
        at the end of the buffer, so a missing terminator cannot hang the
        parser.  Undecodable bytes are replaced instead of raising.
        """
        if not at:
            return ''
        buffer.seek(at)
        pieces = []
        while True:
            chunk = buffer.read(64)
            if not chunk:
                break
            head, terminator, _ = chunk.partition(b'\x00')
            pieces.append(head)
            if terminator:
                break
        return b''.join(pieces).decode(errors='replace')

    @staticmethod
    def _read_file_contents(buffer, at, total_size):
        """
        Decode the chain of data blocks that starts at offset *at*.

        Each block starts with a 2-byte magic (not validated) and a 1-byte
        type.  Type 0 is followed by a big-endian 16-bit length and that many
        raw bytes; type 1 by big-endian 16-bit compressed and uncompressed
        lengths and an LZF-compressed payload.

        Decoding stops once *total_size* bytes were produced, or early when
        the data is truncated or malformed; whatever was decoded so far is
        returned.

        Returns:
            The decoded file contents as bytes.
        """
        if not at:
            return b''
        buffer.seek(at)

        chunks = []
        remaining = total_size
        while remaining > 0:
            block_header = buffer.read(3)
            if len(block_header) < 3:
                break  # ran off the end of the volume
            block_type = block_header[2]

            if block_type == 0:
                length = int.from_bytes(buffer.read(2), 'big')
                chunk = buffer.read(length)
            elif block_type == 1:
                clen = int.from_bytes(buffer.read(2), 'big')
                ulen = int.from_bytes(buffer.read(2), 'big')
                chunk = lzf.decompress(buffer.read(clen), ulen)
            else:
                break  # unknown block type

            if not chunk:
                # Empty or failed block: no progress is possible, and
                # continuing would loop forever.
                break
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _walk(self, buffer, offset, current, out_dir, seen):
        """
        Visit the node at *offset* and all of its peers.

        Peers are followed iteratively and only directories recurse, so the
        recursion depth is bounded by the directory nesting rather than by
        the number of entries.  *seen* holds the node offsets already
        visited, which stops cyclic peer/child offsets in a corrupt image.

        Args:
            buffer: seekable stream over the volume.
            offset: offset of the first node to visit (0 means "none").
            current: relative Path of the directory holding these nodes.
            out_dir: extraction base directory, or None to skip writing.
            seen: set of node offsets visited so far (mutated).
        """
        while offset and offset not in seen:
            seen.add(offset)
            buffer.seek(offset)
            record = buffer.read(self._NODE_SIZE)
            if len(record) < self._NODE_SIZE:
                break

            mode = self._le(record[0:2])
            name_offset = self._le(record[4:8])
            size = self._le(record[8:12])
            peer = self._le(record[12:16])
            extra = self._le(record[16:20])

            name = self._read_str_at(buffer, name_offset)
            path = current / name
            kind = mode & self._MODE_TYPE_MASK

            if not self._is_safe_name(name):
                # Untrusted image: never follow a name out of out_dir.
                pass
            elif kind == self._MODE_DIR:
                if out_dir is not None:
                    (out_dir / path).mkdir(parents=True, exist_ok=True)
                # For a directory, `extra` is the offset of its first child.
                self._walk(buffer, extra, path, out_dir, seen)
            elif kind == self._MODE_REG and name not in ('', '.'):
                # For a regular file, `extra` is the offset of its data.
                contents = self._read_file_contents(buffer, extra, size)
                if out_dir is not None:
                    out_dir.mkdir(parents=True, exist_ok=True)
                    (out_dir / path).write_bytes(contents)
            # Any other node type (e.g. links such as '.' and '..') has
            # nothing to extract.

            offset = peer
|