diff --git a/liblp.py b/liblp.py new file mode 100755 index 0000000..c1c5e87 --- /dev/null +++ b/liblp.py @@ -0,0 +1,282 @@ +#!/usr/bin/python3 + +# This file is part of Extractor. + +# Copyright (C) 2021 Security Research Labs GmbH +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from typing import BinaryIO, List, Dict, Union +import os +import copy +import sys +import mmap +import hashlib +from construct import Struct, Int32ul, Int16ul, Int64ul, Bytes +from construct_typing import TypedContainer + + +def main(): + super_img = SuperImage(sys.argv[1]) + print("PARTITIONS: %r" % super_img.get_partition_names()) + for partition_name in super_img.get_partition_names(): + if partition_name in ("system", "vendor", "product"): + with open("%s.img" % partition_name, 'wb') as f: + super_img.write_partition(partition_name, f) + + +def check_magic(f: BinaryIO): + buf = f.read(4096) + if buf != b'\0' * 4096: + return False + buf = f.read(4) + return buf == b'\x67\x44\x6c\x61' + + +class SuperImage: + """ + Class to parse a liblp super image. + More details at + https://android.googlesource.com/platform/system/core + => fs_mgr/liblp/include/liblp/metadata_format.h + """ + filename: str + file_size: int + fh: BinaryIO + mmap: mmap.mmap + geometry: "LpMetadataGeometry" + metadata_header: "LpMetadataHeader" + partitions: List["LpMetadataPartition"] + extents: List["LpMetadataExtent"] + block_device: "LpMetadataBlockDevice" + partition_name_to_nr: Dict[str, int] + + def __init__(self, filename: Union[str, bytes]): + self.filename = filename + self.file_size = os.stat(self.filename).st_size + self.fh = open(self.filename, 'rb') + self.mmap = mmap.mmap(self.fh.fileno(), 0, access=mmap.ACCESS_READ) + # Read and validate LpMetadataGeometry + lmg = LpMetadataGeometry.parse(self.mmap[0x1000:0x1000 + LpMetadataGeometry.sizeof()]) + lmg.validate() + lmg_copy = LpMetadataGeometry.parse(self.mmap[0x2000:0x2000 + LpMetadataGeometry.sizeof()]) + lmg_copy.validate() + assert lmg == lmg_copy + # Read and validate + self.metadata_header = LpMetadataHeader.parse(self.mmap[0x3000:0x3000 + LpMetadataHeader.sizeof()]) + self.metadata_header.validate() + table_data = self.mmap[0x3000 + self.metadata_header.header_size:0x3000 + self.metadata_header.header_size + self.metadata_header.tables_size] + self.metadata_header.validate_table_data(table_data) + # Read partitions from table_data + self.partition_name_to_nr = {} + self.partitions = [] + for partition_nr in range(self.metadata_header.partitions.num_entries): + pos = self.metadata_header.partitions.offset + partition_nr * LpMetadataPartition.sizeof() + partition = LpMetadataPartition.parse(table_data[pos:pos + LpMetadataPartition.sizeof()]) + print("partition %d: %r" % (partition_nr, partition)) + self.partitions.append(partition) + assert partition.get_name() not in self.partition_name_to_nr, "Duplicate partition %r" % partition.get_name() + self.partition_name_to_nr[partition.get_name()] = partition_nr + # Read extents from table_data + self.extents = [] + for extent_nr in range(self.metadata_header.extents.num_entries): + pos = self.metadata_header.extents.offset + extent_nr * LpMetadataExtent.sizeof() + extent = LpMetadataExtent.parse(table_data[pos:pos + LpMetadataExtent.sizeof()]) + print("Extent %d: %r" % (extent_nr, extent)) + self.extents.append(extent) + # Read block devices from table_data + assert self.metadata_header.block_devices.num_entries == 1, "Not exactly one block device: self.metadata_header.block_devices.num_entries=%r" % self.metadata_header.block_devices.num_entries + pos = self.metadata_header.block_devices.offset + self.block_device = LpMetadataBlockDevice.parse(table_data[pos:pos + LpMetadataBlockDevice.sizeof()]) + assert self.block_device.alignment % 512 == 0 + + def get_partition_names(self) -> List[str]: + return [partition.get_name() for partition in self.partitions] + + def close(self): + self.mmap.close() + self.fh.close() + + def write_partition(self, partition_name: str, f: BinaryIO): + partition_nr = self.partition_name_to_nr[partition_name] + partition = self.partitions[partition_nr] + assert partition.num_extents == 1, "Not exactly one extent: %d" % partition.num_extents + extent = self.extents[partition.first_extent_index] + assert extent.target_source == 0 + assert extent.target_type == 0 # LP_TARGET_TYPE_LINEAR + start_pos = extent.target_data * 512 + assert start_pos % self.block_device.alignment == 0, "Alignment error: start_pos=%d self.block_device.alignment=%d offset=%r" % (start_pos, self.block_device.alignment, start_pos % self.block_device.alignment) + end_pos = start_pos + extent.num_sectors * 512 + pos = start_pos + while pos < end_pos: + next_pos = min(end_pos, pos + 1024**2) + f.write(self.mmap[pos:next_pos]) + pos = next_pos + + +class LpMetadataGeometry(TypedContainer): + magic: int + struct_size: int + checksum: bytes + metadata_max_size: int + metadata_slot_count: int + logical_block_size: int + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "magic" / Int32ul, + "struct_size" / Int32ul, + "checksum" / Bytes(32), + "metadata_max_size" / Int32ul, + "metadata_slot_count" / Int32ul, + "logical_block_size" / Int32ul, + ) + + def validate(self): + assert self.magic == 0x616c4467 + assert self.struct_size == LpMetadataGeometry.sizeof() + tmp = copy.copy(self) + tmp.checksum = b'\0' * 32 + tmp_encoded = tmp.build() + digest = hashlib.sha256(tmp_encoded).digest() + assert self.checksum == digest + + +assert LpMetadataGeometry.sizeof() == 52 + + +class LpMetadataTableDescriptor(TypedContainer): + offset: int + num_entries: int + entry_size: int + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "offset" / Int32ul, + "num_entries" / Int32ul, + "entry_size" / Int32ul + ) + + +assert LpMetadataTableDescriptor.sizeof() == 12 + + +class LpMetadataHeader(TypedContainer): + magic: int + major_version: int + minor_version: int + header_size: int + header_checksum: bytes + tables_size: int + tables_checksum: bytes + partitions: LpMetadataTableDescriptor + extents: LpMetadataTableDescriptor + groups: LpMetadataTableDescriptor + block_devices: LpMetadataTableDescriptor + # flags: int + # reserved: bytes + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "magic" / Int32ul, + "major_version" / Int16ul, + "minor_version" / Int16ul, + "header_size" / Int32ul, + "header_checksum" / Bytes(32), + "tables_size" / Int32ul, + "tables_checksum" / Bytes(32), + "partitions" / LpMetadataTableDescriptor.as_inner_type(), + "extents" / LpMetadataTableDescriptor.as_inner_type(), + "groups" / LpMetadataTableDescriptor.as_inner_type(), + "block_devices" / LpMetadataTableDescriptor.as_inner_type() + ) + + def validate(self): + assert self.magic == 0x414C5030 + assert self.header_size == LpMetadataHeader.sizeof(), "Bad LpMetadataHeader.header_size %d, should be %d" % (self.header_size, LpMetadataHeader.sizeof()) + tmp = copy.copy(self) + tmp.header_checksum = b'\0' * 32 + tmp_encoded = tmp.build() + digest = hashlib.sha256(tmp_encoded).digest() + assert self.header_checksum == digest + assert self.partitions.entry_size == LpMetadataPartition.sizeof(), "Bad LpMetadataHeader.partitions.entry_size %d, should be %d" % (self.partitions.entry_size, LpMetadataPartition.sizeof()) + assert self.extents.entry_size == LpMetadataExtent.sizeof(), "Bad LpMetadataHeader.extents.entry_size %d, should be %d" % (self.extents.entry_size, LpMetadataExtent.sizeof()) + assert self.tables_size < 1e6 + + def validate_table_data(self, buf: bytes): + assert len(buf) == self.tables_size + digest = hashlib.sha256(buf).digest() + assert self.tables_checksum == digest + + +class LpMetadataPartition(TypedContainer): + name: bytes + attributes: int + first_extent_index: int + num_extents: int + group_index: int + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "name" / Bytes(36), + "attributes" / Int32ul, + "first_extent_index" / Int32ul, + "num_extents" / Int32ul, + "group_index" / Int32ul + ) + + def get_name(self) -> str: + return self.name.rstrip(b'\0').decode() + + +assert LpMetadataPartition.sizeof() == 52 + + +class LpMetadataExtent(TypedContainer): + num_sectors: int + target_type: int + target_data: int + target_source: int + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "num_sectors" / Int64ul, + "target_type" / Int32ul, + "target_data" / Int64ul, + "target_source" / Int32ul, + ) + + +assert LpMetadataExtent.sizeof() == 24 + + +class LpMetadataBlockDevice(TypedContainer): + first_logical_sector: int + alignment: int + alignment_offset: int + size: int + partition_name: bytes + flags: int + # noinspection PyUnresolvedReferences + construct_struct = Struct( + "first_logical_sector" / Int64ul, + "alignment" / Int32ul, + "alignment_offset" / Int32ul, + "size" / Int64ul, + "partition_name" / Bytes(36), + "flags" / Int32ul + ) + + +assert LpMetadataBlockDevice.sizeof() == 64 + + +if __name__ == "__main__": + main()