diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..04032f7 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "nokia-dumper"] + path = nokia-dumper + url = https://github.com/stze/Home-Brew_Tool-Nokia.git +[submodule "splituapp"] + path = splituapp + url = https://github.com/stze/splituapp.git +[submodule "sinextract"] + path = sinextract + url = https://github.com/stze/anyxperia_dumper.git diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..00f20ee --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM ubuntu:20.04 +ENV DEBIAN_FRONTEND=noninteractive +COPY . /extractor/ +RUN apt-get update +#Install tzdata in non-interactive mode, otherwise it asks for timezones. +RUN apt-get install -y --no-install-recommends tzdata +RUN apt-get install -y python3 python3-pip swig +RUN apt-get install -y git android-sdk-libsparse-utils liblz4-tool brotli unrar +RUN apt-get install -y zip rsync +RUN cd /extractor && pip3 install -r requirements.txt +ENTRYPOINT ["/extractor/extractor.py"] diff --git a/README.md b/README.md index 67d0aab..2cd2fd0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,67 @@ -# extractor -Extractor: The Android firmware image extraction tool +# Extractor · [![GitHub license](https://img.shields.io/badge/license-Apache%202.0-blue)](#LICENSE) + +

+ +

+ +Extractor is a powerful Android firmware image extraction utility + +# Installation +To run Extractor on your computer, some preparation steps are required. Since Extractor is a Python tool, a working Python environment is required. Extractor depends on some git submodules, all of which can be initialized like so: + +```bash +# Initialize git submodules +./scripts/init.sh +``` + +If you wish to run Extractor without installing the necessary requirements yourself, you may run it using Docker. + +## Debian-based (Debian, Ubuntu) + +Currently supports Debian 10 and Ubuntu 20.04. Use a terminal shell to execute the following commands: + +```bash +sudo apt update +# Install dependencies +sudo apt install -y git android-sdk-libsparse-utils liblz4-tool brotli unrar +``` + +We recommend using a Python virtualenv for installing Extractor's Python dependencies: + +```bash +# Create virtualenv in venv directory +python3 -m venv venv +# Activate virtualenv +source venv/bin/activate +``` + +Now, install the Python dependencies: + +```bash +pip3 install -r requirements.txt +``` +# Usage + +You can run Extractor on your machine by running: + +```bash +sudo ./extractor.py <firmware-image> --system-dir-output <output-dir> +``` + +This will extract a firmware image into a specified output directory. Extractor also supports saving the output in a tar archive: + +```bash +sudo ./extractor.py <firmware-image> --tar-output <output-tar> +``` + +Note: root privileges are required due to temporarily active loopback mount operations. + +## Docker + +```bash +./extract-docker.py --in-file <firmware-image> --out-dir <output-dir> +``` + +## License + +Extractor is [Apache 2.0 licensed](LICENSE). \ No newline at end of file diff --git a/construct_typing.py b/construct_typing.py new file mode 100644 index 0000000..bbac4f2 --- /dev/null +++ b/construct_typing.py @@ -0,0 +1,104 @@ +# This file is part of Extractor. 
+ +# Copyright (C) 2021 Security Research Labs GmbH +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from typing import TypeVar, Type +from io import BytesIO +from construct import Struct, Construct # type: ignore + + +# TypeVar is required so that parse returns the right type (of the sub-class). +# https://stackoverflow.com/a/46064289 +# noinspection PyTypeChecker +T = TypeVar('T', bound='TypedContainer') + + +class TypedContainer: + """ + Base class for a typed struct for use with construct. Usage instructions: + * Make your own class with TypedContainer as superclass + * Define instance fields with typing (e.g. 
bytes or int) + * Set the class variable construct_struct to the actual construct Struct() definition + """ + construct_struct: Struct + + @classmethod + def parse(cls: Type[T], buf: bytes) -> T: + """ + Parses a buffer + :param buf: + :return: + """ + return cls.parse_stream(BytesIO(buf)) + + @classmethod + def parse_stream(cls: Type[T], stream): + self = cls() + construct_container = cls.construct_struct.parse_stream(stream) + for k, v in dict(construct_container).items(): + self.__setattr__(k, v) + return self + + @classmethod + def sizeof(cls): + return cls.construct_struct.sizeof() + + def build(self) -> bytes: + return self.__class__.construct_struct.build(self.__dict__) + + def __str__(self): + string_list = [self.__class__.__name__] + for k, v in sorted(self.__dict__.items()): + string_list.append(" %s = %r" % (k, v)) + return "\n".join(string_list) + + def __repr__(self): + field_params = [] + # Use correct order + for field in self.construct_struct.subcons: + field_params.append("%s=%r" % (field.name, self.__getattribute__(field.name))) + return "%s(%s)" % (self.__class__.__name__, ", ".join(field_params)) + + def __eq__(self, other: T): + if type(self) is not type(other): + return False + for field in self.construct_struct.subcons: + if self.__getattribute__(field.name) != other.__getattribute__(field.name): + return False + return True + + @classmethod + def as_inner_type(cls): + return InnerTypedContainer(cls) + + +class InnerTypedContainer(Construct): + inner_type: T + + def __init__(self, inner_type): + super().__init__() + self.inner_type = inner_type + + def _parse(self, stream, context, path): + return self.inner_type.parse_stream(stream) + + def _build(self, obj, stream, context, path): + buf = obj.build() + stream.write(buf) + + def _sizeof(self, context, path): + return self.inner_type.sizeof() diff --git a/docs/media/ext.png b/docs/media/ext.png new file mode 100644 index 0000000..e0d83c4 Binary files /dev/null and b/docs/media/ext.png differ 
diff --git a/erofs_tool.py b/erofs_tool.py new file mode 100755 index 0000000..a798569 --- /dev/null +++ b/erofs_tool.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 + +# This file is part of Extractor. + +# Copyright (C) 2021 Security Research Labs GmbH +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import argparse +import mmap +import os +from construct import Struct, Int32ul, Int16ul, Int8ul, Int64ul, Array, Union +from enum import Enum +from typing import List, Set +import subprocess +from io import BytesIO +import math +import sys +from stat import S_IFLNK, S_IFDIR, S_IFREG, S_IFMT + + +# Parser for Huawei EROFS filesystem, used on some new models. 
+# Supported by Linux Kernel 4.19 and later +# drivers/staging/erofs +# Filesystem generation tool at https://git.kernel.org/pub/scm/linux/kernel/git/xiang/erofs-utils.git/ + +def main(): + parser = argparse.ArgumentParser(description='EROFS filesystem extractor') + sp = parser.add_subparsers() + p = sp.add_parser("debug", help="Run debug code") + p.set_defaults(target=command_debug) + p = sp.add_parser("check", help="Check a given filesystem") + p.add_argument("fn", help="EROFS image file") + p.set_defaults(target=command_check) + p = sp.add_parser("file", help="Check a given filesystem") + p.add_argument("fn", help="EROFS image file") + p.add_argument("path", help="Path within erofs") + p.add_argument("--verify", help="Path to verify file") + p.add_argument("--extract", help="Path to save extracted file") + p.set_defaults(target=command_file) + p = sp.add_parser("extract", help="Extract erofs to directory") + p.add_argument("erofs_image", help="Path to erofs image") + p.add_argument("output_dir", help="Output directory") + p.add_argument("--verify-zip", action="store_true", help="Run test on all zip/apk/jar files to ensure that extraction works correctly") + p.set_defaults(target=command_extract) + args = parser.parse_args() + if hasattr(args, "target"): + args.target(args) + else: + parser.print_help() + + +def command_debug(_args): + pass + + +def command_extract(args): + try: + os.mkdir(args.output_dir) + except FileExistsError: + assert os.path.isdir(args.output_dir), "Output %r is not a directory" % args.output_dir + assert len(os.listdir(args.output_dir)) == 0, "Output directory %r is not empty: %r" % (args.output_dir, os.listdir(args.output_dir)) + erofs = Erofs(args.erofs_image) + erofs.root_inode.extract(args.output_dir.encode(), verify_zip=args.verify_zip) + + +def command_check(args): + erofs = Erofs(args.fn) + erofs.root_inode.traverse() + + +def command_file(args): + erofs = Erofs(args.fn) + file_inode = erofs.get_file(args.path.encode()) + data = 
file_inode.get_data(debug=True) + if args.verify is not None: + verify_buf = open(args.verify, 'rb').read() + assert len(data) == len(verify_buf), "Verify length mismatch: %r <=> %r" % (len(data), len(verify_buf)) + for i in range(len(data)): + assert data[i] == verify_buf[i], "Mismatch at 0x%x: %r <=> %r" % (i, data[i], verify_buf[i]) + print("File verified OK") + if args.extract is not None: + with open(args.extract, 'wb') as f: + f.write(data) + + +# noinspection PyUnresolvedReferences +struct_erofs_super = Struct( + "magic" / Int32ul, + "checksum" / Int32ul, + "features" / Int32ul, + "blkszbits" / Int8ul, + "reserved" / Int8ul, + "root_nid" / Int16ul, + "inos" / Int64ul, + "build_time" / Int64ul, + "build_time_nsec" / Int32ul, + "blocks" / Int32ul, + "meta_blkaddr" / Int32ul, + "xattr_blkaddr" / Int32ul, + "uuid" / Array(16, Int8ul), + "volume_name" / Array(16, Int8ul), + "reserved2" / Array(48, Int8ul) +) +assert struct_erofs_super.sizeof() == 128, struct_erofs_super.sizeof() + + +class DataMappingMode(Enum): + EROFS_INODE_FLAT_PLAIN = 0 + EROFS_INODE_FLAT_COMPRESSION_LEGACY = 1 + EROFS_INODE_FLAT_INLINE = 2 + EROFS_INODE_FLAT_COMPRESSION = 3 + EROFS_INODE_LAYOUT_MAX = 4 + + +# noinspection PyUnresolvedReferences +struct_erofs_inode_v1 = Struct( + "i_advise" / Int16ul, + "i_xattr_icount" / Int16ul, + "i_mode" / Int16ul, + "i_nlink" / Int16ul, + "i_size" / Int32ul, + "i_reserved" / Int32ul, + "i_u" / Int32ul, + "i_ino" / Int32ul, + "i_uid" / Int16ul, + "i_gid" / Int16ul, + "checksum" / Int32ul, +) +assert struct_erofs_inode_v1.sizeof() == 32, struct_erofs_inode_v1.sizeof() + + +class FileType(Enum): + EROFS_FT_UNKNOWN = 0 + EROFS_FT_REG_FILE = 1 + EROFS_FT_DIR = 2 + EROFS_FT_CHRDEV = 3 + EROFS_FT_BLKDEV = 4 + EROFS_FT_FIFO = 5 + EROFS_FT_SOCK = 6 + EROFS_FT_SYMLINK = 7 + EROFS_FT_MAX = 8 + + +# noinspection PyUnresolvedReferences +struct_erofs_dirent = Struct( + "nid" / Int64ul, + "nameoff" / Int16ul, + "file_type" / Int8ul, + "reserved" / Int8ul +) +assert 
struct_erofs_dirent.sizeof() == 12, struct_erofs_dirent.sizeof() + + +class DecompressIndexType(Enum): + Z_EROFS_VLE_CLUSTER_TYPE_PLAIN = 0 + Z_EROFS_VLE_CLUSTER_TYPE_HEAD = 1 + Z_EROFS_VLE_CLUSTER_TYPE_NONHEAD = 2 + Z_EROFS_VLE_CLUSTER_TYPE_RESERVED = 3 + + +# noinspection PyUnresolvedReferences +struct_z_erofs_vle_decompressed_index = Struct( + "di_advise" / Int16ul, + "di_clusterofs" / Int16ul, + "di_u" / Union(0, + "blkaddr" / Int32ul, + "delta" / Struct("delta0" / Int16ul, "delta1" / Int16ul) + ) +) +assert struct_z_erofs_vle_decompressed_index.sizeof() == 8 + + +# noinspection PyUnresolvedReferences +struct_z_erofs_map_header = Struct( + "h_reserved1" / Int32ul, + "h_advise" / Int16ul, + "h_algorithmtype" / Int8ul, + "h_clusterbits" / Int8ul +) + + +class Erofs: + def __init__(self, fn: str): + self.fn = fn + self.file_handle = open(fn, 'rb') + self.file_size = os.fstat(self.file_handle.fileno()).st_size + self.mmap = mmap.mmap(self.file_handle.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) + self.super = struct_erofs_super.parse(self.mmap[0x400:0x400+struct_erofs_super.sizeof()]) + print("0x%08x-0x%08x: SUPER" % (0x400, 0x400 + struct_erofs_super.sizeof())) + assert self.super.magic == 0xe0f5e1e2, "0x%x" % self.super.magic + assert self.super.blkszbits == 12 + print("root_nid=%r" % self.super.root_nid) + # print("super:\n%s" % self.super) + self.root_inode = self.get_inode(self.super.root_nid, FileType.EROFS_FT_DIR) + print("0x%08x-0x%08x: ROOT Inode" % (self.root_inode.inode_off, self.root_inode.inode_off + struct_erofs_inode_v1.sizeof())) + # print("root:\n%s" % self.root_inode) + # self.root_inode.traverse() + + def get_inode(self, nid: int, file_type: FileType): + if file_type == FileType.EROFS_FT_DIR: + return DirInode(self, nid) + elif file_type == FileType.EROFS_FT_SYMLINK: + return SymlinkInode(self, nid) + elif file_type == FileType.EROFS_FT_REG_FILE: + return RegFileInode(self, nid) + else: + raise ValueError("inode type %r not supported" % 
file_type) + + def get_inode_header(self, nid) -> struct_erofs_inode_v1: + inode_off = self.super.meta_blkaddr * 4096 + 32 * nid + if inode_off + struct_erofs_inode_v1.sizeof() > self.file_size: + raise ValueError("Inode nid 0x016%x out of range" % nid) + inode_buf = self.mmap[inode_off:inode_off + struct_erofs_inode_v1.sizeof()] + return struct_erofs_inode_v1.parse(inode_buf) + + def get_file(self, path: bytes) -> "Inode": + path = path.split(b'/') + path = [x for x in path if x != b''] + inode: DirInode = self.root_inode + for i in range(len(path)): + path_elem = path[i] + ok = False + for dirent in inode.get_dirents(): + if dirent.filename == path_elem: + if i == len(path) - 1: + return self.get_inode(dirent.nid, dirent.file_type) + else: + next_inode = self.get_inode(dirent.nid, dirent.file_type) + if isinstance(inode, DirInode): + inode = next_inode + ok = True + else: + raise ValueError("Inode at %r is of type %r instead of DirInode" % (path[0:i], type(inode))) + if not ok: + raise FileNotFoundError("Failed to find %r in %r" % (path[i], path[0:i])) + assert False, path + + +class Inode: + def __init__(self, erofs: Erofs, nid: int): + self.erofs = erofs + self.nid: int = nid + self.inode_off = erofs.super.meta_blkaddr * 4096 + 32 * nid + inode_buf = erofs.mmap[self.inode_off:self.inode_off + struct_erofs_inode_v1.sizeof()] + self.inode_header = struct_erofs_inode_v1.parse(inode_buf) + self.xattr_start_off = self.inode_off + struct_erofs_inode_v1.sizeof() + if self.inode_header.i_xattr_icount > 0: + self.xattr_size = 12 + (self.inode_header.i_xattr_icount - 1) * 4 + else: + self.xattr_size = 0 + self.data_mapping_mode = DataMappingMode(self.inode_header.i_advise >> 1) + assert self.inode_header.i_advise & 0x01 == 0 + + def get_data(self, debug=False) -> bytes: + if debug: + print("Inode(nid=%r).get_data(): data_mapping_mode=%s" % (self.nid, self.data_mapping_mode.name)) + print("0x%08x-0x%08x: get_data Inode" % (self.inode_off, self.inode_off + 
struct_erofs_inode_v1.sizeof())) + print(self.inode_header) + if self.data_mapping_mode == DataMappingMode.EROFS_INODE_FLAT_INLINE: + # Last block of file is directly following the inode/xattr data + # Previous blocks are following this last block + last_block_data_off = self.xattr_start_off + self.xattr_size + last_block_data_size = 4096 - (last_block_data_off % 4096) + if last_block_data_size == 4096: + raise NotImplementedError("TODO: Check manually if there is a last block following the inode or not") + last_block_data = self.erofs.mmap[last_block_data_off: last_block_data_off + last_block_data_size] + if self.inode_header.i_size <= last_block_data_size: + return last_block_data[0:self.inode_header.i_size] + else: + # initial_blocks_data_off = last_block_data_off + last_block_data_size + # assert initial_blocks_data_off % 4096 == 0 + initial_blocks_data_off = self.inode_header.i_u * 4096 + initial_blocks_data_size = 4096 * math.ceil((self.inode_header.i_size - last_block_data_size) / 4096) + initial_blocks_data = self.erofs.mmap[initial_blocks_data_off:initial_blocks_data_off + initial_blocks_data_size] + assert len(initial_blocks_data) + len(last_block_data) >= self.inode_header.i_size + assert len(initial_blocks_data) + len(last_block_data) - self.inode_header.i_size < 4096 + return (initial_blocks_data + last_block_data)[0:self.inode_header.i_size] + elif self.data_mapping_mode == DataMappingMode.EROFS_INODE_FLAT_COMPRESSION_LEGACY: + # print("HEADER: %s\n" % self.inode_header) + # i_u is number of compressed blocks for EROFS_INODE_LAYOUT_COMPRESSION + num_compressed_blocks = self.inode_header.i_u + if num_compressed_blocks > 30e3: + raise ValueError("Too may compressed blocks (self.inode_header.i_u=%r" % self.inode_header.i_u) + decompress_index_header_pos = self.xattr_start_off + self.xattr_size + # See Z_EROFS_VLE_LEGACY_INDEX_ALIGN(size) + # round_up to a multiple of 8 bytes + if decompress_index_header_pos % 8 == 4: + decompress_index_header_pos += 4 + 
assert decompress_index_header_pos % 8 == 0 + decompress_index_header_pos += struct_z_erofs_map_header.sizeof() + decompress_index_header_pos += 8 # Z_EROFS_VLE_LEGACY_HEADER_PADDING + assert decompress_index_header_pos % 8 == 0 + # assert decompress_index_header_pos == self.xattr_start_off + self.xattr_size + 20 + # assert False + prev_clusterofs = 0 + num_decompressed_blocks = math.ceil(self.inode_header.i_size / 4096) + with BytesIO() as out: + prev_blkaddr = 0 + prev_reserved_blkaddr = 0 + for di_number in range(num_decompressed_blocks): + buf = self.erofs.mmap[decompress_index_header_pos + struct_z_erofs_vle_decompressed_index.sizeof() * di_number: decompress_index_header_pos + struct_z_erofs_vle_decompressed_index.sizeof() * (di_number + 1)] + # print(" %s" % codecs.encode(buf, 'hex').decode()) + di = struct_z_erofs_vle_decompressed_index.parse(buf) + if debug: + print("DI %d/%d: adv=0x%04x %r" % (di_number, num_decompressed_blocks, di.di_advise, di)) + print(" OFF %r" % ((2**16 + di.di_clusterofs - prev_clusterofs) % 2**16)) + prev_clusterofs = di.di_clusterofs + Z_EROFS_VLE_DI_CLUSTER_TYPE_BIT = 0 + Z_EROFS_VLE_DI_CLUSTER_TYPE_BITS = 2 + # See vle_legacy_load_cluster_from_disk() in drivers/staging/erofs/zmap.c + type_int = (di.di_advise >> Z_EROFS_VLE_DI_CLUSTER_TYPE_BIT) & ((1 << Z_EROFS_VLE_DI_CLUSTER_TYPE_BITS) - 1) + decompress_index_type = DecompressIndexType(type_int) + # print("DI %r: %r" % (di_number, decompress_index_type)) + # print("OFFSET CHECK: %r <=> %r" % (out.tell() % 4096, di.di_clusterofs)) + if decompress_index_type == DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_RESERVED: + if di.di_u.blkaddr == prev_blkaddr: + decompress_index_type = DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_NONHEAD + else: + decompress_index_type = DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_HEAD + prev_blkaddr = di.di_u.blkaddr + if decompress_index_type == DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_PLAIN: + out.seek(di_number * 4096 + di.di_clusterofs) + assert 
out.tell() == di_number * 4096 + di.di_clusterofs + blkaddr = di.di_u.blkaddr + buf = self.erofs.mmap[4096 * blkaddr: 4096 * (blkaddr + 1)] + if self.inode_header.i_size < out.tell() + len(buf): + buf = buf[0:self.inode_header.i_size - out.tell()] + out.write(buf) + elif decompress_index_type == DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_HEAD: + if out.tell() % 4096 != di.di_clusterofs: + if di.di_clusterofs == 0: + out.seek(out.tell() - (out.tell() % 4096)) + else: + raise ValueError("Cluster offset check failed: %r <=> %r" % (out.tell() % 4096, di.di_clusterofs)) + # assert out.tell() % 4096 == di.di_clusterofs, "Cluster offset check failed: %r <=> %r" % (out.tell() % 4096, di.di_clusterofs) + blkaddr = di.di_u.blkaddr + compressed_buf = self.erofs.mmap[4096 * blkaddr: 4096 * (blkaddr + 1)] + # hd(compressed_buf) + # decompressed_buf = pp_decompress_lz4(compressed_buf, maxlen=self.inode_header.i_size - out.tell(), expected=open("/usr/bin/lxc", "rb").read()[out.tell():]) + decompressed_buf = pp_decompress_lz4(compressed_buf, maxlen=self.inode_header.i_size - out.tell()) + out.write(decompressed_buf) + elif decompress_index_type == DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_NONHEAD: + pass + elif decompress_index_type == DecompressIndexType.Z_EROFS_VLE_CLUSTER_TYPE_RESERVED: + blkaddr = di.di_u.blkaddr + if blkaddr == prev_reserved_blkaddr: + continue + else: + prev_reserved_blkaddr = blkaddr + compressed_buf = self.erofs.mmap[4096 * blkaddr: 4096 * (blkaddr + 1)] + # hd(compressed_buf) + decompressed_buf = pp_decompress_lz4(compressed_buf, maxlen=self.inode_header.i_size - out.tell()) + print("len(decompressed_buf)=%r decompressed_buf[0:50] = %r" % (len(decompressed_buf), decompressed_buf[0:50])) + out.write(decompressed_buf) + else: + raise ValueError("Unexpected decompress_index_type %r" % decompress_index_type) + if self.inode_header.checksum != 0: + raise NotImplementedError("Checksum verification not yet implemented") + if out.tell() == 
self.inode_header.i_size: + return out.getvalue() + elif out.tell() > self.inode_header.i_size: + return out.getvalue()[0:self.inode_header.i_size] + else: + raise ValueError("Bad file size %r (expected: %r)" % (out.tell(), self.inode_header.i_size)) + elif self.data_mapping_mode == DataMappingMode.EROFS_INODE_FLAT_PLAIN: + # print("HEADER: %s\n" % self.inode_header) + last_block_data_off = self.inode_header.i_u * 4096 + data_size = self.inode_header.i_size + data = self.erofs.mmap[last_block_data_off:last_block_data_off+data_size] + # assert False + return data + elif self.data_mapping_mode == DataMappingMode.EROFS_INODE_FLAT_COMPRESSION: + raise NotImplementedError("TODO: Implement EROFS_INODE_FLAT_COMPRESSION") + else: + raise ValueError("Don't know how to get data for data_mapping_mode=%r" % self.data_mapping_mode) + + def get_data_dir(self, debug=False) -> bytes: + """ + Gets the directory data (struct erofs_dirent + filename buffer). + Separate function required since EROFS_INODE_FLAT_INLINE behaves differently for directories + and regular files + :param debug: + :return: + """ + if debug: + print("Inode(nid=%r).get_data(): data_mapping_mode=%s" % (self.nid, self.data_mapping_mode.name)) + print("0x%08x-0x%08x: get_data Inode" % (self.inode_off, self.inode_off + struct_erofs_inode_v1.sizeof())) + print(self.inode_header) + if self.data_mapping_mode == DataMappingMode.EROFS_INODE_FLAT_INLINE: + # For directories with EROFS_INODE_FLAT_INLINE, the full data is (sequentially) following the inode header/xattr. 
+ data_off = self.xattr_start_off + self.xattr_size + data_size = self.inode_header.i_size + return self.erofs.mmap[data_off: data_off + data_size] + else: + # Other mdoes are equal for directories and file data + return self.get_data() + + +class DirEnt: + def __init__(self, filename: bytes, file_type: FileType, nid: int): + self.filename: bytes = filename + self.file_type: FileType = file_type + self.nid: int = nid + + def __repr__(self): + return "DirEnt(%r, %r, %r)" % (self.filename, self.file_type, self.nid) + + +class DirInode(Inode): + def __init__(self, erofs: Erofs, nid: int): + super(DirInode, self).__init__(erofs, nid) + if S_IFMT(self.inode_header.i_mode) != S_IFDIR: + raise ValueError("DirInode at nid=0x%16x is not of type S_IFDIR, self.inode_header.i_mode=0x%08x" % (nid, self.inode_header.i_mode)) + # print("self.inode_off=0x%x" % self.inode_off) + # print(self.inode_header) + data = self.get_data_dir() + self.dirents: List[DirEnt] = [] + if len(data) == 0: + return + # hd(data) + dirent0 = struct_erofs_dirent.parse(data[0:12]) + # print(dirent0) + # return + assert dirent0.nameoff % 12 == 0 + num_dirents = int(dirent0.nameoff / 12) + struct_dirents: List[struct_erofs_dirent] = [] + for i in range(num_dirents): + struct_dirents.append(struct_erofs_dirent.parse(data[12*i:12*i+12])) + self.dirents = [] + filenames_done: Set[bytes] = set() + for i in range(num_dirents): + struct_dirent = struct_dirents[i] + name_end = len(data) + if i < num_dirents - 1: + name_end = struct_dirents[i+1].nameoff + filename = data[struct_dirent.nameoff:name_end] + filename = filename.split(b'\0', 1)[0] + if filename == b'': + raise ValueError("Empty filename") + if filename in filenames_done: + raise ValueError("Duplicate filename %r" % filename) + # print("FILE %r: %r" % (filename, struct_dirent)) + assert len(filename) < 255, "Filename too long(%d bytes): %r..." 
% (len(filename), filename[0:50]) + if struct_dirent.file_type >= FileType.EROFS_FT_MAX.value: + raise ValueError("Bad struct_dirent.file_type %r" % struct_dirent.file_type) + file_type = FileType(struct_dirent.file_type) + dirent = DirEnt(filename, file_type, struct_dirent.nid) + self.dirents.append(dirent) + # print("%r" % dirent) + + def get_dirents(self) -> List[DirEnt]: + return self.dirents + + def traverse(self, prefix=b"/"): + for dirent in self.dirents: + print("TRAVERSE: %r => %r" % (prefix, dirent.filename)) + child_inode = self.erofs.get_inode(dirent.nid, dirent.file_type) + if dirent.file_type == FileType.EROFS_FT_SYMLINK: + print("%s%s: %r => %r" % (prefix.decode(errors="ignore"), dirent.filename.decode(errors="ignore"), dirent, child_inode.get_symlink_dest())) + elif dirent.file_type == FileType.EROFS_FT_REG_FILE: + print("%s%s: %r" % (prefix.decode(errors="ignore"), dirent.filename.decode(errors="ignore"), dirent)) + elif dirent.file_type == FileType.EROFS_FT_DIR: + # Some versions of mkfs.erofs add entries for "." and ".." + if dirent.filename in (b'.', b'..'): + continue + print("%s%s: %r" % (prefix.decode(errors="ignore"), dirent.filename.decode(errors="ignore"), dirent)) + child_inode.traverse(prefix + dirent.filename + b'/') + + def extract(self, output_dir: bytes, verify_zip: bool = False): + """ + Extracts this directory to output_dir. 
+ :param output_dir: + Must already exist (as an empty directory) + :param verify_zip: + Verify all zip/jar/apk files in output (using "unzip -tqq") to detect potential extraction errors + :return: + """ + for dirent in self.dirents: + out_path = os.path.join(output_dir, dirent.filename) + print("Extracting %r" % out_path.decode()) + if os.path.exists(out_path): + raise ValueError("Duplicate file %r" % out_path) + child_inode = self.erofs.get_inode(dirent.nid, dirent.file_type) + if dirent.file_type == FileType.EROFS_FT_SYMLINK: + os.symlink(child_inode.get_symlink_dest(), out_path) + elif dirent.file_type == FileType.EROFS_FT_DIR: + # Some versions of mkfs.erofs add entries for "." and ".." + if dirent.filename in (b'.', b'..'): + continue + os.mkdir(out_path) + # Always make directories mode 755 + os.chmod(out_path, 0o755) + child_inode.extract(out_path, verify_zip=verify_zip) + elif dirent.file_type == FileType.EROFS_FT_REG_FILE: + with open(out_path, 'wb') as f: + f.write(child_inode.get_data()) + # use original mode & 0o755 => Ignore setuid/setgid bit + mode = child_inode.inode_header.i_mode & 0o777 + # Ensure files are always readable + mode |= 0o444 + os.chmod(out_path, mode) + if verify_zip: + ext = out_path.split(b'.')[-1].lower() + if ext in (b'zip', b'jar', b'apk'): + print("Verifying %r" % out_path) + subprocess.check_call(["unzip", "-tqq", out_path]) + else: + raise NotImplementedError("file_type %r not implemented" % dirent.file_type) + + +class SymlinkInode(Inode): + def __init__(self, erofs: Erofs, nid: int): + super(SymlinkInode, self).__init__(erofs, nid) + if S_IFMT(self.inode_header.i_mode) != S_IFLNK: + raise ValueError("SymlinkInode at nid=0x%16x is not of type S_IFLNK, self.inode_header.i_mode=0x%08x" % (nid, self.inode_header.i_mode)) + self.symlink_dest = self.get_data() + + def get_symlink_dest(self): + return self.symlink_dest + + +class RegFileInode(Inode): + def __init__(self, erofs: Erofs, nid: int): + super(RegFileInode, 
self).__init__(erofs, nid) + if S_IFMT(self.inode_header.i_mode) != S_IFREG: + raise ValueError("RegFileInode at nid=0x%16x is not of type S_IFREG, self.inode_header.i_mode=0x%08x" % (nid, self.inode_header.i_mode)) + + +def hd(buf: bytes): + sys.stdout.flush() + p = subprocess.Popen(["hd"], stdin=subprocess.PIPE) + p.stdin.write(buf) + p.stdin.close() + p.wait() + + +def pp_decompress_lz4(buf: bytes, maxlen: int = None, expected: bytes = None) -> bytes: + """ + https://github.com/lz4/lz4/blob/master/doc/lz4_Block_format.md + :param buf: Compressed buffer, raw LZ4 without framing or length header + :param maxlen: Maximum length to extract, will return buffer after extracting that amount of bytes + :param expected: Optional known decompressed value to debug extraction errors + :return: + """ + with BytesIO() as out: + pos = 0 + while pos < len(buf): + token_byte = buf[pos] + # print("Token 0x%02x at 0x%x" % (token_byte, pos)) + pos += 1 + # Get length of literal from input + literal_length = token_byte >> 4 + if literal_length == 0xf: + length_byte = buf[pos] + pos += 1 + literal_length += length_byte + while length_byte == 0xff: + length_byte = buf[pos] + pos += 1 + literal_length += length_byte + literal_buf = buf[pos: pos + literal_length] + pos += literal_length + if expected is not None: + for i in range(len(literal_buf)): + assert literal_buf[i] == expected[out.tell() + i], "Mismatch at position 0x%x: %r <=> %r" % (out.tell() + i, literal_buf[i], expected[out.tell() + i]) + out.write(literal_buf) + if maxlen is not None and out.tell() >= maxlen: + return out.getvalue()[0:maxlen] + if pos == len(buf) or pos == len(buf) - 1: + # Reached end of input after literal => OK + break + # print("OFFSET POS: 0x%x" % pos) + # Get offset for copy operation + offset = buf[pos] + 256 * buf[pos + 1] + pos += 2 + if offset == 0: + continue + # raise ValueError("Offset cannot be 0") + # Get matchlength for copy operation + matchlength = token_byte & 0x0f + if matchlength == 
0xf: + length_byte = buf[pos] + pos += 1 + matchlength += length_byte + while length_byte == 0xff: + length_byte = buf[pos] + pos += 1 + matchlength += length_byte + matchlength += 4 + match_pos = out.tell() - offset + while matchlength > 0: + copylen = min(matchlength, out.tell() - match_pos) + copybuf = out.getvalue()[match_pos: match_pos + copylen] + if expected is not None: + for i in range(len(copybuf)): + assert copybuf[i] == expected[out.tell() + i], "Mismatch at position %r" % (out.tell() + i) + out.write(copybuf) + if maxlen is not None and out.tell() >= maxlen: + return out.getvalue()[0:maxlen] + matchlength -= copylen + # print("copylen=%r" % copylen) + # Copy from the original position => Copy as many bytes as possible at a time + assert copylen % offset == 0 or matchlength == 0 + # match_pos += copylen % offset + # Old, un-optimized code: + # for i in range(matchlength): + # out.write(out.getvalue()[match_pos + i:match_pos + i + 1]) + return out.getvalue() + + +if __name__ == "__main__": + main() diff --git a/extract-docker.py b/extract-docker.py new file mode 100755 index 0000000..dece66e --- /dev/null +++ b/extract-docker.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 + +# This file is part of Extractor. + +# Copyright (C) 2021 Security Research Labs GmbH +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
def main():
    """
    Run the extractor inside the "extractor_image" docker image.

    Builds the image (tagged with the short git revision of this checkout) if no
    extractor_image exists, rebuilds it when --force-cleanup-and-rebuild is given, and
    exits with status 1 if the output directory is unusable or the local image does not
    match the current revision.
    """
    parser = argparse.ArgumentParser("Extract using docker extractor image")

    parser.add_argument("--in-file", type=lambda p: pathlib.Path(p).absolute(), required=True, help="Input file (e.g. Android image)")
    parser.add_argument("--out-dir", type=lambda p: pathlib.Path(p).absolute(), required=True, help="Output directory")
    parser.add_argument('--force-cleanup-and-rebuild', action='store_true')
    args = parser.parse_args()

    logging.basicConfig(format='%(levelname)s:%(asctime)s:%(message)s', level=logging.DEBUG)

    # Abort if out dir does not exist or is non-empty
    if not args.out_dir.is_dir():
        logging.error("[!] %s not a directory, exiting", args.out_dir)
        sys.exit(1)
    if any(args.out_dir.iterdir()):
        logging.error("[!] %s not empty, exiting", args.out_dir)
        sys.exit(1)

    start_time = time.time()
    logging.info("[+] Check if docker image is up-to-date")
    # Directory containing this script; used for the git revision lookup AND as docker build
    # context, so that the script also works when started from another working directory.
    repo_dir = pathlib.Path(__file__).absolute().parents[0]
    extractor_revision = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], cwd=repo_dir).strip().decode()
    image_name = "extractor_image:" + extractor_revision
    build_cmd = ["docker", "build", ".", "-t", image_name]

    # Check if some extractor_image exists (all versions), if not build
    extractor_image_list = subprocess.check_output(["docker", "images", "-q", "extractor_image"], stderr=subprocess.DEVNULL).splitlines()

    if not extractor_image_list:
        logging.info("[+] Building docker image %s", image_name)
        subprocess.check_output(build_cmd, cwd=repo_dir)
    elif args.force_cleanup_and_rebuild:
        # Delete all existing extractor_image images
        for image in extractor_image_list:
            subprocess.check_output(["docker", "rmi", image.decode()])
        # Build new image
        subprocess.check_output(build_cmd, cwd=repo_dir)
    else:
        # Stop in case we find multiple local images or an outdated image
        if len(extractor_image_list) != 1:
            logging.error("[!] Too many local extractor_images exist, please use --force-cleanup-and-rebuild to cleanup and rebuild")
            sys.exit(1)
        current_image_id = subprocess.check_output(["docker", "images", "-q", image_name], stderr=subprocess.DEVNULL).strip()
        # Empty output (no image tagged with the current revision) is never in the list
        if current_image_id not in extractor_image_list:
            logging.error("[!] Your existing local image %s is outdated, please use --force-cleanup-and-rebuild to rebuild", extractor_image_list[0].decode())
            sys.exit(1)

    logging.info("[+] Running extractor with docker image %s", image_name)
    # The parent directory of the input is mounted as /in_dir, the output directory as /out_dir
    subprocess.check_call([
        "docker",
        "run",
        "--privileged",
        "--mount",
        "type=bind,src=" + str(args.in_file.parents[0]) + ",dst=/in_dir",
        "--mount",
        "type=bind,src=" + str(args.out_dir) + ",dst=/out_dir",
        "--rm",
        image_name,
        "/in_dir/" + args.in_file.name,
        "--system-dir-output",
        "/out_dir/"
    ])

    duration = time.time() - start_time
    logging.info("%s", f"[+] Output saved to {str(args.out_dir)} in {duration}s")
+ +import os +import subprocess +import logging +import sys +from collections import defaultdict, deque +import re +import argparse +import tempfile +from enum import Enum, auto +from Crypto.Cipher import AES +import struct +# noinspection PyPep8Naming +import xml.etree.ElementTree as ET +import json +from typing import List, Optional, DefaultDict +import shutil +import shlex +from lxml import etree +import liblp + + +base_dir = os.path.dirname(os.path.realpath(__file__)) + + +def main(): + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)-12s %(levelname)-8s: %(message)s') + if os.getuid() != 0: + logging.error("Not running as root, exiting") + sys.exit(1) + parser = argparse.ArgumentParser(description='Android firmware extraction tool') + parser.add_argument("input") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--tar-output", help="Path to system.tar file to generate") + group.add_argument("--system-dir-output", help="Path to store system dir, without intermediate tar file") + group.add_argument("--no-output", action="store_true", help="Only run extraction but ignore output") + parser.add_argument("--boot-recovery-output", help="Directory where boot/recovery img should be stored") + parser.add_argument("--allow-missing-vendor", action="store_true", help="Allow missing vendor partition for extraction, required for system-only updates (=> Project Treble), e.g. 
class ZipHandler(FileHandler):
    """
    Generic Zip Handler, often used as top-level container format
    """
    # Accepted file name suffixes (compared against the lower-cased file name):
    #   .zip  - regular zip files
    #   .ftf  - Sony ftf format
    #   .ozip - Oppo ozip, in some cases custom format (see OzipHandler), in other cases just a zip file
    #   .up   - Some ZTE firmwares use ".up" for zip files
    GOOD_EXTENSIONS = (b".zip", b".ftf", b".ozip", b".up")

    def check(self) -> CheckFileResult:
        """
        Return ARCHIVE if both the extension and the detected file type look like a zip
        file, HANDLER_NO_MATCH otherwise.
        """
        if not self.is_good_extension():
            return CheckFileResult.HANDLER_NO_MATCH
        if not self.file_type.lower().startswith(("zip", "java archive data")):
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.ARCHIVE

    def is_good_extension(self) -> bool:
        """
        Return True if the file name ends with one of GOOD_EXTENSIONS (case-insensitive).
        """
        return self.abs_fn.lower().endswith(self.GOOD_EXTENSIONS)

    def extract_file2dir(self, output_path_rel):
        """
        Extract the archive into output_path_rel, first with "unzip" and, if that fails,
        with "jar xf" after emptying the output directory.

        :raises AssertionError: If the output directory to be wiped is not below a /tmp/AND* path
        :raises subprocess.CalledProcessError: If the "jar" fallback fails as well
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        cmd = ["unzip", "-q", self.abs_fn]
        logging.info("ZipHandler: cmd=%r cwd=%r", cmd, abs_output_path)
        exitcode = subprocess.call(cmd, cwd=abs_output_path, stdin=subprocess.DEVNULL)
        # 0: OK, 1: Finished with warnings
        if exitcode in (0, 1):
            return
        logging.info("Extracting zip file with 'unzip' command failed (exit code %d), retrying with 'jar xf'", exitcode)
        # unzip failed, clean up stage dir and try other extractor.
        # This guard protects the rmtree below, so it is raised explicitly instead of using
        # an assert statement (which would be removed when running with "python -O").
        if b'/tmp/AND' not in abs_output_path:
            raise AssertionError("abs_output_path %r doesn't contain /tmp/AND" % abs_output_path)
        shutil.rmtree(abs_output_path)
        os.mkdir(abs_output_path)
        # Try jar as a second extractor, there is a known issue with unzip and large (>4GB) files:
        # https://stackoverflow.com/a/31084012
        cmd = ["jar", "xf", self.abs_fn]
        logging.info("ZipHandler fallback to jar: cmd=%r cwd=%r", cmd, abs_output_path)
        subprocess.check_call(cmd, cwd=abs_output_path)
class OzipHandler(FileHandler):
    """
    OPPO encrypted image
    """
    def check(self) -> CheckFileResult:
        """
        Return ARCHIVE for .ozip files starting with the OPPOENCRYPT! magic.

        :raises AssertionError: If a .ozip file has neither the magic nor a "java archive" file type
        """
        if not self.abs_fn.lower().endswith(b".ozip"):
            return CheckFileResult.HANDLER_NO_MATCH
        # Use a context manager so that the file handle is closed right away
        with open(self.abs_fn, 'rb') as magic_fh:
            magic = magic_fh.read(12)
        if magic != b'OPPOENCRYPT!':
            if self.file_type.lower().startswith("java archive"):
                # Some .ozip files are actually zip, e.g. CPH1809EX_11_OTA_0180_all_OlU3r4ImvcSX_local.ozip
                return CheckFileResult.HANDLER_NO_MATCH
            # Raised explicitly: an assert statement would be removed under "python -O" and the
            # file would then wrongly be reported as ARCHIVE
            raise AssertionError("Invalid ozip magic %r" % magic)
        return CheckFileResult.ARCHIVE

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decrypt the ozip file into a .zip file in a new stage directory and return a
        ZipHandler for the decrypted file.
        """
        # Replace .ozip with .zip
        assert self.abs_fn.lower().endswith(b".ozip")
        out_filename = os.path.basename(self.abs_fn)[0:-5] + b".zip"
        out_path_rel = os.path.join(stage_dir_rel, out_filename)
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        out_path_abs = os.path.join(abs_stage_dir, out_filename)
        ozip = AES.new(b'\xD6\xDC\xCF\x0A\xD5\xAC\xD4\xE0\x29\x2E\x52\x2D\xB7\xC1\x38\x1E', AES.MODE_ECB)
        with open(self.abs_fn, 'rb') as ifs, open(out_path_abs, 'wb') as ofs:
            # Payload starts at 0x1050 and alternates between one 16 byte AES-ECB encrypted
            # block and up to 0x4000 bytes that are copied as-is
            ifs.seek(0x1050, 0)
            while True:
                data = ifs.read(16)
                if len(data) == 0:
                    # End of file right after a plain chunk, nothing left to decrypt
                    break
                ofs.write(ozip.decrypt(data))
                data = ifs.read(0x4000)
                if len(data) == 0:
                    break
                ofs.write(data)
        return ZipHandler(self.extractor, out_path_rel, file_type=get_file_type(out_path_abs))
class TarHandler(FileHandler):
    """
    Handler for tar files (and Samsung .tar.md5 files, which are actually tar archives)
    """
    def check(self) -> CheckFileResult:
        """
        Return ARCHIVE for tar archives with a good extension, IGNORE for Samsung
        USERDATA_*.tar.md5 files and HANDLER_NO_MATCH for everything else.
        """
        if not self.is_good_extension():
            return CheckFileResult.HANDLER_NO_MATCH
        # Not every tar archive is a "POSIX" tar archive.
        is_tar = self.file_type == 'tar archive' or 'POSIX tar archive' in self.file_type
        if not is_tar:
            return CheckFileResult.HANDLER_NO_MATCH
        is_samsung_userdata = self.fn.startswith(b'USERDATA_') and self.fn.endswith(b'.tar.md5')
        if is_samsung_userdata:
            # USERDATA_*.tar.md5 is present on some Samsung images, but it contains only useless stuff such as dalvik caches
            # Needs to be ignored so that ArchiveDirectoryHandler can accept handling only other files
            return CheckFileResult.IGNORE
        return CheckFileResult.ARCHIVE

    def is_good_extension(self) -> bool:
        """
        Return True for file names ending in .tar or .tar.md5 (Samsung), case-insensitive.
        """
        lowered = self.abs_fn.lower()
        return lowered.endswith(b".tar") or lowered.endswith(b".tar.md5")

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the archive with "tar xvf", using output_path_rel as working directory.
        """
        target_dir = self.extractor.abs_fn(output_path_rel)
        tar_cmd = ["tar", "xvf", self.abs_fn]
        logging.info("TarHandler.extract_file2dir(%r): cmd=%r cwd=%r" % (output_path_rel, tar_cmd, target_dir))
        subprocess.check_call(tar_cmd, cwd=target_dir)
class DzHandler(FileHandler):
    """
    Handler for LG dz format (files with a ".dz" extension)
    """
    # Block sizes that are tried (smallest first) when re-assembling system.img
    CANDIDATE_BLOCK_SIZES = (512, 1024, 2048, 4096)

    def check(self) -> CheckFileResult:
        """
        Return ARCHIVE for every file ending in .dz (case-insensitive), no file type check.
        """
        if not self.abs_fn.lower().endswith(b".dz"):
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.ARCHIVE

    def extract_file2dir(self, output_path_rel):
        """
        Run undz.py on the input and merge the resulting system_<num>.bin chunks into a
        single system.img inside output_path_rel. The chunk files are deleted afterwards.

        :raises AssertionError: If vendor chunks are found (not implemented) or no block size fits
        :raises ValueError: If undz did not produce any system_<num>.bin file
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        cmd = ["python", "%s/kdzextractor/undz.py" % base_dir, "-x", "-f", self.abs_fn]
        logging.info("DzHandler.extract_file2dir(%r): cmd=%r cwd=%r" % (output_path_rel, cmd, abs_output_path))
        subprocess.check_call(cmd, cwd=abs_output_path)
        # undz creates a number of system_<num>.bin files.
        # <num> is the block number where this file is in the final system image.
        # The block size is variable, can be found based on the size of the first image
        # and the offset of the second image.
        dzextracted_path = os.path.join(abs_output_path, b'dzextracted')
        file_num_to_fn = dict()
        for fn in os.listdir(dzextracted_path):
            if fn.startswith(b'vendor_') and fn.endswith(b'.bin'):
                # Raised explicitly so that the check is not removed under "python -O"
                raise AssertionError("Please implement DzHandler Vendor extraction")
            if not (fn.startswith(b'system_') and fn.endswith(b'.bin')):
                continue
            num = int(fn[7:-4].decode())
            file_num_to_fn[num] = os.path.join(dzextracted_path, fn)
        files_sorted = sorted(file_num_to_fn)
        if not files_sorted:
            raise ValueError("DzHandler: No system_<num>.bin files found in %r" % dzextracted_path)
        offset = files_sorted[0]
        # With a single chunk the block size is irrelevant, since that chunk is written at position 0
        bs = 0
        if len(files_sorted) > 1:
            value = files_sorted[1] - files_sorted[0]
            info = os.stat(os.path.join(output_path_rel, file_num_to_fn[files_sorted[0]]))
            # Smallest block size for which the first chunk fits in front of the second chunk
            for candidate_bs in self.CANDIDATE_BLOCK_SIZES:
                if (value * candidate_bs) >= info.st_size:
                    bs = candidate_bs
                    break
            else:
                raise AssertionError("Failed to find block size")

        abs_system_img = os.path.join(abs_output_path, b'system.img')
        with open(abs_system_img, 'wb') as out_fh:
            for file_num in files_sorted:
                file_name = file_num_to_fn[file_num]
                out_fh.seek(bs * (file_num - offset))
                with open(file_name, 'rb') as in_fh:
                    # Copies in 1 MiB chunks, starting at the position selected by seek() above
                    shutil.copyfileobj(in_fh, out_fh, 1024 * 1024)
                os.unlink(file_name)  # Unlink is required so that the next ArchiveDirectoryHandler will not be confused by the low handled size
self.extractor.abs_fn(output_path_rel) + cmd = ["%s/sinextract/sinextract" % base_dir, abs_output_path, self.abs_fn] + logging.info("SinHandler.extract_file2dir(%r): cmd=%r cwd=%r" % (output_path_rel, cmd, abs_output_path)) + subprocess.check_call(cmd, cwd=abs_output_path) + + +class RawprogramUnsparseHandler(FileHandler): + system_parts_with_pos: List[List] + vendor_parts_with_pos: List[List] + super_parts_with_pos: List[List] + extra_ignored_size: int + + def check(self) -> CheckFileResult: + self.system_parts_with_pos = [] + self.vendor_parts_with_pos = [] + self.super_parts_with_pos = [] + self.extra_ignored_size = 0 + if self.fn == b"contents.xml": + return self.check_contents_xml(self.abs_fn) + else: + xml_files_by_priority = ( + b'rawprogram_unsparse.xml', + b'rawprogram0.xml', + b'rawprogram_unsparse(US).xml', + b'rawprogram0_unsparse.xml', + b'rawprogram_unsparse0.xml', + b'rawprogram0_unsparse_upgrade.xml', + b'rawprogram_upgrade.xml', + b'rawprogram_unsparse_upgrade.xml' + ) + if self.fn not in xml_files_by_priority: + return CheckFileResult.HANDLER_NO_MATCH + input_dir_abs = os.path.dirname(self.abs_fn) + direct_system_img_path = os.path.join(input_dir_abs, b'system.img') + if os.path.exists(direct_system_img_path) and os.stat(direct_system_img_path).st_size > 128 * 1024 * 1024: + return CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE # Some images contain a system.img file directly and some non-working xml + # If contents.xml exists, it should be used and this handler should return HANDLER_NO_MATCH for all other + # xml files. 
def check_contents_xml(self, abs_contents_xml: bytes) -> CheckFileResult:
    """
    Parse contents.xml and run parse_xml_file() on every rawprogram xml file that is
    referenced from a <step filter="hlos"> element and exists on disk.

    :param abs_contents_xml: Absolute path of the contents.xml file
    :return: Result of parse_xml_file() for the last existing referenced file,
             HANDLER_NO_MATCH if none of the referenced files exists
    :raises ValueError: If contents.xml does not contain any matching params tag
    """
    contents_xml_dir = os.path.dirname(abs_contents_xml)
    parser = etree.XMLParser()
    # Context manager makes sure the file handle is closed once parsing is done
    with open(abs_contents_xml) as contents_fh:
        tree = etree.parse(contents_fh, parser)
    root = tree.getroot()
    params_tags = root.xpath('//step[@filter="hlos"]/params')
    if len(params_tags) == 0:
        raise ValueError("Failed to parse contents.xml")
    result = CheckFileResult.HANDLER_NO_MATCH
    for params_tag in params_tags:
        # The tag text is a command line, the referenced xml file is the argument of the trailing "-o"
        cmd_str = params_tag.text.strip()
        assert "@rawprogram_xml" in cmd_str
        cmd = shlex.split(cmd_str)
        assert cmd[-2] == "-o"
        xml_fn = cmd[-1]
        abs_fn = os.path.join(contents_xml_dir, xml_fn.encode())
        # assert os.path.exists(abs_fn), "File %r (referenced from %r) doesn't exist" % (abs_fn, self.abs_fn)
        if os.path.exists(abs_fn):
            result = self.parse_xml_file(abs_fn)
        else:
            logging.error("File %r (referenced from %r) doesn't exist", abs_fn, abs_contents_xml)
    return result
document starting with \n" + "\n".join(lines) + "") + program_tags = root.findall('program') + sector_size: Optional[int] = None + image_base_dir = os.path.dirname(abs_xml_fn) + partition_start_sector_by_label = {} + found_vendor_b: bool = False + for program_tag in program_tags: + label = program_tag.attrib["label"] + if label in ("system", "system_a", "vendor", "vendor_a", "super"): + logging.info("RawprogramUnsparseHandler: program_tag.attrib=%s" % json.dumps(program_tag.attrib, sort_keys=True)) + # Sparse attribute can cause problems (sector size mismatch etc.), it will be handled directly by SuperImageHandler + if label == "super" and "sparse" in program_tag.attrib and program_tag.attrib["sparse"].lower() == "true": + continue + filename = program_tag.attrib["filename"] + abs_fn = os.path.join(image_base_dir, filename.encode()) + if not os.path.exists(abs_fn): + if b'image/modem/' in abs_xml_fn: + return CheckFileResult.IGNORE + raise ValueError("File %r doesn't exist (referenced from %r)" % (abs_fn, abs_xml_fn)) + if "SECTOR_SIZE_IN_BYTES" in program_tag.attrib: + if sector_size is None: + sector_size = int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"]) + assert sector_size in [512, 4096] + else: + assert int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"]) == sector_size, "Inconsistent sector size: %r <=> %r" % (int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"]), sector_size) + else: + # Found a program tag without SECTOR_SIZE_IN_BYTES => Fall back to default 512 + sector_size = 512 + assert os.stat(abs_fn).st_size % sector_size == 0, "File %r is not a multiple of %d bytes" % (abs_fn, sector_size) + start_sector = int(program_tag.attrib["start_sector"]) + if label not in partition_start_sector_by_label: + partition_start_sector_by_label[label] = start_sector + start_pos = sector_size * (start_sector - partition_start_sector_by_label[label]) + assert start_pos < 10 * 1024 * 1024 * 1024, "RawprogramUnsparseHandler: Sparse image too big (>10 GiB)" + if 
def get_extra_handled_size(self):
    """
    Return the summed on-disk size (in bytes) of all part files collected for the
    system, vendor and super images.
    """
    part_lists = (
        self.system_parts_with_pos,
        self.vendor_parts_with_pos,
        self.super_parts_with_pos,
    )
    # Every entry is a [file name, position] pair, only the file name is needed here
    return sum(
        os.stat(part_fn).st_size
        for parts in part_lists
        for part_fn, _pos in parts
    )
abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + result: List[FileHandler] = [] + if len(self.super_parts_with_pos) > 0: + assert len(self.system_parts_with_pos) == 0, "Can't have system and super image" + assert len(self.vendor_parts_with_pos) == 0, "Can't have vendor and super image" + output_fn = b'super.img' + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + extract_parts_to_file(self.super_parts_with_pos, abs_out_fn) + handler = SuperImageHandler(self.extractor, self.extractor.rel_path(abs_out_fn), file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result == CheckFileResult.HANDLER_NO_MATCH: + raise ValueError("RawprogramUnsparseHandler: Extracted super.img but SuperImageHandler returned HANDLER_NO_MATCH") + result.append(handler) + else: + for image_type in (ImageType.SYSTEM, ImageType.VENDOR): + if image_type == ImageType.VENDOR and len(self.vendor_parts_with_pos) == 0: + continue + output_fn = b'%s.img' % image_type.name.lower().encode() + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + parts: List[List] + if image_type == ImageType.SYSTEM: + parts = self.system_parts_with_pos + elif image_type == ImageType.VENDOR: + parts = self.vendor_parts_with_pos + else: + raise ValueError("Invalid image_type=%r" % image_type) + extract_parts_to_file(parts, abs_out_fn) + HANDLER_TYPES = [ExtfsHandler, ErofsHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % 
class IgnoreOpImageHandler(FileHandler):
    """
    Handler to ignore OP_\\d+.bin files, e.g. from /android/LG/may2017/H840AR10a_01_0906.kdz/H840AR10a_01_0906.kdz
    """
    def check(self) -> CheckFileResult:
        """
        Return IGNORE if the lower-cased file name starts with one of the OP image name
        patterns, HANDLER_NO_MATCH otherwise.
        """
        # May have various different file types, e.g. ext4 or jar. So let's just match by filename here.
        # Second pattern, sample: OP_OPEN_ZA.img from H84020c_00_OPEN_ZA_OP_0630.kdz
        ignored_name_patterns = (rb'op_\d+\.bin', rb'op_\w+\.img')
        lowered_fn = self.fn.lower()
        for pattern in ignored_name_patterns:
            if re.match(pattern, lowered_fn) is None:
                continue
            logging.info("IgnoreOpImageHandler: file %r => file_type=%r" % (self.abs_fn, self.file_type))
            return CheckFileResult.IGNORE
        return CheckFileResult.HANDLER_NO_MATCH
for /android/2018-11/9.0.0.47-c432.zip/9.0.0.47-c432.zip + Only required in ArchiveDirectoryHandler Pass2 + """ + def check(self) -> CheckFileResult: + m = re.match(rb'update_full_.*_hw_\w+\.zip', self.fn.lower()) + if m: + return CheckFileResult.IGNORE + return CheckFileResult.HANDLER_NO_MATCH + + +class IgnoreHuaweiUserdataAppHandler(FileHandler): + """ + Handler to ignore USERDATA.APP + Required so that the actual image (UPDATE.APP) will be >90% + """ + def check(self) -> CheckFileResult: + if self.fn.lower() == b"userdata.app": + return CheckFileResult.IGNORE + return CheckFileResult.HANDLER_NO_MATCH + + +class IgnoreElfHandler(FileHandler): + """ + Handler to ignore elf files + Required to reach size coverage threshold + """ + def check(self) -> CheckFileResult: + if self.file_type.startswith("ELF ") and self.fn.lower().endswith(b'.elf'): + return CheckFileResult.IGNORE + return CheckFileResult.HANDLER_NO_MATCH + + +class SparseImageHandler(FileHandler): + abs_fn_list: List[bytes] + + def check(self) -> CheckFileResult: + if self.file_type.startswith("Android sparse image, version: 1.0,"): + if self.fn.lower().startswith(b"system_other"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'hidden.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'cache'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'userdata.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'userdata_'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'persist.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'3rdmodem.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'cust.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'product.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'odm.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'oem.'): + return CheckFileResult.IGNORE + if 
self.fn.lower().startswith(b'container.'): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b'apps.'): + # Vivo + return CheckFileResult.IGNORE + if self.fn.lower().endswith(b".duplicate"): + return CheckFileResult.IGNORE # splituapp duplicate file entries in Huawei UPDATE.APP + if self.fn.lower().startswith(b"op_") or self.fn.lower().startswith(b"op."): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"oem_"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"preas_"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"preas."): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"non-hlos."): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"super"): + return CheckFileResult.HANDLER_NO_MATCH # Will be covered by SuperImageHandler + self.abs_fn_list = [] + if b"sparsechunk" in self.fn.lower(): + if self.fn.lower().endswith(b"sparsechunk.0"): + base_abs_fn = self.abs_fn[0:-2] + for i in range(100): + abs_fn = base_abs_fn + b".%d" % i + if os.path.exists(abs_fn): + self.abs_fn_list.append(abs_fn) + else: + break + else: + return CheckFileResult.IGNORE + else: + self.abs_fn_list.append(self.abs_fn) + if self.fn.lower().startswith(b"system.") or self.fn.lower().startswith(b"system_a.") or self.fn.lower().startswith(b"system-sign."): + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower().startswith(b"vendor.") or self.fn.lower().startswith(b"vendor_a.") or self.fn.lower().startswith(b"vendor-sign."): + self.image_type = ImageType.VENDOR + return CheckFileResult.VENDOR_IMG + elif self.fn.lower().startswith(b"system_b.") or self.fn.lower().startswith(b"vendor_b."): + return CheckFileResult.IGNORE + elif self.fn.lower().startswith(b"system_ext."): + return CheckFileResult.IGNORE + elif self.fn.lower().startswith(b"system_other."): + return CheckFileResult.IGNORE + else: + if os.stat(self.abs_fn).st_size < 32 * 1024 * 1024: + # Ignore images 
smaller than 32 MiB, these images can't be a valid system/vendor partition + return CheckFileResult.IGNORE + assert False, "SparseImageHandler: %r does not start with system/vendor (Size %.2f MiB)" % (self.fn, os.stat(self.abs_fn).st_size / 1024**2) + else: + return CheckFileResult.HANDLER_NO_MATCH + + def get_extra_handled_size(self) -> int: + result = 0 + # Only count from file 1 + for fn in self.abs_fn_list[1:]: + result += os.stat(fn).st_size + return result + + def extract_and_get_next_handler(self, stage_dir_rel): + output_fn = self.fn + b".SparseImageHandler" + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd: List[bytes] = [b"simg2img"] + self.abs_fn_list + [abs_out_fn] + logging.info("SparseImageHandler: cmd=%r" % cmd) + subprocess.check_call(cmd) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [ExtfsHandler, AsusMagicHandler, ErofsHandler, MotoPivHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("SparseImageHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class MotoPivHandler(FileHandler): + def check(self) -> CheckFileResult: + with open(self.abs_fn, 'rb') as f: + buf = f.read(32) + if buf[0:4] == b'MOTO' 
and b'MOT_PIV_FULL256' in buf: + if self.image_type == ImageType.SYSTEM: + return CheckFileResult.SYSTEM_IMG + elif self.image_type == ImageType.VENDOR: + return CheckFileResult.VENDOR_IMG + else: + raise ValueError("Bad image_type %r" % self.image_type) + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + output_fn = self.fn + b".MotoPivHandler" + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + with open(self.abs_fn, 'rb') as input_file: + buf = input_file.read(32) + offset = struct.unpack(" 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("MotoPivHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class SuperImageHandler(FileHandler): + is_sparse: bool + + def check(self) -> CheckFileResult: + self.is_sparse = False + if self.file_type.startswith("Android sparse image, version: 1.0,"): + if self.fn.lower().startswith(b"super"): + self.is_sparse = True + return CheckFileResult.ARCHIVE + else: + return CheckFileResult.HANDLER_NO_MATCH + else: + with open(self.abs_fn, 'rb') as f: + if not liblp.check_magic(f): + return CheckFileResult.HANDLER_NO_MATCH + if not self.fn.lower().startswith(b'super'): + raise ValueError("Found liblp magic but not in super image, this should not happen") + return CheckFileResult.ARCHIVE + + def extract_file2dir(self, output_path_rel): + super_img_fn = self.abs_fn + if self.is_sparse: + super_img_fn = self.abs_fn + b'.unsparse' + cmd: List[bytes] = [b"simg2img", self.abs_fn, super_img_fn] + subprocess.check_call(cmd) + super_img = liblp.SuperImage(super_img_fn) + abs_output_path = self.extractor.abs_fn(output_path_rel) + partition_names = super_img.get_partition_names() + found_system = 
False + for partition_name in ["system", "system_a", "system_b"]: + if partition_name in partition_names: + output_fn = os.path.join(abs_output_path, partition_name.encode() + b".img") + with open(output_fn, 'wb') as f: + super_img.write_partition(partition_name, f) + found_system = True + break + assert found_system, "Failed to find system in super.img" + found_vendor = False + for partition_name in ["vendor", "vendor_a", "vendor_b"]: + if partition_name in partition_names: + output_fn = os.path.join(abs_output_path, partition_name.encode() + b".img") + with open(output_fn, 'wb') as f: + super_img.write_partition(partition_name, f) + found_vendor = True + break + assert found_vendor, "Failed to find vendor in super.img" + + +class SignImgHandler(FileHandler): + """ + https://github.com/R0rt1z2/signimg2img + """ + def check(self) -> CheckFileResult: + magic_buf: bytes + # Read magic + with open(self.abs_fn, 'rb') as f: + magic_buf = f.read(4) + if magic_buf not in (b'BFBF', b'SSSS'): + return CheckFileResult.HANDLER_NO_MATCH + if self.fn.lower() == b"system-sign.img": + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower() == b"vendor-sign.img": + return CheckFileResult.VENDOR_IMG + # TODO: Maybe also add boot/recovery images + assert b'system' not in self.fn, "Unexpected system image in SignImgHandler: %r" % self.fn + assert b'vendor' not in self.fn, "Unexpected vendor image in SignImgHandler: %r" % self.fn + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + output_fn = self.fn + b".SparseImageHandler" + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + with open(self.abs_fn, 'rb') as input_fh, open(abs_out_fn, 'wb') as output_fh: + buf = input_fh.read(1024) + magic = buf[0:4] + if magic == b'SSSS': + # https://github.com/R0rt1z2/signimg2img is using 60:64, but at least some images have offset 44:48 + # Sample: TB-7305F_S000083_200703_ROW.zip 
+ offset = struct.unpack(" 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError( + "SignImgHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % ( + abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class AsusMagicHandler(FileHandler): + def check(self) -> CheckFileResult: + if self.fn.lower().startswith(b"system") or self.fn.lower().startswith(b"vendor"): + buf = open(self.abs_fn, 'rb').read(4096) + magic = buf[0x0:0xc] + if magic == b'ASUS MAGIC!\n': + if self.fn.lower().startswith(b"system"): + return CheckFileResult.SYSTEM_IMG + else: + assert self.fn.lower().startswith(b"vendor") + return CheckFileResult.VENDOR_IMG + else: + return CheckFileResult.HANDLER_NO_MATCH + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + output_fn = self.fn + b".AsusMagicHandler" + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + with open(self.abs_fn, 'rb') as input_file: + input_file.read(4096) + with open(abs_out_fn, 'wb') as output_file: + buf = input_file.read(128 * 1024) + while len(buf) > 0: + output_file.write(buf) + buf = input_file.read(128 * 1024) + HANDLER_TYPES = [ExtfsHandler, ErofsHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: 
%r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("AsusMagicHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class Lz4Handler(FileHandler): + def check(self) -> CheckFileResult: + if self.file_type.startswith("LZ4 compressed data"): + if not self.fn.lower().endswith(b'.lz4'): + if os.stat(self.abs_fn).st_size > 32 * 1024 * 1024: + raise ValueError("Bad LZ4 filename %r" % self.fn) + else: + return CheckFileResult.HANDLER_NO_MATCH + if self.fn.lower().startswith(b"system_other"): + return CheckFileResult.IGNORE + if self.fn.lower() == b"boot.img.lz4": + return CheckFileResult.HANDLER_NO_MATCH # Handled by BootImageHandler + if self.fn.lower() == b"recovery.img.lz4": + return CheckFileResult.HANDLER_NO_MATCH # Handled by RecoveryImageHandler + if self.fn.lower().startswith(b"persist."): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"userdata."): # userdata partition contains stuff like dalvik cache etc. + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"carrier."): # userdata partition contains stuff like dalvik cache etc. 
+ return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"product.img"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"cache.img"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"hidden.img"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"non-hlos"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"modem"): + return CheckFileResult.IGNORE + if self.fn.lower().startswith(b"system"): + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower().startswith(b"vendor"): + self.image_type = ImageType.VENDOR + return CheckFileResult.VENDOR_IMG + else: + if os.stat(self.abs_fn).st_size < 32 * 1024 * 1024: + # Ignore images smaller than 32 MiB, these images can't be a valid system/vendor partition + return CheckFileResult.IGNORE + assert False, "Lz4Handler: %r does not start with system/vendor" % self.fn + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + assert self.fn.endswith(b'.lz4') + output_fn = self.fn[0:-4] + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd = ["lz4", "-dc", self.abs_fn] + logging.info("Lz4Handler: cmd=%r, out=%r" % (cmd, abs_out_fn)) + # The command "lz4 -d" is behaving differently depending on whether stdout is a console or not. + # If it is a console, it will strip the .lz4 extension and use the remaining path as output file. + # If it is not a console (e.g. if the extractor is called from another utility and stdout is captured), + # lz4 -d will just output the decompressed data to stdout. 
There is no command-line option to force output + # to a file, so let's force output to stdout and redirect it using subprocess + with open(abs_out_fn, 'wb') as f: + subprocess.check_call(cmd, stdout=f) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [ExtfsHandler, SparseImageHandler, ErofsHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("Lz4Handler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class GzipHandler(FileHandler): + def check(self) -> CheckFileResult: + if self.file_type.startswith("gzip compressed data"): + assert self.fn.endswith(b'.gz') or self.fn.endswith(b'.tgz') + return CheckFileResult.ARCHIVE + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + if self.fn.endswith(b'.gz'): + output_fn = self.fn[0:-3] + elif self.fn.endswith(b'.tgz'): + output_fn = self.fn[0:-4] + b'.tar' + else: + assert False, "Invalid gzip filename %r" % self.fn + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd = ["gzip", "-dc", self.abs_fn] + logging.info("GzipHandler: cmd=%r out=%r" % (cmd, abs_out_fn)) + # The command "gzip -d" has no command line option to force output 
to a + # specific file. + with open(abs_out_fn, 'wb') as f: + retcode = subprocess.call(cmd, stdout=f) + # Exit code 2 means warning, e.g. "trailing garbage ignored" + assert retcode in (0, 2), "GzipHandler: command %r failed with exit code %r" % (cmd, retcode) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [TarHandlerIgnoreExtension] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in [CheckFileResult.ARCHIVE], "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("GzipHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class Bzip2Handler(FileHandler): + def check(self) -> CheckFileResult: + if self.file_type.startswith("bzip2 compressed data"): + assert self.fn.endswith(b'.bz2') + return CheckFileResult.ARCHIVE + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + if self.fn.endswith(b'.bz2'): + output_fn = self.fn[0:-4] + else: + assert False, "Invalid bzip2 filename %r" % self.fn + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd = ["bzip2", "-dc", self.abs_fn] + logging.info("Bzip2Handler: cmd=%r out=%r" % (cmd, abs_out_fn)) + # The command "bzip2 -d" has no command line option to force output to a + # specific file. 
+ with open(abs_out_fn, 'wb') as f: + subprocess.check_call(cmd, stdout=f) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [TarHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in [CheckFileResult.ARCHIVE], "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("Bzip2Handler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class XzHandler(FileHandler): + def check(self) -> CheckFileResult: + if self.file_type.startswith("XZ compressed data"): + assert self.fn.endswith(b'.xz') + return CheckFileResult.ARCHIVE + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + if self.fn.endswith(b'.xz'): + output_fn = self.fn[0:-3] + else: + assert False, "Invalid xz filename %r" % self.fn + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd = ["xz", "-dc", self.abs_fn] + logging.info("XzHandler: cmd=%r out=%r" % (cmd, abs_out_fn)) + # The command "xz -d" has no command line option to force output to a + # specific file. 
+ with open(abs_out_fn, 'wb') as f: + subprocess.check_call(cmd, stdout=f) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [TarHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in [CheckFileResult.ARCHIVE], "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("XzHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class BrotliHandler(FileHandler): + def check(self) -> CheckFileResult: + # Brotli is not recognized with file + if self.fn == b'system.new.dat.br': + if os.path.exists(os.path.join(os.path.dirname(self.abs_fn), b'system.transfer.list')): + # This case is handled by TransferListHandler, which also contains brotli decompression + return CheckFileResult.HANDLER_NO_MATCH + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn == b'vendor.new.dat.br': + if os.path.exists(os.path.join(os.path.dirname(self.abs_fn), b'vendor.transfer.list')): + # This case is handled by TransferListHandler, which also contains brotli decompression + return CheckFileResult.HANDLER_NO_MATCH + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + else: + return CheckFileResult.HANDLER_NO_MATCH + + def extract_and_get_next_handler(self, stage_dir_rel): + output_fn = self.fn[0:-3] + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + cmd = 
["brotli", "--decompress", self.abs_fn, b"--output=%s" % abs_out_fn] + logging.info("BrotliHandler: cmd=%r" % cmd) + subprocess.check_call(cmd) + assert os.path.exists(abs_out_fn) + HANDLER_TYPES = [ExtfsHandler, ErofsHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("BrotliHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class TransferListHandler(FileHandler): + file_size: int + new_commands: List[List[int]] + data_files: List[bytes] + image_type: ImageType + force_single_file: bool + + BLOCK_SIZE = 4096 + + def check(self) -> CheckFileResult: + if self.fn == b"system.transfer.list": + self.image_type = ImageType.SYSTEM + elif self.fn == b"vendor.transfer.list": + self.image_type = ImageType.VENDOR + else: + return CheckFileResult.HANDLER_NO_MATCH + self.file_size = 0 + self.new_commands = [] + self.data_files = [] + for line in open(self.abs_fn): + line_split = line.split(" ") + cmd = line_split[0] + if cmd in ("new", "erase", "zero"): + assert len(line_split) == 2, "Not exactly 2 items in line %r" % line + cmd_data = line_split[1] + cmd_data_items = [int(x) for x in cmd_data.split(",")] + # First element is number of elements + assert cmd_data_items[0] == len(cmd_data_items) - 1 + # Find 
file size based on maximum block number + for i in range(1, len(cmd_data_items), 2): + if cmd_data_items[i] * TransferListHandler.BLOCK_SIZE > self.file_size: + self.file_size = cmd_data_items[i] * TransferListHandler.BLOCK_SIZE + if cmd == "new": + self.new_commands.append(cmd_data_items[1:]) + # Some firmwares append ".1", ".2", ... to the input files for individual "new" commands in system.transfer.list. + # Other firmwares use one single file for that. + self.force_single_file = False + for i in range(len(self.new_commands)): + if self.image_type == ImageType.SYSTEM: + fn = b'system.new.dat' + elif self.image_type == ImageType.VENDOR: + fn = b'vendor.new.dat' + else: + raise ValueError("Bad image_type %r" % self.image_type) + fn_with_index = fn + (".%d" % i).encode() + if i == 1: + abs_fn_with_index = os.path.join(os.path.dirname(self.abs_fn), fn_with_index) + if not os.path.exists(abs_fn_with_index) and not os.path.exists(abs_fn_with_index + b'.br'): + self.force_single_file = True + if i > 0 and not self.force_single_file: + fn = fn_with_index + abs_fn = os.path.join(os.path.dirname(self.abs_fn), fn) + if os.path.exists(abs_fn): + self.data_files.append(abs_fn) + else: + abs_fn += b'.br' + assert os.path.exists(abs_fn), "File %r (referenced from %r) doesn't exist" % (abs_fn, self.abs_fn) + self.data_files.append(abs_fn) + assert self.file_size > 0 + if self.image_type == ImageType.SYSTEM: + return CheckFileResult.SYSTEM_IMG + elif self.image_type == ImageType.VENDOR: + return CheckFileResult.VENDOR_IMG + else: + raise ValueError("Bad image_type %r" % self.image_type) + + def get_extra_handled_size(self) -> int: + result = 0 + for fn in self.data_files: + result += os.stat(fn).st_size + return result + + def extract_and_get_next_handler(self, stage_dir_rel) -> FileHandler: + output_fn = self.fn[0:-len(b'.transfer.list')] + b'.img' + abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel) + abs_out_fn = os.path.join(abs_stage_dir, output_fn) + with 
open(abs_out_fn, 'wb') as output_file: + assert len(self.new_commands) == len(self.data_files) + if self.force_single_file: + data_file = self.data_files[0] + if data_file.endswith(b".br"): + real_data_file = data_file[0:-3] + cmd = ["brotli", "--decompress", data_file, b"--output=%s" % real_data_file] + subprocess.check_call(cmd) + data_file = real_data_file + with open(data_file, 'rb') as input_file: + for cmd_index in range(len(self.new_commands)): + new_cmd = self.new_commands[cmd_index] + for i in range(0, len(new_cmd), 2): + begin_block = new_cmd[i] + end_block = new_cmd[i + 1] + block_cnt = end_block - begin_block + output_file.seek(begin_block * TransferListHandler.BLOCK_SIZE) + for _i in range(block_cnt): + buf = input_file.read(TransferListHandler.BLOCK_SIZE) + assert len(buf) == TransferListHandler.BLOCK_SIZE, "Short read from %r: %d bytes" % (data_file, len(buf)) + output_file.write(buf) + else: + for cmd_index in range(len(self.new_commands)): + new_cmd = self.new_commands[cmd_index] + data_file = self.data_files[cmd_index] + if data_file.endswith(b".br"): + real_data_file = data_file[0:-3] + cmd = ["brotli", "--decompress", data_file, b"--output=%s" % real_data_file] + subprocess.check_call(cmd) + data_file = real_data_file + with open(data_file, 'rb') as input_file: + for i in range(0, len(new_cmd), 2): + begin_block = new_cmd[i] + end_block = new_cmd[i+1] + block_cnt = end_block - begin_block + output_file.seek(begin_block * TransferListHandler.BLOCK_SIZE) + for _i in range(block_cnt): + buf = input_file.read(TransferListHandler.BLOCK_SIZE) + assert len(buf) == TransferListHandler.BLOCK_SIZE, "Short read from %r: %d bytes" % (data_file, len(buf)) + output_file.write(buf) + if output_file.tell() < self.file_size: + output_file.truncate(self.file_size) + HANDLER_TYPES = [ExtfsHandler, ErofsHandler] + handlers = [] + for handler_type in HANDLER_TYPES: + handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), 
image_type=self.image_type, file_type=get_file_type(abs_out_fn)) + handler_result = handler.check() + if handler_result != CheckFileResult.HANDLER_NO_MATCH: + assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__) + handlers.append(handler) + if len(handlers) > 1: + raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers])) + elif len(handlers) == 0: + raise NoHandlerMatchError("TransferListHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn))) + else: + return handlers[0] + + +class NokiaPayloadBinHandler(FileHandler): + """ + Handler for Nokia payload.bin files + """ + def check(self) -> CheckFileResult: + if self.fn.lower() != b'payload.bin': + return CheckFileResult.HANDLER_NO_MATCH + # file_type is 'data' => no check for that + assert os.stat(self.abs_fn).st_size >= 32 * 1024 * 1024 + return CheckFileResult.ARCHIVE + + def extract_file2dir(self, output_path_rel): + global base_dir + abs_output_path = self.extractor.abs_fn(output_path_rel) + cmd = ["python3", "%s/nokia-dumper/payload_dumper.py" % base_dir, self.abs_fn, abs_output_path] + logging.info("NokiaPayloadBinHandler.extract_file2dir(%r): cmd=%r" % (output_path_rel, cmd)) + subprocess.check_call(cmd) + + +class MountableImage(FileHandler): + def mount(self, mountpoint): + assert False, "Must be implemented in subclass" + + def umount(self): + assert False, "Must be implemented in subclass" + + def check(self) -> CheckFileResult: + raise NotImplementedError() + + +class ExtfsHandler(MountableImage): + mountpoint: Optional[bytes] + + def check(self) -> CheckFileResult: + if self.file_type.startswith("Linux rev 1.0 ext4 filesystem data") or self.file_type.startswith("Linux rev 1.0 ext2 filesystem data"): + if self.fn.lower() in 
(b'system.new.dat', b'vendor.new.dat'): + # These files are not the actual filesystem and need to be assembled based on system.transfer.list/vendor.transfer.list + return CheckFileResult.HANDLER_NO_MATCH + if self.image_type == ImageType.SYSTEM: + return CheckFileResult.SYSTEM_IMG + elif self.image_type == ImageType.VENDOR: + return CheckFileResult.VENDOR_IMG + # system_1.img is a potential false positive, so let's enforce filenames starting with "system." + if self.fn.lower().startswith(b"system.") or self.fn.lower().startswith(b"system_a.") or self.fn.lower().startswith(b"system_b."): + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower().endswith(b'stock_system_image.img') and os.stat(self.abs_fn).st_size > 1024**3: + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower().endswith(b'system_raw.img') and os.stat(self.abs_fn).st_size > 256*1024**2: + self.image_type = ImageType.SYSTEM + return CheckFileResult.SYSTEM_IMG + elif self.fn.lower().startswith(b"vendor.") or self.fn.lower().startswith(b"vendor_a.") or self.fn.lower().startswith(b"vendor_b."): + self.image_type = ImageType.VENDOR + return CheckFileResult.VENDOR_IMG + else: + logging.warning("ExtfsHandler: %r does not start with system/vendor" % self.abs_fn) + return CheckFileResult.HANDLER_NO_MATCH + else: + return CheckFileResult.HANDLER_NO_MATCH + + def mount(self, mountpoint): + mountpoint = self.extractor.abs_fn(mountpoint) + assert not hasattr(self, "mountpoint") or self.mountpoint is None, "ExtfsHandler: Can only mount once" + assert os.path.exists(mountpoint), "Mountpoint %r doesn't exist" % mountpoint + assert os.path.isdir(mountpoint), "Mountpoint %r is not a directory" % mountpoint + # Increase size when required + target_size = 0 + dumpe2fs_cmd = ["dumpe2fs", "-h", self.abs_fn] + logging.info("ExtfsHandler.mount(): dumpe2fs_cmd=%r" % dumpe2fs_cmd) + for line in subprocess.Popen(dumpe2fs_cmd, 
class FilesystemExtractor(FileHandler):
    """Base class for handlers that write a filesystem's contents into a directory via extract_filesystem()."""

    def check(self) -> CheckFileResult:
        raise NotImplementedError()

    def extract_filesystem(self, output_dir):
        raise NotImplementedError("Must be implemented in subclass")


class ErofsHandler(FilesystemExtractor):
    """Handler for EROFS images, unpacked with the bundled erofs_tool.py."""

    def check(self) -> CheckFileResult:
        """
        Detect EROFS by the 4 magic bytes at offset 0x400.

        :raises ValueError: if the magic matches but image_type is neither SYSTEM nor VENDOR.
        """
        with open(self.abs_fn, 'rb') as image:
            image.seek(0x400)
            magic = image.read(4)
        if magic != b'\xe2\xe1\xf5\xe0':
            return CheckFileResult.HANDLER_NO_MATCH
        if self.image_type == ImageType.SYSTEM:
            return CheckFileResult.SYSTEM_IMG
        if self.image_type == ImageType.VENDOR:
            return CheckFileResult.VENDOR_IMG
        raise ValueError("ErofsHandler: Detected EROFS filesystem but self.image_type is not ImageType.SYSTEM or ImageType.VENDOR")

    def extract_filesystem(self, output_dir):
        """Run ``erofs_tool.py extract --verify-zip`` on the image, writing into ``output_dir``."""
        global base_dir
        tool_path = os.path.join(base_dir, "erofs_tool.py")
        extract_cmd = [tool_path, "extract", "--verify-zip", self.abs_fn, output_dir]
        subprocess.check_call(extract_cmd)
class CpbHandler(FileHandler):
    """Detects CPB archives (``.cpb`` extension plus magic bytes); extraction is not implemented."""

    def check(self) -> CheckFileResult:
        """Return ARCHIVE if the file has a ``.cpb`` extension and starts with ``CP\\x03\\x03``."""
        ext = self.fn.split(b'.')[-1].lower()
        if ext != b'cpb':
            return CheckFileResult.HANDLER_NO_MATCH
        with open(self.abs_fn, 'rb') as f:
            buf = f.read(4)
        if buf == b'CP\x03\x03':
            return CheckFileResult.ARCHIVE
        else:
            return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        raise NotImplementedError("TODO: Implement CPB files, see https://github.com/scue/unpackcpb/blob/master/unpackcpb.c")


class BootImageHandler(FileHandler):
    """Handler for ``boot.img`` and ``boot.img.lz4``, matched by file name only."""

    def check(self) -> CheckFileResult:
        """
        Classify the file by its (case-insensitive) name.

        :raises AssertionError: for any other name starting with ``boot.img``,
            to flag a potential boot image variant that is not handled yet.
        """
        fn_lower = self.fn.lower()
        if fn_lower == b'boot.img':
            # Some boot/recovery images have type 'data', e.g. for ryo
            # assert self.file_type.lower().startswith("android bootimg")
            return CheckFileResult.BOOT_IMG
        elif fn_lower == b'boot.img.lz4':
            return CheckFileResult.BOOT_IMG
        elif fn_lower == b'boot.img.p':
            # Some kind of binary patch. ignored for now
            return CheckFileResult.IGNORE
        elif fn_lower.startswith(b'boot.img'):
            assert False, "Potential boot image: %r (file_type=%r)" % (self.abs_fn, self.file_type)
        return CheckFileResult.HANDLER_NO_MATCH

    def write_image(self, f):
        """
        Write the raw boot image to the binary file object ``f``.

        ``boot.img.lz4`` is decompressed with ``lz4cat`` first. Nothing is
        written for any other file name.
        """
        fn_lower = self.fn.lower()
        if fn_lower == b'boot.img':
            # Use a context manager so the source file is closed deterministically.
            with open(self.abs_fn, 'rb') as src:
                f.write(src.read())
        elif fn_lower == b'boot.img.lz4':
            f.write(subprocess.check_output(["lz4cat", self.abs_fn]))


class RecoveryImageHandler(FileHandler):
    """Handler for ``recovery.img`` and ``recovery.img.lz4``, matched by file name only."""

    def check(self) -> CheckFileResult:
        """
        Classify the file by its (case-insensitive) name.

        :raises AssertionError: for any other name starting with ``recovery.img``,
            to flag a potential recovery image variant that is not handled yet.
        """
        fn_lower = self.fn.lower()
        if fn_lower == b'recovery.img':
            # Some boot/recovery images have type 'data', e.g. for ryo
            # assert self.file_type.lower().startswith("android bootimg")
            return CheckFileResult.RECOVERY_IMG
        elif fn_lower == b'recovery.img.lz4':
            # Compressed recovery images are recovery images too (previously reported as BOOT_IMG).
            return CheckFileResult.RECOVERY_IMG
        elif fn_lower == b'recovery.img.p':
            # Some kind of binary patch. ignored for now
            return CheckFileResult.IGNORE
        elif fn_lower.startswith(b'recovery.img'):
            assert False, "Potential recovery image: %r (file_type=%r)" % (self.abs_fn, self.file_type)
        return CheckFileResult.HANDLER_NO_MATCH

    def write_image(self, f):
        """
        Write the raw recovery image to the binary file object ``f``.

        ``recovery.img.lz4`` is decompressed with ``lz4cat`` first. Nothing is
        written for any other file name.
        """
        fn_lower = self.fn.lower()
        if fn_lower == b'recovery.img':
            # Use a context manager so the source file is closed deterministically.
            with open(self.abs_fn, 'rb') as src:
                f.write(src.read())
        elif fn_lower == b'recovery.img.lz4':
            f.write(subprocess.check_output(["lz4cat", self.abs_fn]))
class MultipleHandlerMatchError(Exception):
    """Raised when more than one handler (or handler result) claims the same file."""
    pass


class NoHandlerMatchError(Exception):
    """Raised when no handler is able to process a file."""
    pass


class ArchiveDirectoryHandler:
    """
    Decides how to continue with the contents of an extracted directory.

    get_next_handlers() tries several strategies in order and returns the
    handlers of the first strategy that is accepted.
    """

    def __init__(self, extractor, input_path_rel):
        """
        :param extractor: Owning extractor; provides abs_fn() and rel_path().
        :param input_path_rel: Directory to inspect, resolved via extractor.abs_fn().
        """
        self.extractor = extractor
        self.input_path_rel = input_path_rel
        self.abs_dir = self.extractor.abs_fn(input_path_rel)

    def _check_handlers(self, handler_types, rel_path, abs_fn):
        """
        Run check() of every handler type on one file.

        The file type is determined once per file instead of once per handler.

        :return: Tuple (matches, ignore_size_coverage): ``matches`` maps each
            check result other than the two NO_MATCH variants to the handlers
            that returned it; ``ignore_size_coverage`` is True if any handler
            returned HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE.
        """
        file_type = get_file_type(abs_fn)
        matches: DefaultDict[CheckFileResult, List[FileHandler]] = defaultdict(list)
        ignore_size_coverage = False
        for handler_type in handler_types:
            # logging.info("HANDLER_TYPE: %r rel_path=%r" % (handler_type, rel_path))
            handler = handler_type(self.extractor, rel_path, file_type)
            handler_result = handler.check()
            if handler_result == CheckFileResult.HANDLER_NO_MATCH:
                continue  # Handler doesn't match, ignore it
            if handler_result == CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE:
                ignore_size_coverage = True
                continue
            matches[handler_result].append(handler)
        return matches, ignore_size_coverage

    def get_next_handlers(self):
        """
        Return the list of handlers to process next for this directory.

        Strategies, in order:
          * Pass 0a: directory already contains an unpacked system partition.
          * Pass 0b: a single rawprogram XML handler covers enough of the data.
          * Pass 1: image handlers cover enough of the total size.
          * Pass 2: the biggest file is an archive dominating the uncovered size.

        :raises MultipleHandlerMatchError: if a file is claimed ambiguously.
        :raises NoHandlerMatchError: if the dominating biggest file is an ignored file.
        :raises ValueError: if no strategy is accepted.
        """
        # Pass 0: Check if the directory contains the unpacked system partition already
        if os.path.exists(os.path.join(self.abs_dir, b'system', b'build.prop')):
            filelist = [
                b'system/bin/audioserver',
                b'system/lib/libstagefright.so',
                b'system/lib64/libstagefright.so',
                b'system/bin/sh',
                b'system/framework/wifi-service.jar',
                b'system/lib/libssl.so',
                b'system/framework/services.jar',
                b'system/framework/telephony-common.jar'
            ]
            found_list = [fn for fn in filelist if os.path.exists(os.path.join(self.abs_dir, fn))]
            if len(found_list) > 0:  # Some archives just contain system/build.prop but nothing else, so let's continue to normal extraction in these cases
                assert len(found_list) >= 3, "Only %d items of system partition found: %r" % (len(found_list), found_list)
                result = [SystemDirectoryHandler(self.extractor, os.path.join(self.input_path_rel, b'system'))]
                # Also allow boot.img/recovery.img
                for dirpath, dirnames, filenames in os.walk(self.abs_dir):
                    # We are only looking for boot images, so no need to look into system => Significant performance improvement
                    # Pruned before the file loop so that it also applies to directories without plain files.
                    if b'system' in dirnames:
                        dirnames.remove(b'system')
                    for file in filenames:
                        abs_fn = os.path.join(self.abs_dir, dirpath, file)
                        rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                        file_type = get_file_type(abs_fn)
                        for handler_type in [BootImageHandler, RecoveryImageHandler]:
                            handler = handler_type(self.extractor, rel_path, file_type)
                            if handler.check() in (CheckFileResult.BOOT_IMG, CheckFileResult.RECOVERY_IMG):
                                result.append(handler)
                return result
        # Pass 0: Do rawprogram_XXX.xml, return if acceptable
        handlers_found_pass0: List[RawprogramUnsparseHandler] = list()
        total_handled_size = 0
        total_ignored_size = 0
        total_size = 0
        for dirpath, _dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                if not os.path.isfile(abs_fn):
                    continue
                sr = os.stat(abs_fn)
                total_size += sr.st_size
                if file.lower().endswith(b'.xml'):
                    file_type = get_file_type(abs_fn)
                    handler_pass0 = RawprogramUnsparseHandler(self.extractor, rel_path, file_type)
                    handler_result = handler_pass0.check()
                    if handler_result not in (CheckFileResult.HANDLER_NO_MATCH, CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE, CheckFileResult.IGNORE):
                        assert handler_result == CheckFileResult.SYSTEM_OR_VENDOR, "Bad handler_result %r for RawprogramUnsparseHandler" % handler_result
                        handlers_found_pass0.append(handler_pass0)
                        total_handled_size += sr.st_size
                        total_ignored_size += handler_pass0.extra_ignored_size
                        total_handled_size += handler_pass0.get_extra_handled_size()
                elif file.lower().endswith((b".elf", b".mbn")):
                    total_ignored_size += sr.st_size
        if len(handlers_found_pass0) == 1:
            if total_handled_size + total_ignored_size > 0.8 * total_size - 100e6:
                return handlers_found_pass0
            elif handlers_found_pass0[0].has_vendor():
                return handlers_found_pass0
            else:
                raise ValueError("RawprogramUnsparseHandler doesn't handle enough, total_handled_size=%.2fMiB total_size=%.2fMiB" % (total_handled_size/1024**2, total_size/1024**2))
        elif len(handlers_found_pass0) > 1:
            raise MultipleHandlerMatchError("Multiple RawprogramUnsparseHandler found: %r!" % [x.abs_fn for x in handlers_found_pass0])
        # Pass 1: Find image handlers, accept solution and return if enough of the size is accounted for
        # (ignored, system/vendor img, boot/recovery img); see the 0.85 thresholds below
        total_size = 0
        ignored_size = 0  # Files intentionally ignored
        unmatched_size = 0  # Files not matched by any handler
        handled_size = 0
        extra_handled_size = 0  # Additional files handled by matching handler
        HANDLER_LIST_PASS1 = [
            ExtfsHandler,
            ErofsHandler,
            SparseImageHandler,
            SignImgHandler,
            TransferListHandler,
            BrotliHandler,
            Lz4Handler,
            IgnoreBadTarMd5Handler,
            IgnoreRadioHandler,
            IgnoreBootloaderHandler,
            IgnoreOpImageHandler,
            IgnoreOemImgHandler,
            IgnoreElfHandler,
            IgnoreVmlinuxHandler,
            BootImageHandler,
            RecoveryImageHandler,
            PacHandler,
            IgnoreAppsImgHandler
        ]
        handlers_found_pass1: List[FileHandler] = list()
        ignore_size_coverage: bool = False
        found_system_img: bool = False
        found_vendor_img: bool = False
        ignored_archive_size: int = 0
        for dirpath, dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                ext = file.split(b".")[-1]
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if not os.path.isfile(abs_fn):
                    continue
                sr = os.stat(abs_fn)
                total_size += sr.st_size
                handler_result_to_handlers, file_ignores_coverage = self._check_handlers(HANDLER_LIST_PASS1, rel_path, abs_fn)
                if file_ignores_coverage:
                    ignore_size_coverage = True
                if len(handler_result_to_handlers) > 1:
                    logging.error("Multiple handler results for %r" % abs_fn)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        # Allow multiple handlers for result IGNORE
                        logging.info("Ignoring file %r due to %r" % (abs_fn, [type(x) for x in handlers]))
                        ignored_size += sr.st_size
                    elif len(handlers) > 1:
                        logging.error("Multiple handlers for %r => %r: %r" % (abs_fn, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("Selected handler %s for %r" % (handler.__class__.__name__, abs_fn))
                        handled_size += sr.st_size
                        extra_handled_size += handler.get_extra_handled_size()
                        handlers_found_pass1.append(handler)
                        if handler_result == CheckFileResult.SYSTEM_IMG:
                            found_system_img = True
                        elif handler_result == CheckFileResult.VENDOR_IMG:
                            found_vendor_img = True
                else:
                    logging.info("Ignoring file %r since no handler matches" % abs_fn)
                    if ext.lower() in (b"tar", b"zip", b"rar") or abs_fn.endswith(b".tar.gz"):
                        ignored_archive_size += sr.st_size
                        logging.info("ignored_archive_size += %d => %d (file %r)" % (sr.st_size, ignored_archive_size, abs_fn))
                    unmatched_size += sr.st_size
        # Check if Pass 1 solution can be accepted
        total_handled_size = (handled_size + extra_handled_size)
        total_unmatched_size = unmatched_size - extra_handled_size
        found_pac = any(isinstance(handler, PacHandler) for handler in handlers_found_pass1)
        logging.info("PASS1: total_handled_size=%r total_unmatched_size=%r ignored_archive_size=%r found_system_img=%r found_vendor_img=%r", total_handled_size, total_unmatched_size, ignored_archive_size, found_system_img, found_vendor_img)
        if total_handled_size >= 0.85 * (handled_size + total_unmatched_size) or (total_handled_size > 0 and ignore_size_coverage):
            return handlers_found_pass1
        elif found_system_img and found_vendor_img and total_handled_size > 0.85 * (handled_size + total_unmatched_size - ignored_archive_size):
            # Some firmwares contain a second copy of the firmware within an archive (tar/tar.gz/...).
            # If we have a system/vendor image, we can check if 85% of the total size is covered while ignoring
            # additional archives.
            return handlers_found_pass1
        elif found_pac and total_handled_size > 0.85 * (handled_size + total_unmatched_size - ignored_archive_size):
            # Some firmwares contain a second copy of the firmware within an archive (tar/tar.gz/...).
            # If we have a PAC image, we can check if 85% of the total size is covered while ignoring
            # additional archives.
            return handlers_found_pass1
        elif total_handled_size >= 0.1 * (handled_size + total_unmatched_size):
            # The denominator cannot be zero here: a zero denominator satisfies the first condition above.
            logging.warning("ArchiveDirectoryHandler.get_handlers(): Rejecting pass 1 with covered percentage %.2f%%" % (100.0 * total_handled_size / (handled_size + total_unmatched_size)))
        logging.info("ArchiveDirectoryHandler.get_handlers(): Going to pass 2")
        # Pass 2: Find biggest file, check if is an archive file and it is at least 90% of total size
        # Handle boot/recovery images and intentionally ignore unwanted files
        HANDLER_LIST_PASS2 = [
            IgnoreBadTarMd5Handler,
            IgnoreRadioHandler,
            IgnoreBootloaderHandler,
            IgnoreOpImageHandler,
            IgnoreOemImgHandler,
            IgnoreUpdateHwHandler,  # Only for Pass 2
            IgnoreHuaweiUserdataAppHandler,
            IgnoreElfHandler,
            BootImageHandler,
            RecoveryImageHandler
        ]
        # Handler list for the biggest file only
        # Will only be used if the biggest file reaches a certain percentage of the total
        # size (excluding boot/recovery image and intentionally ignored files)
        # Contains all kind of archive handlers
        HANDLER_LIST_PASS2_BIGGEST_FILE = [
            ZipHandler,
            TarHandler,
            SinHandler,
            PacHandler,
            OzipHandler,
            HuaweiAppHandler,
            DzHandler,
            NokiaPayloadBinHandler,
            CpbHandler,
            SuperImageHandler
        ]
        # Find biggest file
        total_size = 0
        unmatched_size = 0  # Files not matched by any handler
        handled_size = 0
        ignored_size = 0
        biggest_file_size = 0
        biggest_file_abs = None
        biggest_file_rel = None
        handlers_found_pass2: List[FileHandler] = []
        for dirpath, dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if not os.path.isfile(abs_fn):
                    continue
                sr = os.stat(abs_fn)
                total_size += sr.st_size
                # Find biggest file
                if sr.st_size > biggest_file_size:
                    biggest_file_size = sr.st_size
                    biggest_file_abs = abs_fn
                    biggest_file_rel = rel_path
                # HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE is deliberately not evaluated in pass 2
                handler_result_to_handlers, _ = self._check_handlers(HANDLER_LIST_PASS2, rel_path, abs_fn)
                if len(handler_result_to_handlers) > 1:
                    logging.error("PASS2: Multiple handler results for %r" % abs_fn)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        # Allow multiple handlers for result IGNORE
                        logging.info("PASS2: Ignoring file %r due to %r" % (abs_fn, [type(x) for x in handlers]))
                        ignored_size += sr.st_size
                    elif len(handlers) > 1:
                        logging.error("PASS2: Multiple handlers for %r => %r: %r" % (abs_fn, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("PASS2: Selected handler %s for %r" % (handler.__class__.__name__, abs_fn))
                        handled_size += sr.st_size
                        extra_handled_size += handler.get_extra_handled_size()
                        handlers_found_pass2.append(handler)
                else:
                    logging.info("PASS2: Ignoring file %r since no handler matches" % abs_fn)
                    unmatched_size += sr.st_size
        # ignored_size and handled_size are the values counted by the pass 2 loop above
        total_uncovered_size = total_size - ignored_size - handled_size
        if biggest_file_abs is None or total_uncovered_size <= 0:
            # Everything was handled or ignored by pass 2, so there is no archive candidate left.
            # This used to fail with a ZeroDivisionError while computing the percentage below.
            logging.info("ArchiveDirectoryHandler.get_handlers(): PASS2: No uncovered data left, skipping biggest file check")
        else:
            logging.info("ArchiveDirectoryHandler.get_handlers(): PASS2: Biggest file: %.3fMiB/%.3fMiB (%.2f%%): %r" % (biggest_file_size / (1024 * 1024), total_uncovered_size / (1024 * 1024), 100.0 * biggest_file_size / total_uncovered_size, biggest_file_abs))
            sr = os.stat(biggest_file_abs)
            if sr.st_size > 0.9 * total_uncovered_size:
                handler_result_to_handlers = defaultdict(list)
                file_type = get_file_type(biggest_file_abs)
                for handler_type in HANDLER_LIST_PASS2_BIGGEST_FILE:
                    handler = handler_type(self.extractor, biggest_file_rel, file_type)
                    handler_result = handler.check()
                    if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                        handler_result_to_handlers[handler_result].append(handler)
                if len(handler_result_to_handlers) > 1:
                    logging.error("Multiple handler results for %r" % biggest_file_abs)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        raise NoHandlerMatchError("Biggest file (>90%%) is IGNORED: %r" % biggest_file_abs)
                    elif len(handlers) > 1:
                        logging.error("Multiple handlers for %r => %r: %r" % (biggest_file_abs, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("Selected handler %r for %r" % (type(handler), biggest_file_abs))
                        # handlers_found_pass2 may contain boot/recovery handler
                        return handlers_found_pass2 + [handler]
                else:
                    logging.info("Ignoring biggest file file %r since no handler matches" % biggest_file_abs)
                    unmatched_size += sr.st_size
        # Still here? => Don't know what to do, just list biggest files for now
        path2size = {}
        for dirpath, dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if os.path.isfile(abs_fn):
                    path2size[os.path.join(dirpath, file)] = os.stat(abs_fn).st_size
        logging.error("ArchiveDirectoryHandler.get_handlers(): Don't know what to do. Biggest files (sorted by size):")
        for path, size in sorted(path2size.items(), key=lambda item: -item[1]):
            logging.error(" %.3fMiB: %s" % (size / 1024.0 / 1024.0, path.decode(errors='ignore')))
        raise ValueError("ArchiveDirectoryHandler.get_handlers(): Don't know what to do.")
class SystemDirectoryHandler:
    """Marks a directory that already contains an unpacked system partition."""

    def __init__(self, extractor, system_dir_rel):
        """
        :param extractor: Owning extractor; provides abs_fn().
        :param system_dir_rel: System directory, resolved via extractor.abs_fn().
        """
        self.extractor = extractor
        self.system_dir_rel = system_dir_rel
        self.system_dir_abs = self.extractor.abs_fn(system_dir_rel)

    def get_system_dir(self):
        """Return the absolute path of the system directory."""
        return self.system_dir_abs


class TopLevelFileHandler:
    """Selects the handler for the firmware file the extraction starts with."""

    def __init__(self, extractor, input_path_rel, image_type=None, top_level_file=False):
        self.extractor = extractor
        self.input_path_rel = input_path_rel
        self.abs_fn = self.extractor.abs_fn(input_path_rel)
        self.image_type = image_type
        self.top_level_file: bool = top_level_file

    def get_next_handler(self):
        """
        Return the single handler whose check() matches the file.

        :raises NoHandlerMatchError: if no handler matches.
        :raises MultipleHandlerMatchError: if more than one handler matches.
        """
        handler_list = [
            TopLevelZipHandler,
            TarHandler,
            GzipHandler,
            Bzip2Handler,
            XzHandler,
            PacHandler,
            OzipHandler,
            SevenZipHandler,
            RarHandler,
            KdzHandler,
            DzHandler,
            ExtfsHandler,
            ErofsHandler,
            CpbHandler
        ]
        # Determine the file type once; get_file_type() spawns a subprocess on every call.
        file_type = get_file_type(self.abs_fn)
        handlers_found = []
        for handler_type in handler_list:
            handler = handler_type(self.extractor, self.input_path_rel, file_type=file_type)
            if handler.check() != CheckFileResult.HANDLER_NO_MATCH:
                handlers_found.append(handler)
        if len(handlers_found) == 0:
            raise NoHandlerMatchError("No handler for %r (file_type=%r)" % (self.abs_fn, file_type))
        if len(handlers_found) > 1:
            logging.error("Multiple handlers for %r: %r" % (self.input_path_rel, [type(x) for x in handlers_found]))
            raise MultipleHandlerMatchError()
        return handlers_found[0]
handlers_found.append(handler) + if len(handlers_found) == 0: + raise NoHandlerMatchError("No handler for %r (file_type=%r)" % (self.abs_fn, get_file_type(self.abs_fn))) + if len(handlers_found) > 1: + logging.error("Multiple handlers for %r: %r" % (self.input_path_rel, [type(x) for x in handlers_found])) + raise MultipleHandlerMatchError() + return handlers_found[0] + + +class QueueItem: + def __init__(self, handler, handler_name, stage_dir=None, handler_check_result=None): + self.handler = handler + self.handler_name = handler_name + self.handler_check_result = handler_check_result + self.stage_dir = stage_dir + + +class FirmwareExtractor: + def __init__(self, firmware_file_or_dir): + firmware_file_or_dir = os.path.abspath(firmware_file_or_dir) + if isinstance(firmware_file_or_dir, str): + firmware_file_or_dir = firmware_file_or_dir.encode() + self.firmware_file_or_dir = firmware_file_or_dir + self.tmpdir: bytes = tempfile.mkdtemp(prefix="ANDROID_EXTRACT_").encode() + logging.info("tmpdir=%r" % self.tmpdir) + self.stage_num: int = 0 + self.mounted_handlers = [] + self.system_handler = None + self.vendor_handler = None + self.boot_image_handler = None + self.recovery_image_handler = None + + def extract(self, output_system_tar=None, output_system_dir=None, make_world_readable=True, output_boot_img_path=None, output_recovery_img_path=None, allow_missing_vendor=False): + if output_system_dir is not None and isinstance(output_system_dir, str): + output_system_dir = output_system_dir.encode() + stage_queue = deque() + if os.path.isdir(self.firmware_file_or_dir): + handler_initial = ArchiveDirectoryHandler(self, self.firmware_file_or_dir) + stage_queue.append(QueueItem(handler=handler_initial, handler_name="handler_initial")) + else: + assert os.path.isfile(self.firmware_file_or_dir) + handler_initial = TopLevelFileHandler(self, None) + stage_dir = self.get_stage_dir("UnknownFileHandler") + stage_queue.append(QueueItem(handler=handler_initial, 
handler_name="handler_initial", stage_dir=stage_dir)) + try: + while len(stage_queue) > 0: + queue_item = stage_queue.popleft() + handler = queue_item.handler + # self.log_extraction_step("abs_stage_dir = self.create_stage_dir(%r)" % queue_item.stage_dir) + if hasattr(handler, "extract_file2dir"): + self.create_stage_dir(queue_item.stage_dir) + self.log_extraction_step("self.create_stage_dir(%r)" % queue_item.stage_dir) + self.log_extraction_step("%s.extract_file2dir(%r)" % (queue_item.handler_name, queue_item.stage_dir)) + # assert False, abs_stage_dir + handler.extract_file2dir(queue_item.stage_dir) + next_handler = ArchiveDirectoryHandler(self, queue_item.stage_dir) + next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__) + next_handler_name = "handler_%s" % next_stage_dir.decode() + self.log_extraction_step("%s = ArchiveDirectoryHandler(self, %r)" % (next_handler_name, queue_item.stage_dir)) + next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir) + stage_queue.append(next_queue_item) + elif hasattr(handler, "get_next_handler"): + next_handler = handler.get_next_handler() + next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__) + next_handler_name = "handler_%s" % next_stage_dir.decode() + self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name)) + next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir) + stage_queue.append(next_queue_item) + elif hasattr(handler, "get_next_handlers"): + next_handlers = handler.get_next_handlers() + for next_handler in next_handlers: + next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__) + next_handler_name = "handler_%s" % next_stage_dir.decode() + # TODO: Log + # self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name)) + next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, 
stage_dir=next_stage_dir) + stage_queue.append(next_queue_item) + elif hasattr(handler, "extract_and_get_next_handlers"): + next_handlers = handler.extract_and_get_next_handlers(queue_item.stage_dir) + for next_handler in next_handlers: + next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__) + next_handler_name = "handler_%s" % next_stage_dir.decode() + # TODO: Log + # self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name)) + next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir) + stage_queue.append(next_queue_item) + elif hasattr(handler, "extract_and_get_next_handler"): + next_handler = handler.extract_and_get_next_handler(queue_item.stage_dir) + next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__) + next_handler_name = "handler_%s" % next_stage_dir.decode() + self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name)) + next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir) + stage_queue.append(next_queue_item) + elif isinstance(handler, MountableImage) or isinstance(handler, FilesystemExtractor): + assert handler.image_type in (ImageType.SYSTEM, ImageType.VENDOR), "Bad handler.image_type %r for %r" % (handler.image_type, handler.__class__.__name__) + if handler.image_type == ImageType.SYSTEM: + assert self.system_handler is None, "Duplicate system_handler: %s:%r <=> %s:%s" % (self.system_handler.__class__.__name__, self.system_handler.abs_fn, handler.__class__.__name__, handler.abs_fn) + self.system_handler = handler + logging.info("Found system handler") + elif handler.image_type == ImageType.VENDOR: + if self.vendor_handler is not None: + logging.error("OLD: %s => %s", self.vendor_handler.__class__.__name__, self.vendor_handler.abs_fn) + subprocess.call(["file", self.vendor_handler.abs_fn]) + logging.error("NEW: %s => %s", handler.__class__.__name__, 
handler.abs_fn) + subprocess.call(["file", handler.abs_fn]) + assert self.vendor_handler is None, "Duplicate vendor_handler: %s:%r <=> %s:%s" % (self.vendor_handler.__class__.__name__, self.vendor_handler.abs_fn, handler.__class__.__name__, handler.abs_fn) + self.vendor_handler = handler + logging.info("Found vendor handler") + elif isinstance(handler, SystemDirectoryHandler): + assert self.system_handler is None + logging.info("Found system handler via SystemDirectoryHandler") + self.system_handler = handler + elif isinstance(handler, BootImageHandler): + assert self.boot_image_handler is None + self.boot_image_handler = handler + elif isinstance(handler, RecoveryImageHandler): + assert self.recovery_image_handler is None + self.recovery_image_handler = handler + else: + raise ValueError("Don't know what to do with handler %r" % handler.__class__.__name__) + logging.info("Finished Queue") + if self.system_handler is None: + logging.error("No system_handler afer finishing queue") + raise ValueError("No system_handler afer finishing queue") + if output_system_dir is None: + output_system_dir = self.create_stage_dir("system") + else: + assert output_system_tar is None, "Can only generate output_system_dir or output_system_tar" + if not output_system_dir.endswith(b'/'): + output_system_dir += b'/' + if isinstance(self.system_handler, MountableImage): + system_mountpoint = self.create_stage_dir("system_mnt") + self.system_handler.mount("system_mnt") + self.mounted_handlers.append(self.system_handler) + mounted_system_dir = system_mountpoint + # Some images have the root filesystem in the "system" partition, with /system/ just being a directory within the filesystem. 
+ if not os.path.exists(os.path.join(mounted_system_dir, b'build.prop')): + if os.path.exists(os.path.join(mounted_system_dir, b'system', b'build.prop')): + mounted_system_dir = os.path.join(mounted_system_dir, b'system') + assert os.path.exists(os.path.join(mounted_system_dir, b'build.prop')), "Could not find build.prop in system partition" + # Append slash for correct rsync operation + if not mounted_system_dir.endswith(b'/'): + mounted_system_dir += b'/' + cmd = ["rsync", "-a", mounted_system_dir, output_system_dir] + logging.info("FirmwareExtractor.extract(): system rsync cmd: %r" % cmd) + subprocess.check_call(cmd) + elif isinstance(self.system_handler, FilesystemExtractor): + self.system_handler.extract_filesystem(output_system_dir) + # Sometimes the extracted system.img contains "system/" as a directory, not in the root of the filesystem + if (not os.path.exists(os.path.join(output_system_dir, b"build.prop"))) and \ + os.path.isdir(os.path.join(output_system_dir, b"system")) and \ + os.path.exists(os.path.join(output_system_dir, b"system", b"build.prop")): + os.mkdir(os.path.join(output_system_dir, b"system", b"rootfs")) + for fn in os.listdir(output_system_dir): + if fn == b'system': + continue + os.rename(os.path.join(output_system_dir, fn), os.path.join(output_system_dir, b"system", b"rootfs", fn)) + os.rename(os.path.join(output_system_dir, b"system"), os.path.join(output_system_dir, b"system.tmp")) + for fn in os.listdir(os.path.join(output_system_dir, b"system.tmp")): + os.rename(os.path.join(output_system_dir, b"system.tmp", fn), os.path.join(output_system_dir, fn)) + elif isinstance(self.system_handler, SystemDirectoryHandler): + system_dir_src = self.system_handler.get_system_dir() + # Append slash for correct rsync operation + if not system_dir_src.endswith(b'/'): + system_dir_src += b'/' + cmd = ["rsync", "-a", system_dir_src, output_system_dir] + logging.info("FirmwareExtractor.extract(): system rsync cmd: %r" % cmd) + subprocess.check_call(cmd) 
+ else: + assert False, "Don't know what to do with self.system_handler type %s" % self.system_handler.__class__.__name__ + output_vendor_dir = os.path.join(output_system_dir, b"vendor") + if os.path.islink(output_vendor_dir): + if self.vendor_handler is not None: + os.unlink(output_vendor_dir) + else: + assert allow_missing_vendor, "System contains vendor symlink but we didn't find a vendor paritition!" + if os.path.isdir(output_vendor_dir): + vendor_dir_contents = os.listdir(output_vendor_dir) + if self.vendor_handler is not None: + assert len(vendor_dir_contents) == 0, "sytem/vendor directory not empty: %r" % vendor_dir_contents + else: + assert not os.path.exists(output_vendor_dir), "system/vendor is not a directory and not a symlink" + if self.vendor_handler is not None: + if not os.path.exists(output_vendor_dir): + os.mkdir(output_vendor_dir) + if isinstance(self.vendor_handler, MountableImage): + vendor_mountpoint = self.create_stage_dir("vendor_mnt") + self.vendor_handler.mount("vendor_mnt") + # Append slash for correct rsync operation + if not vendor_mountpoint.endswith(b'/'): + vendor_mountpoint += b'/' + if not output_vendor_dir.endswith(b'/'): + output_vendor_dir += b'/' + self.mounted_handlers.append(self.vendor_handler) + cmd = ["rsync", "-a", vendor_mountpoint, output_vendor_dir] + logging.info("FirmwareExtractor.extract(): vendor rsync cmd: %r" % cmd) + subprocess.check_call(cmd) + elif isinstance(self.vendor_handler, FilesystemExtractor): + self.vendor_handler.extract_filesystem(output_vendor_dir) + else: + assert False, "Don't know what to do with self.vendor_handler type %s" % self.vendor_handler.__class__.__name__ + if make_world_readable: + cmd = ["chmod", "-R", "a+r", output_system_dir] + logging.info("FirmwareExtractor.extract(): make readable cmd: %r" % cmd) + subprocess.check_call(cmd) + if output_system_tar is not None: + output_system_tar = os.path.abspath(output_system_tar) + cmd = ["tar", "cf", output_system_tar, "system/"] + cwd = 
# NOTE: every function below that takes ``self`` is a method of the extractor
# class whose ``class`` header lies outside this chunk; get_file_type() and the
# ``__main__`` guard are module-level.


def cleanup(self):
    """Unmount all mounted handlers and delete the temporary work directory.

    Unmounting is best-effort: an exception raised by one handler's
    ``umount()`` is logged and the remaining handlers are still processed.
    Afterwards ``self.mounted_handlers`` is reset to an empty list and
    ``self.tmpdir`` is removed recursively if it exists.

    Raises:
        AssertionError: if ``self.tmpdir`` does not contain the
            ``ANDROID_EXTRACT_`` marker.
    """
    # This check guards an "rm -rf", so it must survive "python -O", which
    # strips plain assert statements. The exception type is unchanged.
    if b'ANDROID_EXTRACT_' not in self.tmpdir:
        raise AssertionError("Refusing to clean up unexpected tmpdir %r" % self.tmpdir)
    for handler in self.mounted_handlers:
        # noinspection PyBroadException
        try:
            handler.umount()
        except Exception:
            logging.exception("Unmounting exception")
    self.mounted_handlers = []
    if os.path.exists(self.tmpdir):
        returncode = subprocess.call(["rm", "-rf", self.tmpdir])
        if returncode != 0:
            # Still best-effort, but no longer silent.
            logging.warning("Removing %r failed with exit code %d" % (self.tmpdir, returncode))


# noinspection PyMethodMayBeStatic
def log_extraction_step(self, extraction_step):
    """Log *extraction_step* at INFO level with an ``EXTRACTION_STEP:`` prefix."""
    logging.info("EXTRACTION_STEP: %s" % extraction_step)


def get_stage_dir(self, stage_name):
    """Return the next stage directory name as bytes.

    The name has the form ``stage_<self.stage_num>_<stage_name>``.
    ``self.stage_num`` is incremented on every call. Nothing is created on
    disk here.
    """
    result = ("stage_%d_%s" % (self.stage_num, stage_name))
    self.stage_num += 1
    return result.encode()


def create_stage_dir(self, stage_dir):
    """Create *stage_dir* inside ``self.tmpdir`` and return its joined path.

    *stage_dir* may be ``str`` (encoded to bytes first) or ``bytes``.
    ``os.mkdir`` raises if the directory already exists.
    """
    if isinstance(stage_dir, str):
        stage_dir = stage_dir.encode()
    abs_dir = os.path.join(self.tmpdir, stage_dir)
    os.mkdir(abs_dir)
    return abs_dir


def abs_fn(self, input_path_rel) -> bytes:
    """Resolve a path relative to ``self.tmpdir``.

    ``None`` returns ``self.firmware_file_or_dir``. A ``str`` is encoded to
    bytes before joining.

    Raises:
        AssertionError: if the value is neither ``str`` nor ``bytes``, or if
            it starts with ``/``.
    """
    if input_path_rel is None:
        return self.firmware_file_or_dir
    if isinstance(input_path_rel, str):
        input_path_rel = input_path_rel.encode()
    # Explicit raises instead of assert so validation survives "python -O".
    if not isinstance(input_path_rel, bytes):
        raise AssertionError("Expected str or bytes, got %s" % type(input_path_rel).__name__)
    if input_path_rel.startswith(b'/'):
        raise AssertionError("Expected a relative path, got %r" % input_path_rel)
    return os.path.join(self.tmpdir, input_path_rel)


def rel_path(self, abs_path):
    """Return *abs_path* relative to ``self.tmpdir``, without leading slashes.

    Raises:
        AssertionError: if *abs_path* is not ``bytes``, is not absolute, or
            does not lie inside ``self.tmpdir``.
    """
    if not isinstance(abs_path, bytes):
        raise AssertionError("Expected bytes, got %s" % type(abs_path).__name__)
    if not abs_path.startswith(b'/'):
        raise AssertionError("Expected an absolute path, got %r" % abs_path)
    if not abs_path.startswith(self.tmpdir):
        raise AssertionError("%r is not below %r" % (abs_path, self.tmpdir))
    path = abs_path[len(self.tmpdir):]
    # A bare prefix match also accepts sibling directories ("/tmp/A" matches
    # "/tmp/AB/x"). The remainder must be empty or start at a path separator,
    # unless tmpdir itself already ends with one.
    if path and not path.startswith(b'/') and not self.tmpdir.endswith(b'/'):
        raise AssertionError("%r is not below %r" % (abs_path, self.tmpdir))
    return path.lstrip(b'/')


def get_file_type(abs_fn):
    """Return the description printed by ``file`` for the file at *abs_fn*.

    The file content is passed to ``file -`` on stdin. The ``/dev/stdin:``
    prefix is removed from the output and the rest is returned as a stripped
    ``str``.

    Raises:
        subprocess.CalledProcessError: if ``file`` exits with a non-zero status.
        AssertionError: if the output does not start with ``/dev/stdin:``.
    """
    # Context manager closes the handle as soon as the child has finished.
    # The original passed a bare open() and never closed it.
    with open(abs_fn, 'rb') as stdin_file:
        file_output = subprocess.check_output(["file", "-"], stdin=stdin_file)
    prefix = b"/dev/stdin:"
    if not file_output.startswith(prefix):
        raise AssertionError("Unexpected output from file: %r" % file_output)
    return file_output[len(prefix):].strip().decode()


if __name__ == "__main__":
    main()