extractor/extractor.py

2291 lines
117 KiB
Python
Executable File

#!/usr/bin/env python3
# This file is part of Extractor.
# Copyright (C) 2021 Security Research Labs GmbH
# SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import logging
import sys
from collections import defaultdict, deque
import re
import argparse
import tempfile
from enum import Enum, auto
from Crypto.Cipher import AES
import struct
# noinspection PyPep8Naming
import xml.etree.ElementTree as ET
import json
from typing import List, Optional, DefaultDict
import shutil
import shlex
from lxml import etree
import liblp
# Directory containing this script; bundled helper tools (splituapp, kdzextractor, sinextract) are resolved relative to it
base_dir = os.path.split(os.path.realpath(__file__))[0]
def main():
    """
    Command line entry point: parses the arguments, runs a FirmwareExtractor on the input file
    and always calls its cleanup() afterwards, even if the extraction raised.
    """
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)-12s %(levelname)-8s: %(message)s')
    # The root check intentionally happens before argument parsing
    if os.getuid() != 0:
        logging.error("Not running as root, exiting")
        sys.exit(1)

    parser = argparse.ArgumentParser(description='Android firmware extraction tool')
    parser.add_argument("input")
    # Exactly one of the output modes has to be given
    output_mode = parser.add_mutually_exclusive_group(required=True)
    output_mode.add_argument("--tar-output", help="Path to system.tar file to generate")
    output_mode.add_argument("--system-dir-output", help="Path to store system dir, without intermediate tar file")
    output_mode.add_argument("--no-output", action="store_true", help="Only run extraction but ignore output")
    parser.add_argument("--boot-recovery-output", help="Directory where boot/recovery img should be stored")
    parser.add_argument("--allow-missing-vendor", action="store_true", help="Allow missing vendor partition for extraction, required for system-only updates (=> Project Treble), e.g. for some LineageOS images")
    args = parser.parse_args()

    extractor = FirmwareExtractor(args.input)
    try:
        boot_img_path = None
        recovery_img_path = None
        if args.boot_recovery_output is not None:
            boot_recovery_dir = os.path.abspath(args.boot_recovery_output)
            boot_img_path = os.path.join(boot_recovery_dir, "boot.img")
            recovery_img_path = os.path.join(boot_recovery_dir, "recovery.img")
        extractor.extract(
            output_system_tar=args.tar_output,
            output_system_dir=args.system_dir_output,
            output_boot_img_path=boot_img_path,
            output_recovery_img_path=recovery_img_path,
            allow_missing_vendor=args.allow_missing_vendor,
        )
    finally:
        extractor.cleanup()
class CheckFileResult(Enum):
    """
    Result of FileHandler.check(), tells the caller what a handler found out about a file
    """
    # Container that can be unpacked by the handler (zip, tar, 7z, ...)
    ARCHIVE = auto()
    # File was identified as system image
    SYSTEM_IMG = auto()
    # File was identified as vendor image
    VENDOR_IMG = auto()
    BOOT_IMG = auto()
    RECOVERY_IMG = auto()
    # Handler provides system and/or vendor (e.g. RawprogramUnsparseHandler)
    SYSTEM_OR_VENDOR = auto()
    # Handler is not responsible for this file
    HANDLER_NO_MATCH = auto()
    # Like HANDLER_NO_MATCH, presumably the file is additionally excluded from the size coverage check
    HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE = auto()
    # File is known and deliberately skipped
    IGNORE = auto()
class ImageType(Enum):
    """
    Kind of partition image a handler works on
    """
    SYSTEM = auto()
    VENDOR = auto()
class FileHandler:
    """
    Base class of all handlers. A handler is bound to one input file and decides in check()
    whether (and how) it is responsible for that file.

    Attributes:
        extractor: the owning FirmwareExtractor, used to resolve paths
        input_path_rel: path of the file as passed in by the caller
        abs_fn: path resolved via extractor.abs_fn(), always bytes
        fn: last path component of abs_fn (bytes)
        file_type: file type description string (presumably the result of get_file_type())
        image_type: ImageType.SYSTEM, ImageType.VENDOR or None if not known
    """

    def __init__(self, extractor: "FirmwareExtractor", input_path_rel, file_type, image_type: ImageType = None):
        resolved_fn = extractor.abs_fn(input_path_rel)
        assert isinstance(resolved_fn, bytes), "abs_fn must be of type bytes"
        assert isinstance(file_type, str), "file_type must be of type str"
        assert image_type in (None, ImageType.SYSTEM, ImageType.VENDOR), "Invalid image_type=%r" % image_type
        self.extractor: FirmwareExtractor = extractor
        self.input_path_rel = input_path_rel
        self.abs_fn = resolved_fn
        self.fn = resolved_fn.rsplit(b'/', 1)[-1]
        self.file_type = file_type
        self.image_type: ImageType = image_type

    def check(self) -> CheckFileResult:
        """
        Decide whether this handler is responsible for the file, has to be overridden by subclasses
        """
        raise NotImplementedError("check() must be implemented in subclass (%s)" % self.__class__.__name__)

    def get_extra_handled_size(self):
        """
        Number of bytes handled by this handler in addition to its own input file, 0 by default
        """
        return 0
class ZipHandler(FileHandler):
    """
    Handler for generic zip files, often used as top-level container format
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE if both the extension and the file type look like a zip file, HANDLER_NO_MATCH otherwise
        """
        if not self.is_good_extension():
            return CheckFileResult.HANDLER_NO_MATCH
        if self.file_type.lower().startswith(("zip", "java archive data")):
            return CheckFileResult.ARCHIVE
        return CheckFileResult.HANDLER_NO_MATCH

    def is_good_extension(self) -> bool:
        """
        True if the file name (case-insensitive) has one of the extensions known to be used for zip files
        """
        zip_extensions = (
            b".zip",
            b".ftf",  # Sony ftf format
            b".ozip",  # Oppo ozip, in some cases custom format (see OzipHandler), in other cases just a zip file
            b".up",  # Some ZTE firmwares use ".up" for zip files
        )
        return self.abs_fn.lower().endswith(zip_extensions)

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the zip file into output_path_rel: first with 'unzip', if that fails with 'jar xf'
        after resetting the output directory. Raises CalledProcessError if jar fails as well.
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        unzip_cmd = ["unzip", "-q", self.abs_fn]
        logging.info("ZipHandler: cmd=%r cwd=%r", unzip_cmd, abs_output_path)
        exitcode = subprocess.call(unzip_cmd, cwd=abs_output_path, stdin=subprocess.DEVNULL)
        # 0: OK, 1: Finished with warnings => everything else is treated as failure
        if exitcode not in (0, 1):
            logging.info("Extracting zip file with 'unzip' command failed (exit code %d), retrying with 'jar xf'", exitcode)
            # Remove whatever unzip left behind, but only ever delete below the temporary directory
            assert b'/tmp/AND' in abs_output_path, "abs_output_path %r doesn't contain /tmp/AND" % abs_output_path
            shutil.rmtree(abs_output_path)
            os.mkdir(abs_output_path)
            # jar is the second choice, unzip has a known issue with large (>4GB) files:
            # https://stackoverflow.com/a/31084012
            jar_cmd = ["jar", "xf", self.abs_fn]
            logging.info("ZipHandler fallback to jar: cmd=%r cwd=%r", jar_cmd, abs_output_path)
            subprocess.check_call(jar_cmd, cwd=abs_output_path)
class TopLevelZipHandler(ZipHandler):
    """
    ZipHandler variant for the initial input file only: the file name extension is not checked,
    so the decision is based on the file type alone
    """

    def is_good_extension(self) -> bool:
        # Any extension is fine for the top level file
        return True
class SevenZipHandler(FileHandler):
    """
    Generic 7z Handler, sometimes used as top-level container format
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE if the file has a .7z extension and a 7-zip file type, HANDLER_NO_MATCH otherwise
        """
        if not self.abs_fn.lower().endswith(b".7z"):
            return CheckFileResult.HANDLER_NO_MATCH
        if not self.file_type.lower().startswith("7-zip archive data"):
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.ARCHIVE

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the archive into output_path_rel with '7z x', raises CalledProcessError on failure
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        cmd = ["7z", "x", self.abs_fn]
        # Log with the correct handler name (used to be logged as "ZipHandler")
        logging.info("SevenZipHandler: cmd=%r cwd=%r", cmd, abs_output_path)
        subprocess.check_call(cmd, cwd=abs_output_path)
class RarHandler(FileHandler):
    """
    Handler for generic rar files, sometimes used as top-level container format
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE if the file type is a rar archive, HANDLER_NO_MATCH otherwise
        """
        # Deliberately no extension check: sometimes there is no .rar extension and the file type
        # should be reliable enough on its own
        is_rar = self.file_type.lower().startswith("rar archive data")
        return CheckFileResult.ARCHIVE if is_rar else CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the archive into output_path_rel with 'unrar x', raises CalledProcessError on failure
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        # -p<password>: a fixed archive password is always passed to unrar
        unrar_cmd = ["unrar", "x", "-psamdownloads.de", self.abs_fn]
        logging.info("RarHandler: cmd=%r cwd=%r", unrar_cmd, abs_output_path)
        subprocess.check_call(unrar_cmd, cwd=abs_output_path)
class OzipHandler(FileHandler):
    """
    OPPO encrypted image
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for .ozip files starting with the OPPOENCRYPT! magic. HANDLER_NO_MATCH for other
        extensions and for .ozip files which are plain zip files (left to ZipHandler).
        Fails with an AssertionError for .ozip files which are neither.
        """
        if not self.abs_fn.lower().endswith(b".ozip"):
            return CheckFileResult.HANDLER_NO_MATCH
        # Context manager so that the file handle is closed right away
        with open(self.abs_fn, 'rb') as ifs:
            magic = ifs.read(12)
        if magic != b'OPPOENCRYPT!':
            # Some .ozip files are actually zip, e.g. CPH1809EX_11_OTA_0180_all_OlU3r4ImvcSX_local.ozip
            # Accept the same file types as ZipHandler.check() so that every file ZipHandler would
            # take is actually handed over to it
            if self.file_type.lower().startswith(("java archive", "zip")):
                return CheckFileResult.HANDLER_NO_MATCH
            assert False, "Invalid ozip magic %r" % magic
        return CheckFileResult.ARCHIVE

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decrypt the .ozip file to a .zip file of the same name within the stage directory and
        return a ZipHandler for the decrypted file.
        """
        # Replace .ozip with .zip
        assert self.abs_fn.lower().endswith(b".ozip")
        out_filename = os.path.basename(self.abs_fn)[0:-5] + b".zip"
        out_path_rel = os.path.join(stage_dir_rel, out_filename)
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        out_path_abs = os.path.join(abs_stage_dir, out_filename)
        ozip = AES.new(b'\xD6\xDC\xCF\x0A\xD5\xAC\xD4\xE0\x29\x2E\x52\x2D\xB7\xC1\x38\x1E', AES.MODE_ECB)
        with open(self.abs_fn, 'rb') as ifs:
            # Payload starts at offset 0x1050
            ifs.seek(0x1050, 0)
            with open(out_path_abs, 'wb') as ofs:
                while True:
                    # Each round: 16 bytes which are AES encrypted, followed by up to 0x4000 bytes
                    # which are copied as they are
                    data = ifs.read(16)
                    ofs.write(ozip.decrypt(data))
                    data = ifs.read(0x4000)
                    if len(data) == 0:
                        break
                    ofs.write(data)
        return ZipHandler(self.extractor, out_path_rel, file_type=get_file_type(out_path_abs))
class PacHandler(FileHandler):
    """
    Mediatek PAC image
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for files with .pac extension (the file type is not checked), HANDLER_NO_MATCH otherwise
        """
        if self.abs_fn.lower().endswith(b".pac"):
            return CheckFileResult.ARCHIVE
        return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """
        Walk the fixed-size entry table at the start of the PAC file and copy the system, boot and
        recovery images to output_path_rel. All other entries are skipped.
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        pac_size = os.stat(self.abs_fn).st_size
        wanted_names = ("system.img", "system_raw.img", "boot.img", "recovery.img")
        entry_size = 2580
        max_chunk = 1024 * 1024
        with open(self.abs_fn, 'rb') as pac:
            header = pac.read(1024 * 1024)
            previous_end = 0
            for entry_pos in range(60, 69721, entry_size):
                entry = header[entry_pos:entry_pos + entry_size]
                # Entry layout as used here: UTF-16 name at 0x0, length at 0x400, start at 0x40c
                name = entry[0:0x40].decode("utf-16").replace("\x00", "")
                (start_pos,) = struct.unpack("<I", entry[0x40c:0x410])
                (length,) = struct.unpack("<I", entry[0x400:0x404])
                if start_pos == 0:
                    continue
                assert start_pos >= previous_end, "start_pos >= last_file_end: %d >= %x" % (start_pos, previous_end)
                if start_pos + length > pac_size:
                    # Entry would reach beyond the end of the file
                    continue
                if name in wanted_names:
                    logging.debug("Doing %s", name)
                    with open(os.path.join(abs_output_path, name.encode()), 'wb') as out:
                        pac.seek(start_pos)
                        remaining = length
                        while remaining > 0:
                            chunk_len = min(max_chunk, remaining)
                            out.write(pac.read(chunk_len))
                            remaining -= chunk_len
                previous_end = start_pos + length
class IgnoreBadTarMd5Handler(FileHandler):
    """
    Ignore some non-wanted .tar.md5 files
    """

    def check(self) -> CheckFileResult:
        """
        IGNORE for Samsung USERDATA_*, CSC_* and HOME_CSC_* .tar.md5 tar archives, HANDLER_NO_MATCH otherwise
        """
        if not self.abs_fn.lower().endswith(b".tar.md5"):  # Samsung
            return CheckFileResult.HANDLER_NO_MATCH
        if 'POSIX tar archive' not in self.file_type:
            return CheckFileResult.HANDLER_NO_MATCH
        # USERDATA_*.tar.md5 is present on some Samsung images, but it contains only useless stuff such as dalvik caches.
        # All of these need to be ignored so that ArchiveDirectoryHandler can accept handling only other files
        if self.fn.startswith((b'USERDATA_', b'CSC_', b'HOME_CSC_')):
            return CheckFileResult.IGNORE
        return CheckFileResult.HANDLER_NO_MATCH
class TarHandler(FileHandler):
    """
    Handler for tar files (and Samsung .tar.md5 files, which are actually tar archives)
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for tar archives with a good extension, IGNORE for Samsung USERDATA_*.tar.md5,
        HANDLER_NO_MATCH for everything else
        """
        if not self.is_good_extension():
            return CheckFileResult.HANDLER_NO_MATCH
        # Not every tar archive is a "POSIX" tar archive.
        is_tar = 'POSIX tar archive' in self.file_type or self.file_type == 'tar archive'
        if not is_tar:
            return CheckFileResult.HANDLER_NO_MATCH
        if self.fn.startswith(b'USERDATA_') and self.fn.endswith(b'.tar.md5'):
            # USERDATA_*.tar.md5 is present on some Samsung images, but it contains only useless stuff such as dalvik caches
            # Needs to be ignored so that ArchiveDirectoryHandler can accept handling only other files
            return CheckFileResult.IGNORE
        return CheckFileResult.ARCHIVE

    def is_good_extension(self) -> bool:
        """
        True for .tar and (Samsung) .tar.md5 file names, case-insensitive
        """
        return self.abs_fn.lower().endswith((b".tar", b".tar.md5"))

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the archive into output_path_rel with 'tar xvf', raises CalledProcessError on failure
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        tar_cmd = ["tar", "xvf", self.abs_fn]
        logging.info("TarHandler.extract_file2dir(%r): cmd=%r cwd=%r", output_path_rel, tar_cmd, abs_output_path)
        subprocess.check_call(tar_cmd, cwd=abs_output_path)
class TarHandlerIgnoreExtension(TarHandler):
    """
    TarHandler variant which accepts every file name extension, the file type alone decides
    """

    def is_good_extension(self) -> bool:
        return True
class HuaweiAppHandler(FileHandler):
    """
    Handler for Huawei .app images
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for .app files, HANDLER_NO_MATCH otherwise. Only update*.app files are expected here.
        """
        if self.abs_fn.lower().endswith(b".app"):
            assert self.fn.lower().startswith(b'update')
            # No file_type check, is typically "data"
            return CheckFileResult.ARCHIVE
        return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """
        Split the .app file into output_path_rel with the bundled splituapp tool, raises
        CalledProcessError on failure
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        splituapp_cmd = ["%s/splituapp/splituapp" % base_dir, "-f", self.abs_fn, "-o", abs_output_path, "--no-crc"]
        logging.info("HuaweiAppHandler.extract_file2dir(%r): cmd=%r cwd=%r", output_path_rel, splituapp_cmd, abs_output_path)
        subprocess.check_call(splituapp_cmd, cwd=abs_output_path)
class KdzHandler(FileHandler):
    """
    Handler for LG kdz format
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for files with .kdz extension (the file type is not checked), HANDLER_NO_MATCH otherwise
        """
        if self.abs_fn.lower().endswith(b".kdz"):
            return CheckFileResult.ARCHIVE
        return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the kdz file with the bundled unkdz.py, executed within output_path_rel.
        Raises CalledProcessError on failure.
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        unkdz_cmd = ["%s/kdzextractor/unkdz.py" % base_dir, "-x", "-f", self.abs_fn]
        logging.info("KdzHandler.extract_file2dir(%r): cmd=%r cwd=%r", output_path_rel, unkdz_cmd, abs_output_path)
        subprocess.check_call(unkdz_cmd, cwd=abs_output_path)
class DzHandler(FileHandler):
    """
    Handler for LG dz format
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for files with .dz extension (the file type is not checked), HANDLER_NO_MATCH otherwise
        """
        if not self.abs_fn.lower().endswith(b".dz"):
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.ARCHIVE

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the dz file with the bundled undz.py and merge the resulting system_<num>.bin
        chunks into one system.img within output_path_rel. The chunk files are removed afterwards.

        Raises CalledProcessError if undz.py fails and AssertionError if vendor chunks are found
        (not implemented) or if the block size can't be determined.
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        cmd = ["python", "%s/kdzextractor/undz.py" % base_dir, "-x", "-f", self.abs_fn]
        logging.info("DzHandler.extract_file2dir(%r): cmd=%r cwd=%r", output_path_rel, cmd, abs_output_path)
        subprocess.check_call(cmd, cwd=abs_output_path)
        # undz creates a number of system_<num>.bin files.
        # <num> is the block number where this file is in the final system image.
        # The block size is variable, can be found based on the size of the first image
        # and the offset of the second image.
        dzextracted_path = os.path.join(abs_output_path, b'dzextracted')
        file_num_to_fn = dict()
        for fn in os.listdir(dzextracted_path):
            if fn.startswith(b'vendor_') and fn.endswith(b'.bin'):
                assert False, "Please implement DzHandler Vendor extraction"
            if not (fn.startswith(b'system_') and fn.endswith(b'.bin')):
                continue
            num = int(fn[7:-4].decode())
            file_num_to_fn[num] = os.path.join(dzextracted_path, fn)
        files_sorted = sorted(file_num_to_fn)
        offset = files_sorted[0]
        # With a single chunk the block size doesn't matter, the chunk is written to position 0
        bs = 512
        if len(files_sorted) > 1:
            block_distance = files_sorted[1] - files_sorted[0]
            # file_num_to_fn already contains the full path of each chunk
            first_chunk_size = os.stat(file_num_to_fn[offset]).st_size
            # Smallest block size for which the first chunk fits in front of the second one
            for candidate in (512, 1024, 2048, 4096):
                if block_distance * candidate >= first_chunk_size:
                    bs = candidate
                    break
            else:
                assert False, "Failed to find block size"
        abs_system_img = os.path.join(abs_output_path, b'system.img')
        with open(abs_system_img, 'wb') as out_fh:
            for file_num in files_sorted:
                file_name = file_num_to_fn[file_num]
                # Gaps between chunks are left as holes by seeking over them
                out_fh.seek(bs * (file_num - offset))
                with open(file_name, 'rb') as in_fh:
                    shutil.copyfileobj(in_fh, out_fh, 1024 * 1024)
                os.unlink(file_name)  # Unlink is required so that the next ArchiveDirectoryHandler will not be confused by the low handled size
class SinHandler(FileHandler):
    """
    Handler for system.sin files (Sony)
    """

    def check(self) -> CheckFileResult:
        """
        ARCHIVE for system.sin (case-insensitive), HANDLER_NO_MATCH for other files.
        vendor.sin is not supported yet and fails with an AssertionError.
        """
        name = self.fn.lower()
        if name == b'system.sin':
            return CheckFileResult.ARCHIVE
        elif name == b'vendor.sin':
            assert False, "TODO: Check and implement extraction of vendor.sin"
        else:
            return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """
        Unpack the sin file into output_path_rel with the bundled sinextract tool, raises
        CalledProcessError on failure
        """
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        sinextract_cmd = ["%s/sinextract/sinextract" % base_dir, abs_output_path, self.abs_fn]
        logging.info("SinHandler.extract_file2dir(%r): cmd=%r cwd=%r", output_path_rel, sinextract_cmd, abs_output_path)
        subprocess.check_call(sinextract_cmd, cwd=abs_output_path)
class RawprogramUnsparseHandler(FileHandler):
    """
    Handler for rawprogram*.xml files (and contents.xml files referencing them). These xml files
    list <program> tags, each describing one part file of a partition together with its start
    sector. The handler collects the parts of the system, vendor and super partitions and
    assembles them to one image file per partition.
    """
    # Items are [abs_fn, start_pos]: path of a part file and its byte offset within the assembled image
    system_parts_with_pos: List[List]
    vendor_parts_with_pos: List[List]
    super_parts_with_pos: List[List]
    # Total size of the part files of custom*/userdata* partitions
    extra_ignored_size: int

    def check(self) -> CheckFileResult:
        """
        Decide whether this xml file is the one to be used for assembling the images.

        contents.xml is evaluated directly. Of the known rawprogram xml file names only the highest
        priority one existing in the directory is evaluated, and only if there is neither a big
        system.img next to it nor a working contents.xml.
        """
        self.system_parts_with_pos = []
        self.vendor_parts_with_pos = []
        self.super_parts_with_pos = []
        self.extra_ignored_size = 0
        if self.fn == b"contents.xml":
            return self.check_contents_xml(self.abs_fn)
        else:
            xml_files_by_priority = (
                b'rawprogram_unsparse.xml',
                b'rawprogram0.xml',
                b'rawprogram_unsparse(US).xml',
                b'rawprogram0_unsparse.xml',
                b'rawprogram_unsparse0.xml',
                b'rawprogram0_unsparse_upgrade.xml',
                b'rawprogram_upgrade.xml',
                b'rawprogram_unsparse_upgrade.xml'
            )
            if self.fn not in xml_files_by_priority:
                return CheckFileResult.HANDLER_NO_MATCH
            input_dir_abs = os.path.dirname(self.abs_fn)
            direct_system_img_path = os.path.join(input_dir_abs, b'system.img')
            if os.path.exists(direct_system_img_path) and os.stat(direct_system_img_path).st_size > 128 * 1024 * 1024:
                return CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE  # Some images contain a system.img file directly and some non-working xml
            # If contents.xml exists, it should be used and this handler should return HANDLER_NO_MATCH for all other
            # xml files. However, in some cases contents.xml is broken and we need to continue based on
            # xml_files_by_priority
            content_xml_path = os.path.join(input_dir_abs, b'contents.xml')
            if os.path.exists(content_xml_path):
                if self.check_contents_xml(content_xml_path) != CheckFileResult.HANDLER_NO_MATCH:
                    return CheckFileResult.HANDLER_NO_MATCH
            # Iterating from lowest to highest priority, so the last hit is the highest priority file
            highest_priority_existing_file = None
            for tmp_fn in reversed(xml_files_by_priority):
                if os.path.exists(os.path.join(input_dir_abs, tmp_fn)):
                    highest_priority_existing_file = tmp_fn
            assert highest_priority_existing_file is not None
            if highest_priority_existing_file != self.fn:
                return CheckFileResult.HANDLER_NO_MATCH  # There is a better (higher priority) xml file, so let's ignore this one
            logging.info("RawprogramUnsparseHandler: Checking file %r", self.abs_fn)
            return self.parse_xml_file(self.abs_fn)

    def check_contents_xml(self, abs_contents_xml: bytes) -> CheckFileResult:
        """
        Evaluate all rawprogram xml files referenced from the "hlos" steps of a contents.xml file.

        Returns the result of parse_xml_file() for the last referenced file which exists, or
        HANDLER_NO_MATCH if none of them exists. Raises ValueError if there is no "hlos" step.
        """
        contents_xml_dir = os.path.dirname(abs_contents_xml)
        parser = etree.XMLParser()
        # Context manager so that the file handle is not leaked
        with open(abs_contents_xml) as contents_fh:
            tree = etree.parse(contents_fh, parser)
        root = tree.getroot()
        params_tags = root.xpath('//step[@filter="hlos"]/params')
        if len(params_tags) == 0:
            raise ValueError("Failed to parse contents.xml")
        result = CheckFileResult.HANDLER_NO_MATCH
        for params_tag in params_tags:
            # The params text is a command line, the rawprogram xml file name is the argument of the trailing "-o"
            cmd_str = params_tag.text.strip()
            assert "@rawprogram_xml" in cmd_str
            cmd = shlex.split(cmd_str)
            assert cmd[-2] == "-o"
            xml_fn = cmd[-1]
            abs_fn = os.path.join(contents_xml_dir, xml_fn.encode())
            if os.path.exists(abs_fn):
                result = self.parse_xml_file(abs_fn)
            else:
                logging.error("File %r (referenced from %r) doesn't exist", abs_fn, abs_contents_xml)
        return result

    def parse_xml_file(self, abs_xml_fn):
        """
        Collect the part files of the system, vendor and super partitions from the <program> tags
        of one rawprogram xml file into the *_parts_with_pos lists.

        Returns SYSTEM_OR_VENDOR if any part is known afterwards, HANDLER_NO_MATCH if not, and
        IGNORE if a part file is missing and the xml file is located below image/modem/.
        Raises ValueError for other missing part files.
        """
        try:
            with open(abs_xml_fn) as xml_fh:
                root = ET.parse(xml_fh)
        except ET.ParseError:
            # Workaround for crappy XML, e.g. document starting with </data
            with open(abs_xml_fn) as xml_fh:
                lines = xml_fh.read().splitlines()
            lines = [x for x in lines if "<program" in x]
            logging.debug("\n".join(lines))
            root = ET.XML("<data>\n" + "\n".join(lines) + "</data>")
        program_tags = root.findall('program')
        sector_size: Optional[int] = None
        image_base_dir = os.path.dirname(abs_xml_fn)
        # Start sector of the first part seen for each label, positions are relative to it
        partition_start_sector_by_label = {}
        found_vendor_b: bool = False
        for program_tag in program_tags:
            label = program_tag.attrib["label"]
            if label in ("system", "system_a", "vendor", "vendor_a", "super"):
                logging.info("RawprogramUnsparseHandler: program_tag.attrib=%s", json.dumps(program_tag.attrib, sort_keys=True))
                # Sparse attribute can cause problems (sector size mismatch etc.), it will be handled directly by SuperImageHandler
                if label == "super" and "sparse" in program_tag.attrib and program_tag.attrib["sparse"].lower() == "true":
                    continue
                filename = program_tag.attrib["filename"]
                abs_fn = os.path.join(image_base_dir, filename.encode())
                if not os.path.exists(abs_fn):
                    if b'image/modem/' in abs_xml_fn:
                        return CheckFileResult.IGNORE
                    raise ValueError("File %r doesn't exist (referenced from %r)" % (abs_fn, abs_xml_fn))
                if "SECTOR_SIZE_IN_BYTES" in program_tag.attrib:
                    if sector_size is None:
                        sector_size = int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"])
                        assert sector_size in [512, 4096]
                    else:
                        assert int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"]) == sector_size, "Inconsistent sector size: %r <=> %r" % (int(program_tag.attrib["SECTOR_SIZE_IN_BYTES"]), sector_size)
                else:
                    # Found a program tag without SECTOR_SIZE_IN_BYTES => Fall back to default 512
                    sector_size = 512
                assert os.stat(abs_fn).st_size % sector_size == 0, "File %r is not a multiple of %d bytes" % (abs_fn, sector_size)
                start_sector = int(program_tag.attrib["start_sector"])
                if label not in partition_start_sector_by_label:
                    partition_start_sector_by_label[label] = start_sector
                start_pos = sector_size * (start_sector - partition_start_sector_by_label[label])
                assert start_pos < 10 * 1024 * 1024 * 1024, "RawprogramUnsparseHandler: Sparse image too big (>10 GiB)"
                if label.startswith("system"):
                    self.system_parts_with_pos.append([abs_fn, start_pos])
                elif label.startswith("vendor"):
                    self.vendor_parts_with_pos.append([abs_fn, start_pos])
                elif label.startswith("super"):
                    self.super_parts_with_pos.append([abs_fn, start_pos])
                else:
                    raise ValueError("Bad label %r, this should not happen" % label)
            elif label == "vendor_b":
                found_vendor_b = True
            elif label.startswith("custom") or label.startswith("userdata"):
                filename = program_tag.attrib["filename"]
                abs_fn = os.path.join(image_base_dir, filename.encode())
                self.extra_ignored_size += os.stat(abs_fn).st_size
            else:
                # Just to make sure we aren't missing a vendor partition here
                assert 'vendor' not in label.lower(), "Found unexpected program label containing 'vendor' in %r" % program_tag.attrib["label"]
        if found_vendor_b:
            assert len(self.vendor_parts_with_pos) > 0, "XML file %r contains vendor_b but no valid vendor" % abs_xml_fn
        if len(self.system_parts_with_pos) == 0 and len(self.vendor_parts_with_pos) == 0 and len(self.super_parts_with_pos) == 0:
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.SYSTEM_OR_VENDOR

    def get_extra_handled_size(self):
        """
        Total size in bytes of all system, vendor and super part files collected so far
        """
        all_parts = self.system_parts_with_pos + self.vendor_parts_with_pos + self.super_parts_with_pos
        return sum(os.stat(part_fn).st_size for (part_fn, _pos) in all_parts)

    def has_vendor(self):
        """
        True if at least one vendor part was found
        """
        return len(self.vendor_parts_with_pos) > 0

    @staticmethod
    def _extract_parts_to_file(my_parts: List[List], my_abs_out_fn):
        """
        Assemble one image file: copy each part file to its position within the output file
        """
        with open(my_abs_out_fn, 'wb') as out_fh:
            for (part_fn, pos) in my_parts:
                out_fh.seek(pos)
                with open(part_fn, 'rb') as in_fh:
                    shutil.copyfileobj(in_fh, out_fh, 1024 * 1024)

    def extract_and_get_next_handlers(self, stage_dir_rel) -> List[FileHandler]:
        """
        Assemble the images from the collected parts within the stage directory and return the
        handlers for the assembled images.

        With super parts, a single SuperImageHandler for super.img is returned. Otherwise
        system.img (and vendor.img, if there are vendor parts) are assembled and one
        filesystem handler is returned for each of them.

        Raises ValueError if SuperImageHandler doesn't accept super.img, MultipleHandlerMatchError /
        NoHandlerMatchError unless exactly one filesystem handler accepts an assembled image.
        """
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        result: List[FileHandler] = []
        if len(self.super_parts_with_pos) > 0:
            assert len(self.system_parts_with_pos) == 0, "Can't have system and super image"
            assert len(self.vendor_parts_with_pos) == 0, "Can't have vendor and super image"
            output_fn = b'super.img'
            abs_out_fn = os.path.join(abs_stage_dir, output_fn)
            self._extract_parts_to_file(self.super_parts_with_pos, abs_out_fn)
            handler = SuperImageHandler(self.extractor, self.extractor.rel_path(abs_out_fn), file_type=get_file_type(abs_out_fn))
            handler_result = handler.check()
            if handler_result == CheckFileResult.HANDLER_NO_MATCH:
                raise ValueError("RawprogramUnsparseHandler: Extracted super.img but SuperImageHandler returned HANDLER_NO_MATCH")
            result.append(handler)
        else:
            for image_type in (ImageType.SYSTEM, ImageType.VENDOR):
                if image_type == ImageType.VENDOR and len(self.vendor_parts_with_pos) == 0:
                    continue
                output_fn = b'%s.img' % image_type.name.lower().encode()
                abs_out_fn = os.path.join(abs_stage_dir, output_fn)
                parts: List[List]
                if image_type == ImageType.SYSTEM:
                    parts = self.system_parts_with_pos
                elif image_type == ImageType.VENDOR:
                    parts = self.vendor_parts_with_pos
                else:
                    raise ValueError("Invalid image_type=%r" % image_type)
                self._extract_parts_to_file(parts, abs_out_fn)
                # Exactly one of the filesystem handlers has to accept the assembled image
                HANDLER_TYPES = [ExtfsHandler, ErofsHandler]
                handlers = []
                for handler_type in HANDLER_TYPES:
                    handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=image_type, file_type=get_file_type(abs_out_fn))
                    handler_result = handler.check()
                    if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                        assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__)
                        handlers.append(handler)
                if len(handlers) > 1:
                    raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers]))
                elif len(handlers) == 0:
                    raise NoHandlerMatchError("RawprogramUnsparseHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
                else:
                    result.append(handlers[0])
        return result
class IgnoreRadioHandler(FileHandler):
    """
    Skips radio images named radio-*.img, e.g. radio-taimen-g8998-00253-1805232234.img
    """

    def check(self) -> CheckFileResult:
        is_radio_img = self.fn.startswith(b'radio-') and self.fn.endswith(b'.img')
        return CheckFileResult.IGNORE if is_radio_img else CheckFileResult.HANDLER_NO_MATCH
class IgnoreBootloaderHandler(FileHandler):
    """
    Skips bootloader images named bootloader-*.img, e.g. bootloader-taimen-tmz20k.img
    """

    def check(self) -> CheckFileResult:
        is_bootloader_img = self.fn.startswith(b'bootloader-') and self.fn.endswith(b'.img')
        return CheckFileResult.IGNORE if is_bootloader_img else CheckFileResult.HANDLER_NO_MATCH
class IgnoreVmlinuxHandler(FileHandler):
    """
    Skips files named "vmlinux" (case-insensitive), helps with coverage for some images
    """

    def check(self) -> CheckFileResult:
        if self.fn.lower() != b"vmlinux":
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.IGNORE
class IgnoreOpImageHandler(FileHandler):
    """
    Skips OP_\\d+.bin files, e.g. from /android/LG/may2017/H840AR10a_01_0906.kdz/H840AR10a_01_0906.kdz
    and OP_*.img files, e.g. OP_OPEN_ZA.img from H84020c_00_OPEN_ZA_OP_0630.kdz
    """

    def check(self) -> CheckFileResult:
        fn_lower = self.fn.lower()
        # May have various different file types, e.g. ext4 or jar. So let's just match by filename here.
        # re.match() only anchors at the start, so trailing characters after the pattern are accepted
        for pattern in (rb'op_\d+\.bin', rb'op_\w+\.img'):
            if re.match(pattern, fn_lower):
                logging.info("IgnoreOpImageHandler: file %r => file_type=%r" % (self.abs_fn, self.file_type))
                return CheckFileResult.IGNORE
        return CheckFileResult.HANDLER_NO_MATCH
class IgnoreOemImgHandler(FileHandler):
    """
    Skips oem.img files, e.g. from /android/2018-06/Blur_Version.28.11.15.payton_fi.google_fi.en.US.zip/Blur_Version.28.11.15.payton_fi.google_fi.en.US.zip
    """

    def check(self) -> CheckFileResult:
        # Exact, case-sensitive file name match
        if self.fn != b'oem.img':
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.IGNORE
class IgnoreAppsImgHandler(FileHandler):
    """
    Skips apps.img (and apps_X.img) files
    """

    def check(self) -> CheckFileResult:
        # Case-sensitive and only anchored at the start of the file name
        matched = re.match(rb'apps(_\d+)?\.img', self.fn)
        return CheckFileResult.IGNORE if matched else CheckFileResult.HANDLER_NO_MATCH
class IgnoreUpdateHwHandler(FileHandler):
    """
    Skips update_full_*_hw_*.zip files
    Required e.g. for /android/2018-11/9.0.0.47-c432.zip/9.0.0.47-c432.zip
    Only required in ArchiveDirectoryHandler Pass2
    """

    def check(self) -> CheckFileResult:
        # Matched against the lower-cased file name, anchored at the start only
        if re.match(rb'update_full_.*_hw_\w+\.zip', self.fn.lower()):
            return CheckFileResult.IGNORE
        return CheckFileResult.HANDLER_NO_MATCH
class IgnoreHuaweiUserdataAppHandler(FileHandler):
    """
    Skips USERDATA.APP (case-insensitive)
    Required so that the actual image (UPDATE.APP) will be >90%
    """

    def check(self) -> CheckFileResult:
        if self.fn.lower() != b"userdata.app":
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.IGNORE
class IgnoreElfHandler(FileHandler):
    """
    Skips ELF files with .elf extension
    Required to reach size coverage threshold
    """

    def check(self) -> CheckFileResult:
        # Both the file type and the (case-insensitive) extension have to match
        is_elf = self.file_type.startswith("ELF ") and self.fn.lower().endswith(b'.elf')
        return CheckFileResult.IGNORE if is_elf else CheckFileResult.HANDLER_NO_MATCH
class SparseImageHandler(FileHandler):
abs_fn_list: List[bytes]
def check(self) -> CheckFileResult:
if self.file_type.startswith("Android sparse image, version: 1.0,"):
if self.fn.lower().startswith(b"system_other"):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'hidden.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'cache'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'userdata.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'userdata_'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'persist.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'3rdmodem.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'cust.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'product.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'odm.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'oem.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'container.'):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b'apps.'):
# Vivo
return CheckFileResult.IGNORE
if self.fn.lower().endswith(b".duplicate"):
return CheckFileResult.IGNORE # splituapp duplicate file entries in Huawei UPDATE.APP
if self.fn.lower().startswith(b"op_") or self.fn.lower().startswith(b"op."):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b"oem_"):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b"preas_"):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b"preas."):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b"non-hlos."):
return CheckFileResult.IGNORE
if self.fn.lower().startswith(b"super"):
return CheckFileResult.HANDLER_NO_MATCH # Will be covered by SuperImageHandler
self.abs_fn_list = []
if b"sparsechunk" in self.fn.lower():
if self.fn.lower().endswith(b"sparsechunk.0"):
base_abs_fn = self.abs_fn[0:-2]
for i in range(100):
abs_fn = base_abs_fn + b".%d" % i
if os.path.exists(abs_fn):
self.abs_fn_list.append(abs_fn)
else:
break
else:
return CheckFileResult.IGNORE
else:
self.abs_fn_list.append(self.abs_fn)
if self.fn.lower().startswith(b"system.") or self.fn.lower().startswith(b"system_a.") or self.fn.lower().startswith(b"system-sign."):
self.image_type = ImageType.SYSTEM
return CheckFileResult.SYSTEM_IMG
elif self.fn.lower().startswith(b"vendor.") or self.fn.lower().startswith(b"vendor_a.") or self.fn.lower().startswith(b"vendor-sign."):
self.image_type = ImageType.VENDOR
return CheckFileResult.VENDOR_IMG
elif self.fn.lower().startswith(b"system_b.") or self.fn.lower().startswith(b"vendor_b."):
return CheckFileResult.IGNORE
elif self.fn.lower().startswith(b"system_ext."):
return CheckFileResult.IGNORE
elif self.fn.lower().startswith(b"system_other."):
return CheckFileResult.IGNORE
else:
if os.stat(self.abs_fn).st_size < 32 * 1024 * 1024:
# Ignore images smaller than 32 MiB, these images can't be a valid system/vendor partition
return CheckFileResult.IGNORE
assert False, "SparseImageHandler: %r does not start with system/vendor (Size %.2f MiB)" % (self.fn, os.stat(self.abs_fn).st_size / 1024**2)
else:
return CheckFileResult.HANDLER_NO_MATCH
def get_extra_handled_size(self) -> int:
    """
    Return the summed file size (in bytes) of all entries in self.abs_fn_list except the first.

    The first entry is deliberately skipped, only the additional files are counted.
    """
    # Only count from file 1
    return sum(os.stat(extra_fn).st_size for extra_fn in self.abs_fn_list[1:])
def extract_and_get_next_handler(self, stage_dir_rel):
    """
    Run simg2img over self.abs_fn_list and return the handler responsible for the converted image.

    The converted image is written into a stage directory obtained from
    self.extractor.create_stage_dir(stage_dir_rel); its name is the input filename plus
    the suffix ".SparseImageHandler".

    Raises MultipleHandlerMatchError if more than one candidate handler accepts the result and
    NoHandlerMatchError if none does.
    """
    abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
    abs_out_fn = os.path.join(abs_stage_dir, self.fn + b".SparseImageHandler")
    # All input files are passed to a single simg2img call, followed by the output path
    cmd: List[bytes] = [b"simg2img", *self.abs_fn_list, abs_out_fn]
    logging.info("SparseImageHandler: cmd=%r" % cmd)
    subprocess.check_call(cmd)
    assert os.path.exists(abs_out_fn)
    accepted_results = (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR)
    matching = []
    for candidate_type in (ExtfsHandler, AsusMagicHandler, ErofsHandler, MotoPivHandler):
        candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
        candidate_result = candidate.check()
        if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
            continue
        assert candidate_result in accepted_results, "Unexpected handler_result=%r from handler %r" % (candidate_result, candidate.__class__.__name__)
        matching.append(candidate)
    if not matching:
        raise NoHandlerMatchError("SparseImageHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
    if len(matching) > 1:
        raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
    return matching[0]
class MotoPivHandler(FileHandler):
    """
    Handler for images that start with a "MOTO" header containing the marker "MOT_PIV_FULL256".

    The header stores an offset; everything from that offset to the end of the file is copied
    out and handed to the next handler.
    """

    def check(self) -> CheckFileResult:
        """
        Match files whose first 32 bytes start with b'MOTO' and contain b'MOT_PIV_FULL256'.

        The result (system or vendor) is derived from self.image_type; any other image type
        raises ValueError.
        """
        with open(self.abs_fn, 'rb') as f:
            header = f.read(32)
        if header[0:4] != b'MOTO' or b'MOT_PIV_FULL256' not in header:
            return CheckFileResult.HANDLER_NO_MATCH
        if self.image_type == ImageType.SYSTEM:
            return CheckFileResult.SYSTEM_IMG
        if self.image_type == ImageType.VENDOR:
            return CheckFileResult.VENDOR_IMG
        raise ValueError("Bad image_type %r" % self.image_type)

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Copy the payload behind the header into a new stage file and return its handler.

        The payload offset is the little-endian 32-bit value at header bytes 24..27.
        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, self.fn + b".MotoPivHandler")
        with open(self.abs_fn, 'rb') as input_file:
            header = input_file.read(32)
            (offset,) = struct.unpack("<I", header[24:28])
            assert offset <= 1024**2
            assert offset % 512 == 0  # Just to make sure it is at least aligned to 512-byte sectors
            input_file.seek(offset, 0)
            with open(abs_out_fn, 'wb') as output_file:
                # Copy in 1 MiB chunks until read() signals EOF with an empty result
                for chunk in iter(lambda: input_file.read(1024**2), b''):
                    output_file.write(chunk)
        # Same structure as in SparseImageHandler, maybe we need other handlers later
        accepted_results = (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR)
        matching = []
        for candidate_type in (ExtfsHandler,):
            candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            candidate_result = candidate.check()
            if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
                continue
            assert candidate_result in accepted_results, "Unexpected handler_result=%r from handler %r" % (candidate_result, candidate.__class__.__name__)
            matching.append(candidate)
        if not matching:
            raise NoHandlerMatchError("MotoPivHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        if len(matching) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
        return matching[0]
class SuperImageHandler(FileHandler):
    """
    Handler for super images (sparse or raw with liblp magic) whose filename starts with "super".

    Extraction writes one system and one vendor partition out of the super image.
    """
    # Set by check(): True if the input is an Android sparse image and must be unsparsed first
    is_sparse: bool

    def check(self) -> CheckFileResult:
        """
        Return ARCHIVE for files named super* that are either Android sparse images or carry
        the liblp magic.

        Raises ValueError if the liblp magic is found in a file not named super*.
        """
        self.is_sparse = False
        if self.file_type.startswith("Android sparse image, version: 1.0,"):
            if not self.fn.lower().startswith(b"super"):
                return CheckFileResult.HANDLER_NO_MATCH
            self.is_sparse = True
            return CheckFileResult.ARCHIVE
        with open(self.abs_fn, 'rb') as f:
            has_magic = liblp.check_magic(f)
        if not has_magic:
            return CheckFileResult.HANDLER_NO_MATCH
        if not self.fn.lower().startswith(b'super'):
            raise ValueError("Found liblp magic but not in super image, this should not happen")
        return CheckFileResult.ARCHIVE

    def extract_file2dir(self, output_path_rel):
        """
        Write the system and vendor partitions of the super image to <name>.img files in the
        output directory.

        For each of system/vendor the first existing partition among the plain, _a and _b
        names is used; an AssertionError is raised if none exists.
        """
        if self.is_sparse:
            # Convert to a raw image next to the input before parsing it
            super_img_fn = self.abs_fn + b'.unsparse'
            cmd: List[bytes] = [b"simg2img", self.abs_fn, super_img_fn]
            subprocess.check_call(cmd)
        else:
            super_img_fn = self.abs_fn
        super_img = liblp.SuperImage(super_img_fn)
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        partition_names = super_img.get_partition_names()
        wanted = (
            (("system", "system_a", "system_b"), "Failed to find system in super.img"),
            (("vendor", "vendor_a", "vendor_b"), "Failed to find vendor in super.img"),
        )
        for candidates, missing_message in wanted:
            chosen = next((name for name in candidates if name in partition_names), None)
            assert chosen is not None, missing_message
            output_fn = os.path.join(abs_output_path, chosen.encode() + b".img")
            with open(output_fn, 'wb') as f:
                super_img.write_partition(chosen, f)
class SignImgHandler(FileHandler):
    """
    https://github.com/R0rt1z2/signimg2img

    Handles system-sign.img / vendor-sign.img files starting with the magic b'BFBF' or b'SSSS'.
    Only the b'SSSS' variant can currently be extracted.
    """

    def check(self) -> CheckFileResult:
        """
        Match signed images by magic and exact (case-insensitive) filename.

        Files with a matching magic but a different name yield HANDLER_NO_MATCH, after
        asserting that the name contains neither b'system' nor b'vendor'.
        """
        # Read magic
        with open(self.abs_fn, 'rb') as f:
            magic_buf: bytes = f.read(4)
        if magic_buf not in (b'BFBF', b'SSSS'):
            return CheckFileResult.HANDLER_NO_MATCH
        lowered_fn = self.fn.lower()
        if lowered_fn == b"system-sign.img":
            return CheckFileResult.SYSTEM_IMG
        if lowered_fn == b"vendor-sign.img":
            return CheckFileResult.VENDOR_IMG
        # TODO: Maybe also add boot/recovery images
        assert b'system' not in self.fn, "Unexpected system image in SignImgHandler: %r" % self.fn
        assert b'vendor' not in self.fn, "Unexpected vendor image in SignImgHandler: %r" % self.fn
        return CheckFileResult.HANDLER_NO_MATCH

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Strip the signature header and return the handler for the remaining image.

        Raises NotImplementedError for any magic other than b'SSSS', and
        MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted by
        exactly one candidate handler.
        """
        # NOTE(review): the suffix names SparseImageHandler rather than this class; looks like
        # copy/paste, but it is kept since the output filename is runtime behavior - confirm.
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, self.fn + b".SparseImageHandler")
        with open(self.abs_fn, 'rb') as input_fh, open(abs_out_fn, 'wb') as output_fh:
            header = input_fh.read(1024)
            magic = header[0:4]
            if magic != b'SSSS':
                raise NotImplementedError("SignImgHandler: Magic %r not yet implemented" % bytes(magic))
            # https://github.com/R0rt1z2/signimg2img is using 60:64, but at least some images have offset 44:48
            # Sample: TB-7305F_S000083_200703_ROW.zip
            (offset,) = struct.unpack("<I", header[44:48])
            input_fh.seek(offset, 0)
            # Copy in 1 MiB chunks until read() signals EOF with an empty result
            for chunk in iter(lambda: input_fh.read(1024 ** 2), b''):
                output_fh.write(chunk)
        accepted_results = (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR)
        matching = []
        for candidate_type in (SparseImageHandler,):
            candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type,
                                       file_type=get_file_type(abs_out_fn))
            candidate_result = candidate.check()
            if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
                continue
            assert candidate_result in accepted_results, "Unexpected handler_result=%r from handler %r" % (
                candidate_result, candidate.__class__.__name__)
            matching.append(candidate)
        if not matching:
            raise NoHandlerMatchError(
                "SignImgHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (
                    abs_out_fn, get_file_type(abs_out_fn)))
        if len(matching) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
        return matching[0]
class AsusMagicHandler(FileHandler):
    """
    Handler for system*/vendor* images prefixed with a 4096-byte header starting with
    b'ASUS MAGIC!\\n'. Extraction drops the header and passes the rest to the next handler.
    """
    # Magic bytes at the very beginning of the file
    MAGIC = b'ASUS MAGIC!\n'
    # Number of bytes in front of the actual image
    HEADER_SIZE = 4096

    def check(self) -> CheckFileResult:
        """
        Match files named system*/vendor* (case-insensitive) that start with the ASUS magic.

        Returns SYSTEM_IMG or VENDOR_IMG depending on the filename prefix, otherwise
        HANDLER_NO_MATCH.
        """
        lowered_fn = self.fn.lower()
        if not lowered_fn.startswith((b"system", b"vendor")):
            return CheckFileResult.HANDLER_NO_MATCH
        # Use a context manager so the file handle is closed deterministically
        with open(self.abs_fn, 'rb') as f:
            magic = f.read(len(AsusMagicHandler.MAGIC))
        if magic != AsusMagicHandler.MAGIC:
            return CheckFileResult.HANDLER_NO_MATCH
        if lowered_fn.startswith(b"system"):
            return CheckFileResult.SYSTEM_IMG
        return CheckFileResult.VENDOR_IMG

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Copy everything behind the 4096-byte header into a new stage file and return the
        handler for it.

        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        output_fn = self.fn + b".AsusMagicHandler"
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, output_fn)
        with open(self.abs_fn, 'rb') as input_file:
            # Skip the header, the image starts right behind it
            input_file.seek(AsusMagicHandler.HEADER_SIZE)
            with open(abs_out_fn, 'wb') as output_file:
                for chunk in iter(lambda: input_file.read(128 * 1024), b''):
                    output_file.write(chunk)
        HANDLER_TYPES = [ExtfsHandler, ErofsHandler]
        handlers = []
        for handler_type in HANDLER_TYPES:
            handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            handler_result = handler.check()
            if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__)
                handlers.append(handler)
        if len(handlers) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers]))
        elif len(handlers) == 0:
            raise NoHandlerMatchError("AsusMagicHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        else:
            return handlers[0]
class Lz4Handler(FileHandler):
    """
    Handler for LZ4-compressed system/vendor images (*.lz4).

    boot.img.lz4 and recovery.img.lz4 are left to BootImageHandler / RecoveryImageHandler,
    a fixed set of other partitions is ignored.
    """
    # Filename prefixes (lower case) of LZ4 files which are ignored
    _IGNORED_PREFIXES = (
        b"system_other",
        b"persist.",
        b"userdata.",  # userdata partition contains stuff like dalvik cache etc.
        b"carrier.",
        b"product.img",
        b"cache.img",
        b"hidden.img",
        b"non-hlos",
        b"modem",
    )

    def check(self) -> CheckFileResult:
        """
        Classify an LZ4 file by its (case-insensitive) filename.

        Raises ValueError for LZ4 data larger than 32 MiB without .lz4 extension and
        AssertionError for a *.lz4 file of at least 32 MiB that is neither ignored nor named
        system*/vendor*.
        """
        if not self.file_type.startswith("LZ4 compressed data"):
            return CheckFileResult.HANDLER_NO_MATCH
        lowered_fn = self.fn.lower()
        if not lowered_fn.endswith(b'.lz4'):
            if os.stat(self.abs_fn).st_size > 32 * 1024 * 1024:
                raise ValueError("Bad LZ4 filename %r" % self.fn)
            return CheckFileResult.HANDLER_NO_MATCH
        if lowered_fn == b"boot.img.lz4":
            return CheckFileResult.HANDLER_NO_MATCH  # Handled by BootImageHandler
        if lowered_fn == b"recovery.img.lz4":
            return CheckFileResult.HANDLER_NO_MATCH  # Handled by RecoveryImageHandler
        # Must be checked before the "system" prefix below, since "system_other" also starts with "system"
        if lowered_fn.startswith(Lz4Handler._IGNORED_PREFIXES):
            return CheckFileResult.IGNORE
        if lowered_fn.startswith(b"system"):
            self.image_type = ImageType.SYSTEM
            return CheckFileResult.SYSTEM_IMG
        if lowered_fn.startswith(b"vendor"):
            self.image_type = ImageType.VENDOR
            return CheckFileResult.VENDOR_IMG
        if os.stat(self.abs_fn).st_size < 32 * 1024 * 1024:
            # Ignore images smaller than 32 MiB, these images can't be a valid system/vendor partition
            return CheckFileResult.IGNORE
        # Explicit raise instead of "assert False", which would be stripped with python -O
        raise AssertionError("Lz4Handler: %r does not start with system/vendor" % self.fn)

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decompress the file with "lz4 -dc" into a new stage file and return the handler for it.

        The output name is the input name without its trailing 4-character .lz4 extension.
        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        # check() accepts the extension case-insensitively, so do the same here
        assert self.fn.lower().endswith(b'.lz4')
        output_fn = self.fn[0:-4]
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, output_fn)
        cmd = ["lz4", "-dc", self.abs_fn]
        logging.info("Lz4Handler: cmd=%r, out=%r" % (cmd, abs_out_fn))
        # The command "lz4 -d" is behaving differently depending on whether stdout is a console or not.
        # If it is a console, it will strip the .lz4 extension and use the remaining path as output file.
        # If it is not a console (e.g. if the extractor is called from another utility and stdout is captured),
        # lz4 -d will just output the decompressed data to stdout. There is no command-line option to force output
        # to a file, so let's force output to stdout and redirect it using subprocess
        with open(abs_out_fn, 'wb') as f:
            subprocess.check_call(cmd, stdout=f)
        assert os.path.exists(abs_out_fn)
        HANDLER_TYPES = [ExtfsHandler, SparseImageHandler, ErofsHandler]
        handlers = []
        for handler_type in HANDLER_TYPES:
            handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            handler_result = handler.check()
            if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__)
                handlers.append(handler)
        if len(handlers) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers]))
        elif len(handlers) == 0:
            raise NoHandlerMatchError("Lz4Handler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        else:
            return handlers[0]
class GzipHandler(FileHandler):
    """
    Handler for gzip-compressed files (*.gz / *.tgz); the decompressed result is expected to
    be accepted by TarHandlerIgnoreExtension.
    """

    def check(self) -> CheckFileResult:
        """Return ARCHIVE for gzip data; asserts that the filename ends with .gz or .tgz."""
        if not self.file_type.startswith("gzip compressed data"):
            return CheckFileResult.HANDLER_NO_MATCH
        assert self.fn.endswith((b'.gz', b'.tgz'))
        return CheckFileResult.ARCHIVE

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decompress with "gzip -dc" into a new stage file and return the handler for it.

        "x.gz" becomes "x", "x.tgz" becomes "x.tar". gzip exit codes 0 and 2 are accepted.
        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        if self.fn.endswith(b'.gz'):
            output_fn = self.fn[0:-3]
        elif self.fn.endswith(b'.tgz'):
            output_fn = self.fn[0:-4] + b'.tar'
        else:
            assert False, "Invalid gzip filename %r" % self.fn
        abs_out_fn = os.path.join(self.extractor.create_stage_dir(stage_dir_rel), output_fn)
        cmd = ["gzip", "-dc", self.abs_fn]
        logging.info("GzipHandler: cmd=%r out=%r" % (cmd, abs_out_fn))
        # The command "gzip -d" has no command line option to force output to a
        # specific file.
        with open(abs_out_fn, 'wb') as f:
            retcode = subprocess.call(cmd, stdout=f)
            # Exit code 2 means warning, e.g. "trailing garbage ignored"
            assert retcode in (0, 2), "GzipHandler: command %r failed with exit code %r" % (cmd, retcode)
        assert os.path.exists(abs_out_fn)
        matching = []
        for candidate_type in (TarHandlerIgnoreExtension,):
            candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            candidate_result = candidate.check()
            if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
                continue
            assert candidate_result in (CheckFileResult.ARCHIVE,), "Unexpected handler_result=%r from handler %r" % (candidate_result, candidate.__class__.__name__)
            matching.append(candidate)
        if not matching:
            raise NoHandlerMatchError("GzipHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        if len(matching) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
        return matching[0]
class Bzip2Handler(FileHandler):
    """
    Handler for bzip2-compressed files (*.bz2); the decompressed result is expected to be
    accepted by TarHandler.
    """

    def check(self) -> CheckFileResult:
        """Return ARCHIVE for bzip2 data; asserts that the filename ends with .bz2."""
        if not self.file_type.startswith("bzip2 compressed data"):
            return CheckFileResult.HANDLER_NO_MATCH
        assert self.fn.endswith(b'.bz2')
        return CheckFileResult.ARCHIVE

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decompress with "bzip2 -dc" into a new stage file (input name without .bz2) and return
        the handler for it.

        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        if not self.fn.endswith(b'.bz2'):
            assert False, "Invalid bzip2 filename %r" % self.fn
        else:
            output_fn = self.fn[0:-4]
        abs_out_fn = os.path.join(self.extractor.create_stage_dir(stage_dir_rel), output_fn)
        cmd = ["bzip2", "-dc", self.abs_fn]
        logging.info("Bzip2Handler: cmd=%r out=%r" % (cmd, abs_out_fn))
        # The command "bzip2 -d" has no command line option to force output to a
        # specific file.
        with open(abs_out_fn, 'wb') as f:
            subprocess.check_call(cmd, stdout=f)
        assert os.path.exists(abs_out_fn)
        matching = []
        for candidate_type in (TarHandler,):
            candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            candidate_result = candidate.check()
            if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
                continue
            assert candidate_result in (CheckFileResult.ARCHIVE,), "Unexpected handler_result=%r from handler %r" % (candidate_result, candidate.__class__.__name__)
            matching.append(candidate)
        if not matching:
            raise NoHandlerMatchError("Bzip2Handler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        if len(matching) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
        return matching[0]
class XzHandler(FileHandler):
    """
    Handler for XZ-compressed files (*.xz); the decompressed result is expected to be accepted
    by TarHandler.
    """

    def check(self) -> CheckFileResult:
        """Return ARCHIVE for XZ data; asserts that the filename ends with .xz."""
        if not self.file_type.startswith("XZ compressed data"):
            return CheckFileResult.HANDLER_NO_MATCH
        assert self.fn.endswith(b'.xz')
        return CheckFileResult.ARCHIVE

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decompress with "xz -dc" into a new stage file (input name without .xz) and return the
        handler for it.

        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        if not self.fn.endswith(b'.xz'):
            assert False, "Invalid xz filename %r" % self.fn
        else:
            output_fn = self.fn[0:-3]
        abs_out_fn = os.path.join(self.extractor.create_stage_dir(stage_dir_rel), output_fn)
        cmd = ["xz", "-dc", self.abs_fn]
        logging.info("XzHandler: cmd=%r out=%r" % (cmd, abs_out_fn))
        # The command "xz -d" has no command line option to force output to a
        # specific file.
        with open(abs_out_fn, 'wb') as f:
            subprocess.check_call(cmd, stdout=f)
        assert os.path.exists(abs_out_fn)
        matching = []
        for candidate_type in (TarHandler,):
            candidate = candidate_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            candidate_result = candidate.check()
            if candidate_result == CheckFileResult.HANDLER_NO_MATCH:
                continue
            assert candidate_result in (CheckFileResult.ARCHIVE,), "Unexpected handler_result=%r from handler %r" % (candidate_result, candidate.__class__.__name__)
            matching.append(candidate)
        if not matching:
            raise NoHandlerMatchError("XzHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        if len(matching) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in matching]))
        return matching[0]
class BrotliHandler(FileHandler):
    """
    Handler for standalone system.new.dat.br / vendor.new.dat.br files.

    If the matching *.transfer.list exists in the same directory, the file is left to
    TransferListHandler instead.
    """

    def check(self) -> CheckFileResult:
        """
        Match system.new.dat.br / vendor.new.dat.br by exact filename and set self.image_type.

        Returns SYSTEM_IMG for the system file and VENDOR_IMG for the vendor file, or
        HANDLER_NO_MATCH for other names or if the corresponding transfer list exists.
        """
        # Brotli is not recognized with file
        if self.fn == b'system.new.dat.br':
            transfer_list_fn = b'system.transfer.list'
            image_type = ImageType.SYSTEM
            result = CheckFileResult.SYSTEM_IMG
        elif self.fn == b'vendor.new.dat.br':
            # The vendor file must be reported as vendor image, not as system image
            transfer_list_fn = b'vendor.transfer.list'
            image_type = ImageType.VENDOR
            result = CheckFileResult.VENDOR_IMG
        else:
            return CheckFileResult.HANDLER_NO_MATCH
        if os.path.exists(os.path.join(os.path.dirname(self.abs_fn), transfer_list_fn)):
            # This case is handled by TransferListHandler, which also contains brotli decompression
            return CheckFileResult.HANDLER_NO_MATCH
        self.image_type = image_type
        return result

    def extract_and_get_next_handler(self, stage_dir_rel):
        """
        Decompress with the brotli command into a new stage file (input name without .br) and
        return the handler for it.

        Raises MultipleHandlerMatchError / NoHandlerMatchError if the result is not accepted
        by exactly one candidate handler.
        """
        output_fn = self.fn[0:-3]
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, output_fn)
        cmd = ["brotli", "--decompress", self.abs_fn, b"--output=%s" % abs_out_fn]
        logging.info("BrotliHandler: cmd=%r" % cmd)
        subprocess.check_call(cmd)
        assert os.path.exists(abs_out_fn)
        HANDLER_TYPES = [ExtfsHandler, ErofsHandler]
        handlers = []
        for handler_type in HANDLER_TYPES:
            handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            handler_result = handler.check()
            if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__)
                handlers.append(handler)
        if len(handlers) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers]))
        elif len(handlers) == 0:
            raise NoHandlerMatchError("BrotliHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        else:
            return handlers[0]
class TransferListHandler(FileHandler):
    """
    Handler for system.transfer.list / vendor.transfer.list.

    The "new" commands of the transfer list describe block ranges; the data for these ranges
    is read from system.new.dat / vendor.new.dat (optionally brotli-compressed and optionally
    split into one file per "new" command) and assembled into a filesystem image.
    """
    # Size of the assembled image in bytes, derived from the highest block number in the list
    file_size: int
    # One entry per "new" command: flat list of begin/end block numbers
    new_commands: List[List[int]]
    # One data file per "new" command (the same file repeated if force_single_file is set)
    data_files: List[bytes]
    image_type: ImageType
    # True if all "new" commands read consecutively from one single data file
    force_single_file: bool
    BLOCK_SIZE = 4096

    def check(self) -> CheckFileResult:
        """
        Parse the transfer list and locate the data files belonging to it.

        Returns SYSTEM_IMG / VENDOR_IMG for system.transfer.list / vendor.transfer.list and
        HANDLER_NO_MATCH for any other filename. Raises AssertionError on malformed
        new/erase/zero lines or missing data files.
        """
        if self.fn == b"system.transfer.list":
            self.image_type = ImageType.SYSTEM
        elif self.fn == b"vendor.transfer.list":
            self.image_type = ImageType.VENDOR
        else:
            return CheckFileResult.HANDLER_NO_MATCH
        self.file_size = 0
        self.new_commands = []
        self.data_files = []
        # Context manager, so that the transfer list is closed again after parsing
        with open(self.abs_fn) as transfer_list:
            for line in transfer_list:
                line_split = line.split(" ")
                cmd = line_split[0]
                if cmd not in ("new", "erase", "zero"):
                    continue
                assert len(line_split) == 2, "Not exactly 2 items in line %r" % line
                cmd_data_items = [int(x) for x in line_split[1].split(",")]
                # First element is number of elements
                assert cmd_data_items[0] == len(cmd_data_items) - 1
                block_numbers = cmd_data_items[1:]
                # Find file size based on maximum block number. The end blocks need to be included here:
                # Using only the begin blocks can result in a file_size smaller than the data written by
                # extract_and_get_next_handler(), which would then be cut off by truncate().
                if block_numbers:
                    self.file_size = max(self.file_size, max(block_numbers) * TransferListHandler.BLOCK_SIZE)
                if cmd == "new":
                    self.new_commands.append(block_numbers)
        # b"system.new.dat" or b"vendor.new.dat", self.fn is one of the two names checked above
        data_fn = self.fn[0:-len(b'.transfer.list')] + b'.new.dat'
        data_dir = os.path.dirname(self.abs_fn)
        # Some firmwares append ".1", ".2", ... to the input files for individual "new" commands in system.transfer.list.
        # Other firmwares use one single file for that.
        self.force_single_file = False
        if len(self.new_commands) > 1:
            abs_fn_with_index = os.path.join(data_dir, data_fn + b'.1')
            if not os.path.exists(abs_fn_with_index) and not os.path.exists(abs_fn_with_index + b'.br'):
                self.force_single_file = True
        for i in range(len(self.new_commands)):
            if i > 0 and not self.force_single_file:
                abs_fn = os.path.join(data_dir, data_fn + b".%d" % i)
            else:
                abs_fn = os.path.join(data_dir, data_fn)
            if not os.path.exists(abs_fn):
                abs_fn += b'.br'
                assert os.path.exists(abs_fn), "File %r (referenced from %r) doesn't exist" % (abs_fn, self.abs_fn)
            self.data_files.append(abs_fn)
        assert self.file_size > 0
        if self.image_type == ImageType.SYSTEM:
            return CheckFileResult.SYSTEM_IMG
        elif self.image_type == ImageType.VENDOR:
            return CheckFileResult.VENDOR_IMG
        else:
            raise ValueError("Bad image_type %r" % self.image_type)

    def get_extra_handled_size(self) -> int:
        """
        Return the summed size (in bytes) of the data files found by check().

        Each file is counted once, even if it is listed multiple times in self.data_files
        (which is the case if force_single_file is set).
        """
        return sum(os.stat(fn).st_size for fn in dict.fromkeys(self.data_files))

    @staticmethod
    def _decompress_if_needed(data_file: bytes) -> bytes:
        """Decompress a *.br data file next to itself using brotli and return the path of the usable data file."""
        if not data_file.endswith(b".br"):
            return data_file
        real_data_file = data_file[0:-3]
        cmd = ["brotli", "--decompress", data_file, b"--output=%s" % real_data_file]
        subprocess.check_call(cmd)
        return real_data_file

    @staticmethod
    def _copy_block_ranges(input_file, output_file, new_cmd: List[int], data_file: bytes) -> None:
        """Copy the begin/end block ranges of one "new" command from the current position of input_file to output_file."""
        for i in range(0, len(new_cmd), 2):
            begin_block = new_cmd[i]
            end_block = new_cmd[i + 1]
            output_file.seek(begin_block * TransferListHandler.BLOCK_SIZE)
            for _i in range(end_block - begin_block):
                buf = input_file.read(TransferListHandler.BLOCK_SIZE)
                assert len(buf) == TransferListHandler.BLOCK_SIZE, "Short read from %r: %d bytes" % (data_file, len(buf))
                output_file.write(buf)

    def extract_and_get_next_handler(self, stage_dir_rel) -> FileHandler:
        """
        Assemble the image described by the transfer list into a new stage file
        (system.img / vendor.img) and return the handler for it.

        Requires a previous successful check(). Raises MultipleHandlerMatchError /
        NoHandlerMatchError if the result is not accepted by exactly one candidate handler.
        """
        output_fn = self.fn[0:-len(b'.transfer.list')] + b'.img'
        abs_stage_dir = self.extractor.create_stage_dir(stage_dir_rel)
        abs_out_fn = os.path.join(abs_stage_dir, output_fn)
        with open(abs_out_fn, 'wb') as output_file:
            assert len(self.new_commands) == len(self.data_files)
            if self.force_single_file:
                # All "new" commands read consecutively from the same file
                data_file = self._decompress_if_needed(self.data_files[0])
                with open(data_file, 'rb') as input_file:
                    for new_cmd in self.new_commands:
                        self._copy_block_ranges(input_file, output_file, new_cmd, data_file)
            else:
                # Every "new" command has its own data file
                for new_cmd, data_file in zip(self.new_commands, self.data_files):
                    data_file = self._decompress_if_needed(data_file)
                    with open(data_file, 'rb') as input_file:
                        self._copy_block_ranges(input_file, output_file, new_cmd, data_file)
            # Pad the image up to the size given by the highest block number
            if output_file.tell() < self.file_size:
                output_file.truncate(self.file_size)
        HANDLER_TYPES = [ExtfsHandler, ErofsHandler]
        handlers = []
        for handler_type in HANDLER_TYPES:
            handler = handler_type(self.extractor, self.extractor.rel_path(abs_out_fn), image_type=self.image_type, file_type=get_file_type(abs_out_fn))
            handler_result = handler.check()
            if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                assert handler_result in (CheckFileResult.SYSTEM_IMG, CheckFileResult.VENDOR_IMG, CheckFileResult.SYSTEM_OR_VENDOR), "Unexpected handler_result=%r from handler %r" % (handler_result, handler.__class__.__name__)
                handlers.append(handler)
        if len(handlers) > 1:
            raise MultipleHandlerMatchError("File %r: %r" % (abs_out_fn, [x.__class__.__name__ for x in handlers]))
        elif len(handlers) == 0:
            raise NoHandlerMatchError("TransferListHandler.extract_and_get_next_handler(): Don't know what to do with %r (file_type=%r)" % (abs_out_fn, get_file_type(abs_out_fn)))
        else:
            return handlers[0]
class NokiaPayloadBinHandler(FileHandler):
    """
    Handler for Nokia payload.bin files

    Extraction is delegated to nokia-dumper/payload_dumper.py below base_dir.
    """

    def check(self) -> CheckFileResult:
        """Return ARCHIVE for files named payload.bin (case-insensitive); asserts a size of at least 32 MiB."""
        if self.fn.lower() == b'payload.bin':
            # file_type is 'data' => no check for that
            assert os.stat(self.abs_fn).st_size >= 32 * 1024 * 1024
            return CheckFileResult.ARCHIVE
        return CheckFileResult.HANDLER_NO_MATCH

    def extract_file2dir(self, output_path_rel):
        """Run payload_dumper.py with the input file and the absolute output directory as arguments."""
        abs_output_path = self.extractor.abs_fn(output_path_rel)
        dumper_script = "%s/nokia-dumper/payload_dumper.py" % base_dir
        cmd = ["python3", dumper_script, self.abs_fn, abs_output_path]
        logging.info("NokiaPayloadBinHandler.extract_file2dir(%r): cmd=%r" % (output_path_rel, cmd))
        subprocess.check_call(cmd)
class MountableImage(FileHandler):
    """
    Base class for handlers of images which can be mounted.

    mount(), umount() and check() have to be implemented in subclasses.
    """

    def mount(self, mountpoint):
        """Mount the image at mountpoint. Must be implemented in subclass."""
        # Explicit raise instead of "assert False", which would be stripped with python -O
        raise NotImplementedError("Must be implemented in subclass")

    def umount(self):
        """Unmount the image again. Must be implemented in subclass."""
        raise NotImplementedError("Must be implemented in subclass")

    def check(self) -> CheckFileResult:
        """Must be implemented in subclass."""
        raise NotImplementedError()
class ExtfsHandler(MountableImage):
    """
    Handler for ext2/ext4 filesystem images of the system or vendor partition.

    mount() grows the image file to the size recorded in the filesystem if required, runs
    e2fsck and mounts the image read-only via a loop device.
    """
    # Absolute path of the current mountpoint, None (or unset) while not mounted
    mountpoint: Optional[bytes]

    def check(self) -> CheckFileResult:
        """
        Match ext2/ext4 images and classify them as system or vendor image.

        An already known self.image_type takes precedence, otherwise the filename decides.
        system.new.dat / vendor.new.dat and images with other names yield HANDLER_NO_MATCH.
        """
        if not self.file_type.startswith(("Linux rev 1.0 ext4 filesystem data", "Linux rev 1.0 ext2 filesystem data")):
            return CheckFileResult.HANDLER_NO_MATCH
        lowered_fn = self.fn.lower()
        if lowered_fn in (b'system.new.dat', b'vendor.new.dat'):
            # These files are not the actual filesystem and need to be assembled based on system.transfer.list/vendor.transfer.list
            return CheckFileResult.HANDLER_NO_MATCH
        if self.image_type == ImageType.SYSTEM:
            return CheckFileResult.SYSTEM_IMG
        if self.image_type == ImageType.VENDOR:
            return CheckFileResult.VENDOR_IMG
        # system_1.img is a potential false positive, so let's enforce filenames starting with "system."
        if lowered_fn.startswith((b"system.", b"system_a.", b"system_b.")):
            self.image_type = ImageType.SYSTEM
            return CheckFileResult.SYSTEM_IMG
        if lowered_fn.endswith(b'stock_system_image.img') and os.stat(self.abs_fn).st_size > 1024**3:
            self.image_type = ImageType.SYSTEM
            return CheckFileResult.SYSTEM_IMG
        if lowered_fn.endswith(b'system_raw.img') and os.stat(self.abs_fn).st_size > 256*1024**2:
            self.image_type = ImageType.SYSTEM
            return CheckFileResult.SYSTEM_IMG
        if lowered_fn.startswith((b"vendor.", b"vendor_a.", b"vendor_b.")):
            self.image_type = ImageType.VENDOR
            return CheckFileResult.VENDOR_IMG
        logging.warning("ExtfsHandler: %r does not start with system/vendor" % self.abs_fn)
        return CheckFileResult.HANDLER_NO_MATCH

    def _get_filesystem_size(self) -> int:
        """
        Return the filesystem size in bytes according to "dumpe2fs -h", or 0 if no block count is found.

        The block size reported by dumpe2fs is used, with 4096 as fallback if it is missing.
        """
        dumpe2fs_cmd = ["dumpe2fs", "-h", self.abs_fn]
        logging.info("ExtfsHandler.mount(): dumpe2fs_cmd=%r" % dumpe2fs_cmd)
        output = subprocess.Popen(dumpe2fs_cmd, stdout=subprocess.PIPE).communicate()[0]
        block_count = 0
        block_size = 4096
        for line in output.splitlines():
            # Match on bytes: Other lines of the output (e.g. volume name, last mount path) are not
            # guaranteed to be ASCII, so decoding every line could fail
            m = re.match(rb'Block count:\s*(\d+)', line)
            if m:
                block_count = int(m.group(1))
                continue
            m = re.match(rb'Block size:\s*(\d+)', line)
            if m:
                block_size = int(m.group(1))
        return block_size * block_count

    def mount(self, mountpoint):
        """
        Mount the image read-only at mountpoint (relative path, resolved via self.extractor.abs_fn()).

        Raises AssertionError if already mounted, if the mountpoint is not an existing
        directory or if e2fsck returns an unexpected exit code, and
        subprocess.CalledProcessError if the mount command fails.
        """
        mountpoint = self.extractor.abs_fn(mountpoint)
        assert not hasattr(self, "mountpoint") or self.mountpoint is None, "ExtfsHandler: Can only mount once"
        assert os.path.exists(mountpoint), "Mountpoint %r doesn't exist" % mountpoint
        assert os.path.isdir(mountpoint), "Mountpoint %r is not a directory" % mountpoint
        # Increase size when required
        target_size = self._get_filesystem_size()
        actual_size = os.stat(self.abs_fn).st_size
        logging.debug("TARGET SIZE: %d", target_size)
        logging.debug("ACTUAL SIZE: %d", actual_size)
        if target_size > actual_size:
            logging.debug("Increasing filesystem size to %d bytes (%.3fGB)",
                          target_size, target_size / (1024.0 * 1024 * 1024))
            # Writing a single byte at the last position extends the file to target_size
            with open(self.abs_fn, "rb+") as fh:
                fh.seek(target_size - 1)
                fh.write(b'\0')
        check_cmd = ["e2fsck", "-y", "-f", self.abs_fn]
        logging.info("ExtfsHandler.mount(): check_cmd=%r" % check_cmd)
        retcode = subprocess.call(check_cmd)
        # 0: No errors
        # 1/2: Errors fixed
        # 8: Operational errors, e.g. new FEATURE_R14 for Android 10 images
        assert retcode in (0, 1, 2, 8), "Failed to check/fix filesystem, e2fsck returned %d" % retcode
        self.mountpoint = mountpoint
        # Some Android10 images can only be mounted read-only due to new filesystem features
        mount_cmd = ["mount", "-o", "loop,ro", self.abs_fn, mountpoint]
        logging.info("ExtfsHandler.mount(): mount_cmd=%r" % mount_cmd)
        subprocess.check_call(mount_cmd)

    def umount(self):
        """Unmount self.mountpoint; self.mountpoint is reset to None before umount is called."""
        cmd = ["umount", self.mountpoint]
        logging.info("MountableImage.umount: cmd=%r" % cmd)
        self.mountpoint = None
        subprocess.check_call(cmd)
class FilesystemExtractor(FileHandler):
    """
    Base class for handlers which extract the contents of a filesystem image into a directory.
    """

    def check(self) -> CheckFileResult:
        """Must be implemented in subclass."""
        raise NotImplementedError

    def extract_filesystem(self, output_dir):
        """Extract the filesystem to output_dir. Must be implemented in subclass."""
        message = "Must be implemented in subclass"
        raise NotImplementedError(message)
class ErofsHandler(FilesystemExtractor):
    """
    Handler for EROFS filesystem images, extracted with erofs_tool.py from base_dir.
    """

    def check(self) -> CheckFileResult:
        """
        Match images with the bytes e2 e1 f5 e0 at offset 0x400.

        The result (system or vendor) is derived from self.image_type; any other image type
        raises ValueError.
        """
        with open(self.abs_fn, 'rb') as f:
            f.seek(0x400)
            magic = f.read(4)
        if magic != b'\xe2\xe1\xf5\xe0':
            return CheckFileResult.HANDLER_NO_MATCH
        if self.image_type == ImageType.SYSTEM:
            return CheckFileResult.SYSTEM_IMG
        if self.image_type == ImageType.VENDOR:
            return CheckFileResult.VENDOR_IMG
        raise ValueError("ErofsHandler: Detected EROFS filesystem but self.image_type is not ImageType.SYSTEM or ImageType.VENDOR")

    def extract_filesystem(self, output_dir):
        """Run "erofs_tool.py extract --verify-zip" on the image with output_dir as target."""
        cmd = [os.path.join(base_dir, "erofs_tool.py"), "extract", "--verify-zip", self.abs_fn, output_dir]
        subprocess.check_call(cmd)
class CpbHandler(FileHandler):
    """
    Handler for *.cpb files starting with the magic b'CP\\x03\\x03'. Extraction is not implemented yet.
    """

    def check(self) -> CheckFileResult:
        """Return ARCHIVE if the part behind the last dot of the filename is "cpb" (case-insensitive) and the magic matches."""
        if self.fn.lower().rsplit(b'.', 1)[-1] != b'cpb':
            return CheckFileResult.HANDLER_NO_MATCH
        with open(self.abs_fn, 'rb') as f:
            magic = f.read(4)
        if magic != b'CP\x03\x03':
            return CheckFileResult.HANDLER_NO_MATCH
        return CheckFileResult.ARCHIVE

    def extract_file2dir(self, output_path_rel):
        """Not implemented, always raises NotImplementedError."""
        todo_message = "TODO: Implement CPB files, see https://github.com/scue/unpackcpb/blob/master/unpackcpb.c"
        raise NotImplementedError(todo_message)
class BootImageHandler(FileHandler):
    """
    Handler for boot.img and boot.img.lz4 (filenames are compared case-insensitively).
    """

    def check(self) -> CheckFileResult:
        """
        Return BOOT_IMG for boot.img / boot.img.lz4 and IGNORE for boot.img.p.

        Any other filename starting with boot.img raises AssertionError, all remaining
        filenames yield HANDLER_NO_MATCH.
        """
        lowered_fn = self.fn.lower()
        if lowered_fn == b'boot.img':
            # Some boot/recovery images have type 'data', e.g. for ryo
            # assert self.file_type.lower().startswith("android bootimg")
            return CheckFileResult.BOOT_IMG
        if lowered_fn == b'boot.img.lz4':
            return CheckFileResult.BOOT_IMG
        if lowered_fn == b'boot.img.p':
            # Some kind of binary patch. ignored for now
            return CheckFileResult.IGNORE
        if lowered_fn.startswith(b'boot.img'):
            # Explicit raise instead of "assert False", which would be stripped with python -O
            raise AssertionError("Potential boot image: %r (file_type=%r)" % (self.abs_fn, self.file_type))
        return CheckFileResult.HANDLER_NO_MATCH

    def write_image(self, f):
        """
        Write the (decompressed) boot image to the binary file object f.

        boot.img is copied as-is, boot.img.lz4 is decompressed with lz4cat. Nothing is written
        for other filenames.
        """
        lowered_fn = self.fn.lower()
        if lowered_fn == b'boot.img':
            # Copy in chunks and close the input file again, instead of reading the complete image into memory
            with open(self.abs_fn, 'rb') as input_file:
                shutil.copyfileobj(input_file, f)
        elif lowered_fn == b'boot.img.lz4':
            f.write(subprocess.check_output(["lz4cat", self.abs_fn]))
class RecoveryImageHandler(FileHandler):
    """Recognizes recovery images by file name and writes them out on request."""

    def check(self) -> CheckFileResult:
        """Classify the file purely by its case-insensitive name.

        Returns:
            RECOVERY_IMG for ``recovery.img`` and ``recovery.img.lz4``, IGNORE
            for ``recovery.img.p``, HANDLER_NO_MATCH for names not starting
            with ``recovery.img``.

        Raises:
            AssertionError: The name starts with ``recovery.img`` but is none
                of the known variants, so a new variant needs to be looked at.
        """
        name = self.fn.lower()
        if name in (b'recovery.img', b'recovery.img.lz4'):
            # Some boot/recovery images have type 'data', e.g. for ryo, so
            # self.file_type is deliberately not checked here.
            # The lz4 variant used to be reported as BOOT_IMG (copy/paste
            # slip); it is a recovery image like the uncompressed one.
            return CheckFileResult.RECOVERY_IMG
        if name == b'recovery.img.p':
            # Some kind of binary patch. ignored for now
            return CheckFileResult.IGNORE
        if name.startswith(b'recovery.img'):
            # Raised explicitly (instead of ``assert False``) so the check is
            # not silently skipped when Python runs with -O.
            raise AssertionError("Potential recovery image: %r (file_type=%r)" % (self.abs_fn, self.file_type))
        return CheckFileResult.HANDLER_NO_MATCH

    def write_image(self, f):
        """Write the (decompressed) recovery image to the binary file object ``f``.

        ``recovery.img`` is streamed as-is, ``recovery.img.lz4`` is
        decompressed via ``lz4cat``. Nothing is written for any other name.
        """
        name = self.fn.lower()
        if name == b'recovery.img':
            # Stream the copy: closes the source file and avoids holding the
            # whole image in memory.
            with open(self.abs_fn, 'rb') as src:
                shutil.copyfileobj(src, f)
        elif name == b'recovery.img.lz4':
            f.write(subprocess.check_output(["lz4cat", self.abs_fn]))
class MultipleHandlerMatchError(Exception):
    """Raised when more than one handler (or handler result) matches a file."""
class NoHandlerMatchError(Exception):
    """Raised when no usable handler matches a file."""
class ArchiveDirectoryHandler:
    """Chooses the follow-up handlers for a directory of unpacked firmware files.

    The directory is scanned in several passes; each pass is only accepted if
    it accounts for enough of the data found in the directory:

    * Pass 0 (system dir): the directory already contains an unpacked
      ``system/`` tree.
    * Pass 0 (rawprogram): exactly one rawprogram XML file describes the
      images.
    * Pass 1: individual image files (system/vendor/boot/recovery/...).
    * Pass 2: the biggest file is an archive that needs another extraction
      round.
    """

    def __init__(self, extractor, input_path_rel):
        """Store the extractor and resolve ``input_path_rel`` via ``extractor.abs_fn``."""
        self.extractor = extractor
        self.input_path_rel = input_path_rel
        self.abs_dir = self.extractor.abs_fn(input_path_rel)

    def get_next_handlers(self):
        """Scan ``self.abs_dir`` and return the handlers to process next.

        Returns:
            A list of handler objects (SystemDirectoryHandler and/or
            FileHandler instances), possibly empty.

        Raises:
            MultipleHandlerMatchError: Several handlers compete for one file,
                or several rawprogram XML files were found.
            NoHandlerMatchError: The dominating biggest file is one that is
                intentionally ignored.
            ValueError: No pass produced an acceptable result.
            AssertionError: An internal consistency check failed.
        """
        # Pass 0: Check if the directory contains the unpacked system partition already
        if os.path.exists(os.path.join(self.abs_dir, b'system', b'build.prop')):
            filelist = [
                b'system/bin/audioserver',
                b'system/lib/libstagefright.so',
                b'system/lib64/libstagefright.so',
                b'system/bin/sh',
                b'system/framework/wifi-service.jar',
                b'system/lib/libssl.so',
                b'system/framework/services.jar',
                b'system/framework/telephony-common.jar'
            ]
            found_list = [fn for fn in filelist if os.path.exists(os.path.join(self.abs_dir, fn))]
            if len(found_list) > 0:  # Some archives just contain system/build.prop but nothing else, so let's continue to normal extraction in these cases
                assert len(found_list) >= 3, "Only %d items of system partition found: %r" % (len(found_list), found_list)
                result = [SystemDirectoryHandler(self.extractor, os.path.join(self.input_path_rel, b'system'))]
                # Also allow boot.img/recovery.img
                for dirpath, dirnames, filenames in os.walk(self.abs_dir):
                    # We are only looking for boot images, so no need to look into system => Significant performance improvement
                    # Pruning happens once per directory (not per file), so it
                    # also takes effect for directories that contain no files.
                    if b'system' in dirnames:
                        dirnames.remove(b'system')
                    for file in filenames:
                        abs_fn = os.path.join(self.abs_dir, dirpath, file)
                        rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                        file_type = get_file_type(abs_fn)
                        for handler_type in [BootImageHandler, RecoveryImageHandler]:
                            handler = handler_type(self.extractor, rel_path, file_type)
                            if handler.check() in (CheckFileResult.BOOT_IMG, CheckFileResult.RECOVERY_IMG):
                                result.append(handler)
                return result

        # Pass 0: Do rawprogram_XXX.xml, return if acceptable
        handlers_found_pass0: List[RawprogramUnsparseHandler] = list()
        total_handled_size = 0
        total_ignored_size = 0
        total_size = 0
        for dirpath, _dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                if not os.path.isfile(abs_fn):
                    # Only regular files have a size to account for
                    continue
                sr = os.stat(abs_fn)
                total_size += sr.st_size
                if file.lower().endswith(b'.xml'):
                    file_type = get_file_type(abs_fn)
                    handler_pass0 = RawprogramUnsparseHandler(self.extractor, rel_path, file_type)
                    handler_result = handler_pass0.check()
                    if handler_result not in (CheckFileResult.HANDLER_NO_MATCH, CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE, CheckFileResult.IGNORE):
                        assert handler_result == CheckFileResult.SYSTEM_OR_VENDOR, "Bad handler_result %r for RawprogramUnsparseHandler" % handler_result
                        handlers_found_pass0.append(handler_pass0)
                        total_handled_size += sr.st_size
                        total_ignored_size += handler_pass0.extra_ignored_size
                        total_handled_size += handler_pass0.get_extra_handled_size()
                elif file.lower().endswith((b".elf", b".mbn")):
                    total_ignored_size += sr.st_size
        if len(handlers_found_pass0) == 1:
            if total_handled_size + total_ignored_size > 0.8 * total_size - 100e6:
                return handlers_found_pass0
            elif handlers_found_pass0[0].has_vendor():
                return handlers_found_pass0
            else:
                raise ValueError("RawprogramUnsparseHandler doesn't handle enough, total_handled_size=%.2fMiB total_size=%.2fMiB" % (total_handled_size/1024**2, total_size/1024**2))
        elif len(handlers_found_pass0) > 1:
            raise MultipleHandlerMatchError("Multiple RawprogramUnsparseHandler found: %r!" % [x.abs_fn for x in handlers_found_pass0])

        # Pass 1: Find image handlers, accept solution and return if 85% of the size is accounted for
        # (system/vendor img, boot/recovery img and the extra files these handlers cover)
        unmatched_size = 0  # Files not matched by any handler
        handled_size = 0
        extra_handled_size = 0  # Additional files handled by matching handler
        HANDLER_LIST_PASS1 = [
            ExtfsHandler,
            ErofsHandler,
            SparseImageHandler,
            SignImgHandler,
            TransferListHandler,
            BrotliHandler,
            Lz4Handler,
            IgnoreBadTarMd5Handler,
            IgnoreRadioHandler,
            IgnoreBootloaderHandler,
            IgnoreOpImageHandler,
            IgnoreOemImgHandler,
            IgnoreElfHandler,
            IgnoreVmlinuxHandler,
            BootImageHandler,
            RecoveryImageHandler,
            PacHandler,
            IgnoreAppsImgHandler
        ]
        handlers_found_pass1: List[FileHandler] = list()
        ignore_size_coverage: bool = False
        found_system_img: bool = False
        found_vendor_img: bool = False
        ignored_archive_size: int = 0
        for dirpath, _dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                ext = file.split(b".")[-1]
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if not os.path.isfile(abs_fn):
                    continue
                sr = os.stat(abs_fn)
                # The file type does not depend on the handler, so run the
                # external `file` command once per file, not once per handler.
                file_type = get_file_type(abs_fn)
                handler_result_to_handlers: DefaultDict[CheckFileResult, List[FileHandler]] = defaultdict(list)
                for handler_type in HANDLER_LIST_PASS1:
                    # logging.info("HANDLER_TYPE: %r rel_path=%r" % (handler_type, rel_path))
                    handler = handler_type(self.extractor, rel_path, file_type)
                    handler_result = handler.check()
                    if handler_result == CheckFileResult.HANDLER_NO_MATCH:
                        pass  # Handler doesn't match, ignore it
                    elif handler_result == CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE:
                        ignore_size_coverage = True
                    else:
                        handler_result_to_handlers[handler_result].append(handler)
                if len(handler_result_to_handlers) > 1:
                    logging.error("Multiple handler results for %r" % abs_fn)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        # Allow multiple handlers for result IGNORE
                        logging.info("Ignoring file %r due to %r" % (abs_fn, [type(x) for x in handlers]))
                    elif len(handlers) > 1:
                        logging.error("Multiple handlers for %r => %r: %r" % (abs_fn, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("Selected handler %s for %r" % (handler.__class__.__name__, abs_fn))
                        handled_size += sr.st_size
                        extra_handled_size += handler.get_extra_handled_size()
                        handlers_found_pass1.append(handler)
                        if handler_result == CheckFileResult.SYSTEM_IMG:
                            found_system_img = True
                        elif handler_result == CheckFileResult.VENDOR_IMG:
                            found_vendor_img = True
                else:
                    logging.info("Ignoring file %r since no handler matches" % abs_fn)
                    if ext.lower() in (b"tar", b"zip", b"rar") or abs_fn.endswith(b".tar.gz"):
                        ignored_archive_size += sr.st_size
                        logging.info("ignored_archive_size += %d => %d (file %r)" % (sr.st_size, ignored_archive_size, abs_fn))
                    unmatched_size += sr.st_size

        # Check if Pass 1 solution can be accepted
        total_handled_size = (handled_size + extra_handled_size)
        total_unmatched_size = unmatched_size - extra_handled_size
        # Size that has to be covered: everything handled or left unmatched
        # (intentionally ignored files do not count).
        coverage_base = handled_size + total_unmatched_size
        found_pac = any(isinstance(handler, PacHandler) for handler in handlers_found_pass1)
        logging.info("PASS1: total_handled_size=%r total_unmatched_size=%r ignored_archive_size=%r found_system_img=%r found_vendor_img=%r", total_handled_size, total_unmatched_size, ignored_archive_size, found_system_img, found_vendor_img)
        if total_handled_size >= 0.85 * coverage_base or (total_handled_size > 0 and ignore_size_coverage):
            return handlers_found_pass1
        elif ((found_system_img and found_vendor_img) or found_pac) and total_handled_size > 0.85 * (coverage_base - ignored_archive_size):
            # Some firmwares contain a second copy of the firmware within an archive (tar/tar.gz/...).
            # If we have a system/vendor image (or a PAC image), we can check if 85% of the total size
            # is covered while ignoring additional archives.
            return handlers_found_pass1
        elif total_handled_size >= 0.1 * coverage_base:
            # coverage_base is > 0 here, otherwise the first condition above would have matched
            logging.warning("ArchiveDirectoryHandler.get_handlers(): Rejecting pass 1 with covered percentage %.2f%%" % (100.0 * total_handled_size / coverage_base))
        logging.info("ArchiveDirectoryHandler.get_handlers(): Going to pass 2")

        # Pass 2: Find biggest file, check if is an archive file and it is at least 90% of total size
        # Handle boot/recovery images and intentionally ignore unwanted files
        HANDLER_LIST_PASS2 = [
            IgnoreBadTarMd5Handler,
            IgnoreRadioHandler,
            IgnoreBootloaderHandler,
            IgnoreOpImageHandler,
            IgnoreOemImgHandler,
            IgnoreUpdateHwHandler,  # Only for Pass 2
            IgnoreHuaweiUserdataAppHandler,
            IgnoreElfHandler,
            BootImageHandler,
            RecoveryImageHandler
        ]
        # Handler list for the biggest file only
        # Will only be used if the biggest file reaches a certain percentage of the total
        # size (excluding boot/recovery image and intentionally ignored files)
        # Contains all kind of archive handlers
        HANDLER_LIST_PASS2_BIGGEST_FILE = [
            ZipHandler,
            TarHandler,
            SinHandler,
            PacHandler,
            OzipHandler,
            HuaweiAppHandler,
            DzHandler,
            NokiaPayloadBinHandler,
            CpbHandler,
            SuperImageHandler
        ]
        # Find biggest file
        total_size = 0
        handled_size = 0
        ignored_size = 0
        biggest_file_size = 0
        biggest_file_abs = None
        biggest_file_rel = None
        handlers_found_pass2: List[FileHandler] = []
        for dirpath, _dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                rel_path = self.extractor.rel_path(os.path.join(dirpath, file))
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if not os.path.isfile(abs_fn):
                    continue
                sr = os.stat(abs_fn)
                total_size += sr.st_size
                # Find biggest file
                if sr.st_size > biggest_file_size:
                    biggest_file_size = sr.st_size
                    biggest_file_abs = abs_fn
                    biggest_file_rel = rel_path
                file_type = get_file_type(abs_fn)  # once per file, see pass 1
                handler_result_to_handlers = defaultdict(list)
                for handler_type in HANDLER_LIST_PASS2:
                    handler = handler_type(self.extractor, rel_path, file_type)
                    handler_result = handler.check()
                    if handler_result in (CheckFileResult.HANDLER_NO_MATCH, CheckFileResult.HANDLER_NO_MATCH_AND_IGNORE_SIZE_COVERAGE):
                        # Handler doesn't match; pass 2 has no size-coverage override
                        continue
                    handler_result_to_handlers[handler_result].append(handler)
                if len(handler_result_to_handlers) > 1:
                    logging.error("PASS2: Multiple handler results for %r" % abs_fn)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        # Allow multiple handlers for result IGNORE
                        logging.info("PASS2: Ignoring file %r due to %r" % (abs_fn, [type(x) for x in handlers]))
                        ignored_size += sr.st_size
                    elif len(handlers) > 1:
                        logging.error("PASS2: Multiple handlers for %r => %r: %r" % (abs_fn, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("PASS2: Selected handler %s for %r" % (handler.__class__.__name__, abs_fn))
                        handled_size += sr.st_size
                        handlers_found_pass2.append(handler)
                else:
                    logging.info("PASS2: Ignoring file %r since no handler matches" % abs_fn)

        # ignored_size and handled_size are the values collected in pass 2 above
        total_uncovered_size = total_size - ignored_size - handled_size
        # Without a non-empty regular file, or with nothing left uncovered,
        # there is no archive candidate (and the percentage below would divide
        # by zero), so fall through to the final error report.
        if biggest_file_abs is not None and total_uncovered_size > 0:
            logging.info("ArchiveDirectoryHandler.get_handlers(): PASS2: Biggest file: %.3fMiB/%.3fMiB (%.2f%%): %r" % (biggest_file_size / (1024 * 1024), total_uncovered_size / (1024 * 1024), 100.0 * biggest_file_size / total_uncovered_size, biggest_file_abs))
            if biggest_file_size > 0.9 * total_uncovered_size:
                file_type = get_file_type(biggest_file_abs)
                handler_result_to_handlers = defaultdict(list)
                for handler_type in HANDLER_LIST_PASS2_BIGGEST_FILE:
                    handler = handler_type(self.extractor, biggest_file_rel, file_type)
                    handler_result = handler.check()
                    if handler_result != CheckFileResult.HANDLER_NO_MATCH:
                        handler_result_to_handlers[handler_result].append(handler)
                if len(handler_result_to_handlers) > 1:
                    logging.error("Multiple handler results for %r" % biggest_file_abs)
                    for (handler_result, handlers) in handler_result_to_handlers.items():
                        logging.error("%r => %r" % (handler_result, [type(x) for x in handlers]))
                    raise MultipleHandlerMatchError()
                elif len(handler_result_to_handlers) == 1:
                    handler_result = list(handler_result_to_handlers.keys())[0]
                    handlers = handler_result_to_handlers[handler_result]
                    if handler_result == CheckFileResult.IGNORE:
                        raise NoHandlerMatchError("Biggest file (>90%%) is IGNORED: %r" % biggest_file_abs)
                    elif len(handlers) > 1:
                        logging.error("Multiple handlers for %r => %r: %r" % (biggest_file_abs, handler_result, [type(x) for x in handlers]))
                        raise MultipleHandlerMatchError()
                    else:
                        handler = handlers[0]
                        logging.info("Selected handler %r for %r" % (type(handler), biggest_file_abs))
                        # handlers_found_pass2 may contain boot/recovery handler
                        return handlers_found_pass2 + [handler]
                else:
                    logging.info("Ignoring biggest file file %r since no handler matches" % biggest_file_abs)

        # Still here? => Don't know what to do, just list biggest files for now
        path2size = {}
        for dirpath, _dirnames, filenames in os.walk(self.abs_dir):
            for file in filenames:
                abs_fn = os.path.join(self.abs_dir, dirpath, file)
                if os.path.islink(abs_fn):
                    continue
                assert os.path.exists(abs_fn), "File %r doesn't exist" % abs_fn
                if os.path.isfile(abs_fn):
                    path2size[os.path.join(dirpath, file)] = os.stat(abs_fn).st_size
        logging.error("ArchiveDirectoryHandler.get_handlers(): Don't know what to do. Biggest files (sorted by size):")
        for path, size in sorted(path2size.items(), key=lambda item: -item[1]):
            logging.error("  %.3fMiB: %s" % (size / 1024.0 / 1024.0, path.decode(errors='ignore')))
        raise ValueError("ArchiveDirectoryHandler.get_handlers(): Don't know what to do.")
class SystemDirectoryHandler:
    """Points at an already unpacked system directory."""

    def __init__(self, extractor, system_dir_rel):
        """Remember the relative path and resolve it through ``extractor.abs_fn``."""
        resolved_dir = extractor.abs_fn(system_dir_rel)
        self.extractor = extractor
        self.system_dir_rel = system_dir_rel
        self.system_dir_abs = resolved_dir

    def get_system_dir(self):
        """Return the resolved path of the system directory."""
        return self.system_dir_abs
class TopLevelFileHandler:
    """Picks the handler for a single input file (as opposed to a directory)."""

    def __init__(self, extractor, input_path_rel, image_type=None, top_level_file=False):
        """Store the arguments and resolve ``input_path_rel`` via ``extractor.abs_fn``."""
        self.extractor = extractor
        self.input_path_rel = input_path_rel
        self.abs_fn = self.extractor.abs_fn(input_path_rel)
        self.image_type = image_type
        self.top_level_file: bool = top_level_file

    def get_next_handler(self):
        """Return the one handler whose ``check()`` accepts the file.

        Raises:
            NoHandlerMatchError: No handler accepts the file.
            MultipleHandlerMatchError: More than one handler accepts it.
        """
        handler_list = [
            TopLevelZipHandler,
            TarHandler,
            GzipHandler,
            Bzip2Handler,
            XzHandler,
            PacHandler,
            OzipHandler,
            SevenZipHandler,
            RarHandler,
            KdzHandler,
            DzHandler,
            ExtfsHandler,
            ErofsHandler,
            CpbHandler
        ]
        # The file type is the same for every handler: run the external
        # `file` command once instead of once per handler type.
        file_type = get_file_type(self.abs_fn)
        handlers_found = []
        for handler_type in handler_list:
            handler = handler_type(self.extractor, self.input_path_rel, file_type=file_type)
            if handler.check() != CheckFileResult.HANDLER_NO_MATCH:
                handlers_found.append(handler)
        if len(handlers_found) == 0:
            raise NoHandlerMatchError("No handler for %r (file_type=%r)" % (self.abs_fn, file_type))
        if len(handlers_found) > 1:
            logging.error("Multiple handlers for %r: %r" % (self.input_path_rel, [type(x) for x in handlers_found]))
            raise MultipleHandlerMatchError()
        return handlers_found[0]
class QueueItem:
    """One entry of the extraction queue: a handler plus its bookkeeping data."""

    def __init__(self, handler, handler_name, stage_dir=None, handler_check_result=None):
        """Keep all four arguments as attributes of the same name."""
        self.stage_dir = stage_dir
        self.handler_check_result = handler_check_result
        self.handler_name = handler_name
        self.handler = handler
class FirmwareExtractor:
    """Drives the extraction of one firmware file or directory.

    A queue of handlers is processed until handlers for the system partition
    (and, if present, vendor partition, boot image and recovery image) have
    been found; then the requested outputs are written. All intermediate data
    lives in stage directories below a temporary directory that is created in
    ``__init__`` and removed again by ``cleanup()``.
    """

    def __init__(self, firmware_file_or_dir):
        """Normalize the input path to absolute bytes and create the temp dir.

        Args:
            firmware_file_or_dir: Path of the firmware file or directory.
        """
        firmware_file_or_dir = os.path.abspath(firmware_file_or_dir)
        if isinstance(firmware_file_or_dir, str):
            firmware_file_or_dir = firmware_file_or_dir.encode()
        self.firmware_file_or_dir = firmware_file_or_dir
        # cleanup() relies on the "ANDROID_EXTRACT_" prefix as a safety check
        self.tmpdir: bytes = tempfile.mkdtemp(prefix="ANDROID_EXTRACT_").encode()
        logging.info("tmpdir=%r" % self.tmpdir)
        # Counter used by get_stage_dir() to generate unique stage dir names
        self.stage_num: int = 0
        # Handlers that were mounted and have to be unmounted in cleanup()
        self.mounted_handlers = []
        # Filled in by extract() while the queue is processed
        self.system_handler = None
        self.vendor_handler = None
        self.boot_image_handler = None
        self.recovery_image_handler = None

    def extract(self, output_system_tar=None, output_system_dir=None, make_world_readable=True, output_boot_img_path=None, output_recovery_img_path=None, allow_missing_vendor=False):
        """Run the whole extraction and write the requested outputs.

        Args:
            output_system_tar: Path of a tar file to create from the extracted
                ``system/`` directory, or None.
            output_system_dir: Directory receiving the system files. If None,
                a stage directory "system" below the temp dir is used. Must
                not be combined with ``output_system_tar``.
            make_world_readable: Run ``chmod -R a+r`` on the system output.
            output_boot_img_path: File to write the boot image to (only
                written if a boot image was found), or None.
            output_recovery_img_path: Same for the recovery image.
            allow_missing_vendor: Accept a ``vendor`` symlink in the system
                output although no vendor partition was found.

        Raises:
            ValueError: A queued handler is of an unknown kind, or no system
                handler was found.
            AssertionError: A consistency check failed (duplicate handlers,
                missing build.prop, unexpected vendor directory state, ...).
            subprocess.CalledProcessError: rsync, chmod or tar failed.

        ``cleanup()`` is always called once the queue processing has started,
        whether or not the extraction succeeds.
        """
        if output_system_dir is not None and isinstance(output_system_dir, str):
            output_system_dir = output_system_dir.encode()
        stage_queue = deque()
        if os.path.isdir(self.firmware_file_or_dir):
            # NOTE(review): ArchiveDirectoryHandler passes this path to
            # self.abs_fn(), which asserts that its argument is relative, while
            # firmware_file_or_dir is absolute here - confirm that directory
            # input is expected to work.
            handler_initial = ArchiveDirectoryHandler(self, self.firmware_file_or_dir)
            stage_queue.append(QueueItem(handler=handler_initial, handler_name="handler_initial"))
        else:
            assert os.path.isfile(self.firmware_file_or_dir)
            # input_path_rel=None makes abs_fn() return the original input path
            handler_initial = TopLevelFileHandler(self, None)
            stage_dir = self.get_stage_dir("UnknownFileHandler")
            stage_queue.append(QueueItem(handler=handler_initial, handler_name="handler_initial", stage_dir=stage_dir))
        try:
            # Process the queue; what happens with a handler depends on which
            # method it offers (checked in this order) or on its type.
            while len(stage_queue) > 0:
                queue_item = stage_queue.popleft()
                handler = queue_item.handler
                # self.log_extraction_step("abs_stage_dir = self.create_stage_dir(%r)" % queue_item.stage_dir)
                if hasattr(handler, "extract_file2dir"):
                    # Archive-like handler: unpack into the stage dir and scan
                    # the result with an ArchiveDirectoryHandler.
                    self.create_stage_dir(queue_item.stage_dir)
                    self.log_extraction_step("self.create_stage_dir(%r)" % queue_item.stage_dir)
                    self.log_extraction_step("%s.extract_file2dir(%r)" % (queue_item.handler_name, queue_item.stage_dir))
                    # assert False, abs_stage_dir
                    handler.extract_file2dir(queue_item.stage_dir)
                    next_handler = ArchiveDirectoryHandler(self, queue_item.stage_dir)
                    next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__)
                    next_handler_name = "handler_%s" % next_stage_dir.decode()
                    self.log_extraction_step("%s = ArchiveDirectoryHandler(self, %r)" % (next_handler_name, queue_item.stage_dir))
                    next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir)
                    stage_queue.append(next_queue_item)
                elif hasattr(handler, "get_next_handler"):
                    # Handler only selects a single follow-up handler
                    next_handler = handler.get_next_handler()
                    next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__)
                    next_handler_name = "handler_%s" % next_stage_dir.decode()
                    self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name))
                    next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir)
                    stage_queue.append(next_queue_item)
                elif hasattr(handler, "get_next_handlers"):
                    # Handler selects several follow-up handlers
                    next_handlers = handler.get_next_handlers()
                    for next_handler in next_handlers:
                        next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__)
                        next_handler_name = "handler_%s" % next_stage_dir.decode()
                        # TODO: Log
                        # self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name))
                        next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir)
                        stage_queue.append(next_queue_item)
                elif hasattr(handler, "extract_and_get_next_handlers"):
                    # Handler extracts into its stage dir and returns several follow-up handlers
                    next_handlers = handler.extract_and_get_next_handlers(queue_item.stage_dir)
                    for next_handler in next_handlers:
                        next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__)
                        next_handler_name = "handler_%s" % next_stage_dir.decode()
                        # TODO: Log
                        # self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name))
                        next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir)
                        stage_queue.append(next_queue_item)
                elif hasattr(handler, "extract_and_get_next_handler"):
                    # Handler extracts into its stage dir and returns one follow-up handler
                    # (the logged step text still says get_next_handler())
                    next_handler = handler.extract_and_get_next_handler(queue_item.stage_dir)
                    next_stage_dir = self.get_stage_dir(next_handler.__class__.__name__)
                    next_handler_name = "handler_%s" % next_stage_dir.decode()
                    self.log_extraction_step("%s = %s.get_next_handler()" % (next_handler_name, queue_item.handler_name))
                    next_queue_item = QueueItem(next_handler, handler_name=next_handler_name, stage_dir=next_stage_dir)
                    stage_queue.append(next_queue_item)
                elif isinstance(handler, MountableImage) or isinstance(handler, FilesystemExtractor):
                    # Final filesystem image: remember it as the system or vendor handler
                    assert handler.image_type in (ImageType.SYSTEM, ImageType.VENDOR), "Bad handler.image_type %r for %r" % (handler.image_type, handler.__class__.__name__)
                    if handler.image_type == ImageType.SYSTEM:
                        assert self.system_handler is None, "Duplicate system_handler: %s:%r <=> %s:%s" % (self.system_handler.__class__.__name__, self.system_handler.abs_fn, handler.__class__.__name__, handler.abs_fn)
                        self.system_handler = handler
                        logging.info("Found system handler")
                    elif handler.image_type == ImageType.VENDOR:
                        if self.vendor_handler is not None:
                            # Log details about both candidates before the assertion below fails
                            logging.error("OLD: %s => %s", self.vendor_handler.__class__.__name__, self.vendor_handler.abs_fn)
                            subprocess.call(["file", self.vendor_handler.abs_fn])
                            logging.error("NEW: %s => %s", handler.__class__.__name__, handler.abs_fn)
                            subprocess.call(["file", handler.abs_fn])
                        assert self.vendor_handler is None, "Duplicate vendor_handler: %s:%r <=> %s:%s" % (self.vendor_handler.__class__.__name__, self.vendor_handler.abs_fn, handler.__class__.__name__, handler.abs_fn)
                        self.vendor_handler = handler
                        logging.info("Found vendor handler")
                elif isinstance(handler, SystemDirectoryHandler):
                    assert self.system_handler is None
                    logging.info("Found system handler via SystemDirectoryHandler")
                    self.system_handler = handler
                elif isinstance(handler, BootImageHandler):
                    assert self.boot_image_handler is None
                    self.boot_image_handler = handler
                elif isinstance(handler, RecoveryImageHandler):
                    assert self.recovery_image_handler is None
                    self.recovery_image_handler = handler
                else:
                    raise ValueError("Don't know what to do with handler %r" % handler.__class__.__name__)
            logging.info("Finished Queue")
            if self.system_handler is None:
                logging.error("No system_handler afer finishing queue")
                raise ValueError("No system_handler afer finishing queue")
            if output_system_dir is None:
                output_system_dir = self.create_stage_dir("system")
            else:
                assert output_system_tar is None, "Can only generate output_system_dir or output_system_tar"
            # Trailing slash is needed for rsync and for the double dirname() in the tar step
            if not output_system_dir.endswith(b'/'):
                output_system_dir += b'/'
            # Copy/extract the system partition into output_system_dir
            if isinstance(self.system_handler, MountableImage):
                system_mountpoint = self.create_stage_dir("system_mnt")
                self.system_handler.mount("system_mnt")
                self.mounted_handlers.append(self.system_handler)
                mounted_system_dir = system_mountpoint
                # Some images have the root filesystem in the "system" partition, with /system/ just being a directory within the filesystem.
                if not os.path.exists(os.path.join(mounted_system_dir, b'build.prop')):
                    if os.path.exists(os.path.join(mounted_system_dir, b'system', b'build.prop')):
                        mounted_system_dir = os.path.join(mounted_system_dir, b'system')
                assert os.path.exists(os.path.join(mounted_system_dir, b'build.prop')), "Could not find build.prop in system partition"
                # Append slash for correct rsync operation
                if not mounted_system_dir.endswith(b'/'):
                    mounted_system_dir += b'/'
                cmd = ["rsync", "-a", mounted_system_dir, output_system_dir]
                logging.info("FirmwareExtractor.extract(): system rsync cmd: %r" % cmd)
                subprocess.check_call(cmd)
            elif isinstance(self.system_handler, FilesystemExtractor):
                self.system_handler.extract_filesystem(output_system_dir)
                # Sometimes the extracted system.img contains "system/" as a directory, not in the root of the filesystem
                if (not os.path.exists(os.path.join(output_system_dir, b"build.prop"))) and \
                        os.path.isdir(os.path.join(output_system_dir, b"system")) and \
                        os.path.exists(os.path.join(output_system_dir, b"system", b"build.prop")):
                    # Move everything except system/ into system/rootfs, then
                    # lift the contents of system/ up into the output root.
                    os.mkdir(os.path.join(output_system_dir, b"system", b"rootfs"))
                    for fn in os.listdir(output_system_dir):
                        if fn == b'system':
                            continue
                        os.rename(os.path.join(output_system_dir, fn), os.path.join(output_system_dir, b"system", b"rootfs", fn))
                    # Renamed first, presumably so that a nested system/system entry does not collide - TODO confirm
                    os.rename(os.path.join(output_system_dir, b"system"), os.path.join(output_system_dir, b"system.tmp"))
                    for fn in os.listdir(os.path.join(output_system_dir, b"system.tmp")):
                        os.rename(os.path.join(output_system_dir, b"system.tmp", fn), os.path.join(output_system_dir, fn))
            elif isinstance(self.system_handler, SystemDirectoryHandler):
                system_dir_src = self.system_handler.get_system_dir()
                # Append slash for correct rsync operation
                if not system_dir_src.endswith(b'/'):
                    system_dir_src += b'/'
                cmd = ["rsync", "-a", system_dir_src, output_system_dir]
                logging.info("FirmwareExtractor.extract(): system rsync cmd: %r" % cmd)
                subprocess.check_call(cmd)
            else:
                assert False, "Don't know what to do with self.system_handler type %s" % self.system_handler.__class__.__name__
            # Vendor partition goes to <system>/vendor: replace a symlink,
            # accept an empty directory, reject anything else.
            output_vendor_dir = os.path.join(output_system_dir, b"vendor")
            if os.path.islink(output_vendor_dir):
                if self.vendor_handler is not None:
                    os.unlink(output_vendor_dir)
                else:
                    assert allow_missing_vendor, "System contains vendor symlink but we didn't find a vendor paritition!"
            if os.path.isdir(output_vendor_dir):
                vendor_dir_contents = os.listdir(output_vendor_dir)
                if self.vendor_handler is not None:
                    assert len(vendor_dir_contents) == 0, "sytem/vendor directory not empty: %r" % vendor_dir_contents
            else:
                assert not os.path.exists(output_vendor_dir), "system/vendor is not a directory and not a symlink"
            if self.vendor_handler is not None:
                if not os.path.exists(output_vendor_dir):
                    os.mkdir(output_vendor_dir)
                if isinstance(self.vendor_handler, MountableImage):
                    vendor_mountpoint = self.create_stage_dir("vendor_mnt")
                    self.vendor_handler.mount("vendor_mnt")
                    # Append slash for correct rsync operation
                    if not vendor_mountpoint.endswith(b'/'):
                        vendor_mountpoint += b'/'
                    if not output_vendor_dir.endswith(b'/'):
                        output_vendor_dir += b'/'
                    self.mounted_handlers.append(self.vendor_handler)
                    cmd = ["rsync", "-a", vendor_mountpoint, output_vendor_dir]
                    logging.info("FirmwareExtractor.extract(): vendor rsync cmd: %r" % cmd)
                    subprocess.check_call(cmd)
                elif isinstance(self.vendor_handler, FilesystemExtractor):
                    self.vendor_handler.extract_filesystem(output_vendor_dir)
                else:
                    assert False, "Don't know what to do with self.vendor_handler type %s" % self.vendor_handler.__class__.__name__
            if make_world_readable:
                cmd = ["chmod", "-R", "a+r", output_system_dir]
                logging.info("FirmwareExtractor.extract(): make readable cmd: %r" % cmd)
                subprocess.check_call(cmd)
            if output_system_tar is not None:
                output_system_tar = os.path.abspath(output_system_tar)
                cmd = ["tar", "cf", output_system_tar, "system/"]
                cwd = os.path.dirname(os.path.dirname(output_system_dir))  # Double dirname since output_system_dir ends with trailing slash, ".../system/"
                logging.info("FirmwareExtractor.extract(): system tar cmd: %r cwd=%r" % (cmd, cwd))
                subprocess.check_call(cmd, cwd=cwd)
            # Boot/recovery images are only written if requested AND found
            if output_boot_img_path is not None and self.boot_image_handler is not None:
                with open(output_boot_img_path, 'wb') as f:
                    self.boot_image_handler.write_image(f)
            if output_recovery_img_path is not None and self.recovery_image_handler is not None:
                with open(output_recovery_img_path, 'wb') as f:
                    self.recovery_image_handler.write_image(f)
        finally:
            self.cleanup()

    def cleanup(self) -> None:
        """Unmount all mounted handlers and delete the temporary directory.

        Unmount errors are logged and otherwise ignored so that the remaining
        handlers and the temp dir are still cleaned up.
        """
        # Safety check before "rm -rf": only ever delete our own temp dir
        assert b'ANDROID_EXTRACT_' in self.tmpdir
        for handler in self.mounted_handlers:
            # noinspection PyBroadException
            try:
                handler.umount()
            except Exception:
                logging.exception("Unmounting exception")
        self.mounted_handlers = []
        if os.path.exists(self.tmpdir):
            subprocess.call(["rm", "-rf", self.tmpdir])

    # noinspection PyMethodMayBeStatic
    def log_extraction_step(self, extraction_step) -> None:
        """Log one extraction step with the "EXTRACTION_STEP: " prefix."""
        logging.info("EXTRACTION_STEP: %s" % extraction_step)

    def get_stage_dir(self, stage_name) -> bytes:
        """Return a new unique stage dir name ``b"stage_<n>_<stage_name>"``.

        Only the name is generated (and the counter incremented); the
        directory itself is not created here.
        """
        result = ("stage_%d_%s" % (self.stage_num, stage_name))
        self.stage_num += 1
        return result.encode()

    def create_stage_dir(self, stage_dir) -> bytes:
        """Create ``stage_dir`` (str or bytes) below the temp dir and return its absolute path."""
        if isinstance(stage_dir, str):
            stage_dir = stage_dir.encode()
        abs_dir = os.path.join(self.tmpdir, stage_dir)
        os.mkdir(abs_dir)
        return abs_dir

    def abs_fn(self, input_path_rel) -> bytes:
        """Resolve a path relative to the temp dir to an absolute bytes path.

        ``None`` stands for the original firmware input and returns
        ``self.firmware_file_or_dir``. A str is encoded to bytes first; the
        path must not start with ``/``.
        """
        if input_path_rel is None:
            return self.firmware_file_or_dir
        if isinstance(input_path_rel, str):
            input_path_rel = input_path_rel.encode()
        assert isinstance(input_path_rel, bytes)
        assert not input_path_rel.startswith(b'/')
        return os.path.join(self.tmpdir, input_path_rel)

    def rel_path(self, abs_path) -> bytes:
        """Inverse of ``abs_fn``: strip the temp dir prefix from ``abs_path``.

        ``abs_path`` must be an absolute bytes path starting with the temp
        dir; leading slashes of the remainder are removed.
        """
        assert isinstance(abs_path, bytes)
        assert abs_path.startswith(b'/')
        assert abs_path.startswith(self.tmpdir)
        path = abs_path[len(self.tmpdir):]
        while path.startswith(b'/'):
            path = path[1:]
        return path
def get_file_type(abs_fn):
    """Return the description the external ``file`` utility prints for a file.

    The file is fed to ``file -`` via stdin, so the result does not depend on
    the file name.

    Args:
        abs_fn: Path of the file to inspect.

    Returns:
        The description as str, without the ``/dev/stdin:`` prefix and
        surrounding whitespace.

    Raises:
        subprocess.CalledProcessError: ``file`` exited with an error.
        AssertionError: The output does not start with ``/dev/stdin:``.
    """
    # Use a context manager so the file handle is closed again; this function
    # is called for every scanned file.
    with open(abs_fn, 'rb') as f:
        file_output = subprocess.check_output(["file", "-"], stdin=f)
    assert file_output.startswith(b"/dev/stdin:")
    return file_output[len(b"/dev/stdin:"):].strip().decode()
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()