diff --git a/python/matcher/cromfs.py b/python/matcher/cromfs.py new file mode 100644 index 0000000..fa6d9ad --- /dev/null +++ b/python/matcher/cromfs.py @@ -0,0 +1,111 @@ +import os +import io + +from .matcher import SignatureMatcher, Match + +class CromFS(SignatureMatcher): + """ + CromFS a readonly file system + little-endian + https://github.com/deadsy/nuttx/blob/master/tools/gencromfs.c + this should be more correct? + https://gitea.hedron.io/pixy2040/nuttx/src/branch/master/tools/gencromfs.c + + this CromFS might be a little bit different from the CROMFS here + https://bisqwit.iki.fi/src/cromfs-format.txt + this ^ has the signature CROMFS03 and probably CROMFS02 for v2 + this ^ is big-endian + """ + def __init__(self, file): + self.name = "CromFS" + self.signature = b'CROM' + super().__init__(file) + + def is_valid(self): + for match in self.search(): + start = match + header = io.BytesIO(self.file[start:start+20]) + magic = header.read(4) + nnodes = header.read(2) + nblocks = header.read(2) + root = header.read(4) + fsize = header.read(4) + bsize = header.read(4) + + as_num = lambda x: int.from_bytes(x, 'little') + nnodes = as_num(nnodes) + nblocks = as_num(nblocks) + root = as_num(root) + fsize = as_num(fsize) + bsize = as_num(bsize) + if root != 20: + continue + data = { + 'nnodes': nnodes, + 'nblocks': nblocks, + 'root': root, + 'bsize': bsize, + } + self.matches += [Match(start, fsize, data)] + + return len(self.matches) != 0 + + def view(self, match): + as_num = lambda x: int.from_bytes(x, 'little') + + root = match.data['root'] + bsize = match.data['bsize'] + nblocks = match.data['nblocks'] + nnodes = match.data['nnodes'] + + region = io.BytesIO(self.file[match.offset : match.offset + match.length]) + region.seek(root, os.SEEK_SET) + + def read_str_at(buffer, at=None, recover=False): + if at == 0 or at is None: + return '' + s = b'' + old = buffer.tell() + if at: + buffer.seek(at, os.SEEK_SET) + while True: + c = buffer.read(1) + if c == b'\x00': + break + s += c + if recover: + buffer.seek(old, os.SEEK_SET) + return s.decode() + + def read_nodes(buffer, current): + mode = as_num(buffer.read(2)) + buffer.read(2) + name_offset = as_num(buffer.read(4)) + size = as_num(buffer.read(4)) + peer = as_num(buffer.read(4)) + extra = as_num(buffer.read(4)) + name = read_str_at(buffer, at=name_offset, recover=True) + is_dir = lambda mode: mode & (4 << 12) != 0 + is_reg = lambda mode: mode & (8 << 12) != 0 + is_link = lambda mode: mode & (10 << 12) != 0 + + path = current + '/' + name + print(path) + + if is_link(mode): + pass + if is_dir(mode): + # traverse the directory children + buffer.seek(extra, os.SEEK_SET) + read_nodes(buffer, current + '/' + name) + if is_reg(mode): + for block in range(extra): + # decrypt each block + pass + + # traverse its peer + if peer != 0: + buffer.seek(peer, os.SEEK_SET) + read_nodes(buffer, current) + + read_nodes(region, '')