TIK/ext4.py

980 lines
40 KiB
Python

import ctypes
import functools
import io
import math
import queue
def wcscmp(str_a, str_b):
for a, b in zip(str_a, str_b):
tmp = ord(a) - ord(b)
if tmp != 0:
return -1 if tmp < 0 else 1
tmp = len(str_a) - len(str_b)
return -1 if tmp < 0 else 1 if tmp > 0 else 0
class Ext4Error(Exception):
pass
class BlockMapError(Ext4Error):
pass
class EndOfStreamError(Ext4Error):
pass
class MagicError(Ext4Error):
pass
# ----------------------------- LOW LEVEL ------------------------------
class ext4_struct(ctypes.LittleEndianStructure):
def __getattr__(self, name):
try:
# Combining *_lo and *_hi fields
lo_field = ctypes.LittleEndianStructure.__getattribute__(type(self), name + "_lo")
size = lo_field.size
lo = lo_field.__get__(self)
hi = ctypes.LittleEndianStructure.__getattribute__(self, name + "_hi")
return (hi << (8 * size)) | lo
except AttributeError:
return ctypes.LittleEndianStructure.__getattribute__(self, name)
def __setattr__(self, name, value):
try:
# Combining *_lo and *_hi fields
lo_field = ctypes.LittleEndianStructure.__getattribute__(type(self), name + "_lo")
size = lo_field.size
lo_field.__set__(self, value & ((1 << (8 * size)) - 1))
ctypes.LittleEndianStructure.__setattr__(self, name + "_hi", value >> (8 * size))
except AttributeError:
ctypes.LittleEndianStructure.__setattr__(self, name, value)
class ext4_dir_entry_2(ext4_struct):
_fields_ = [
("inode", ctypes.c_uint), # 0x0
("rec_len", ctypes.c_ushort), # 0x4
("name_len", ctypes.c_ubyte), # 0x6
("file_type", ctypes.c_ubyte) # 0x7
# Variable length field "name" missing at 0x8
]
def _from_buffer_copy(raw, offset=0, platform64=True):
struct = ext4_dir_entry_2.from_buffer_copy(raw, offset)
struct.name = raw[offset + 0x8: offset + 0x8 + struct.name_len]
return struct
class ext4_extent(ext4_struct):
_fields_ = [
("ee_block", ctypes.c_uint), # 0x0000
("ee_len", ctypes.c_ushort), # 0x0004
("ee_start_hi", ctypes.c_ushort), # 0x0006
("ee_start_lo", ctypes.c_uint) # 0x0008
]
class ext4_extent_header(ext4_struct):
_fields_ = [
("eh_magic", ctypes.c_ushort), # 0x0000, Must be 0xF30A
("eh_entries", ctypes.c_ushort), # 0x0002
("eh_max", ctypes.c_ushort), # 0x0004
("eh_depth", ctypes.c_ushort), # 0x0006
("eh_generation", ctypes.c_uint) # 0x0008
]
class ext4_extent_idx(ext4_struct):
_fields_ = [
("ei_block", ctypes.c_uint), # 0x0000
("ei_leaf_lo", ctypes.c_uint), # 0x0004
("ei_leaf_hi", ctypes.c_ushort), # 0x0008
("ei_unused", ctypes.c_ushort) # 0x000A
]
class ext4_group_descriptor(ext4_struct):
_fields_ = [
("bg_block_bitmap_lo", ctypes.c_uint), # 0x0000
("bg_inode_bitmap_lo", ctypes.c_uint), # 0x0004
("bg_inode_table_lo", ctypes.c_uint), # 0x0008
("bg_free_blocks_count_lo", ctypes.c_ushort), # 0x000C
("bg_free_inodes_count_lo", ctypes.c_ushort), # 0x000E
("bg_used_dirs_count_lo", ctypes.c_ushort), # 0x0010
("bg_flags", ctypes.c_ushort), # 0x0012
("bg_exclude_bitmap_lo", ctypes.c_uint), # 0x0014
("bg_block_bitmap_csum_lo", ctypes.c_ushort), # 0x0018
("bg_inode_bitmap_csum_lo", ctypes.c_ushort), # 0x001A
("bg_itable_unused_lo", ctypes.c_ushort), # 0x001C
("bg_checksum", ctypes.c_ushort), # 0x001E
# 64-bit fields
("bg_block_bitmap_hi", ctypes.c_uint), # 0x0020
("bg_inode_bitmap_hi", ctypes.c_uint), # 0x0024
("bg_inode_table_hi", ctypes.c_uint), # 0x0028
("bg_free_blocks_count_hi", ctypes.c_ushort), # 0x002C
("bg_free_inodes_count_hi", ctypes.c_ushort), # 0x002E
("bg_used_dirs_count_hi", ctypes.c_ushort), # 0x0030
("bg_itable_unused_hi", ctypes.c_ushort), # 0x0032
("bg_exclude_bitmap_hi", ctypes.c_uint), # 0x0034
("bg_block_bitmap_csum_hi", ctypes.c_ushort), # 0x0038
("bg_inode_bitmap_csum_hi", ctypes.c_ushort), # 0x003A
("bg_reserved", ctypes.c_uint), # 0x003C
]
@staticmethod
def _from_buffer_copy(raw, platform64=True):
struct = ext4_group_descriptor.from_buffer_copy(raw)
if not platform64:
struct.bg_block_bitmap_hi = 0
struct.bg_inode_bitmap_hi = 0
struct.bg_inode_table_hi = 0
struct.bg_free_blocks_count_hi = 0
struct.bg_free_inodes_count_hi = 0
struct.bg_used_dirs_count_hi = 0
struct.bg_itable_unused_hi = 0
struct.bg_exclude_bitmap_hi = 0
struct.bg_block_bitmap_csum_hi = 0
struct.bg_inode_bitmap_csum_hi = 0
struct.bg_reserved = 0
return struct
class ext4_inode(ext4_struct):
EXT2_GOOD_OLD_INODE_SIZE = 128
# Every field passing 128 bytes is "additional data", whose size is specified by i_extra_isize.
# i_mode
S_IXOTH = 0x1 # Others can execute
S_IWOTH = 0x2 # Others can write
S_IROTH = 0x4 # Others can read
S_IXGRP = 0x8 # Group can execute
S_IWGRP = 0x10 # Group can write
S_IRGRP = 0x20 # Group can read
S_IXUSR = 0x40 # Owner can execute
S_IWUSR = 0x80 # Owner can write
S_IRUSR = 0x100 # Owner can read
S_ISVTX = 0x200 # Sticky bit (only owner can delete)
S_ISGID = 0x400 # Set GID (execute with privileges of group owner of the file's group)
S_ISUID = 0x800 # Set UID (execute with privileges of the file's owner)
S_IFIFO = 0x1000 # FIFO device (named pipe)
S_IFCHR = 0x2000 # Character device (raw, unbuffered, aligned, direct access to hardware storage)
S_IFDIR = 0x4000 # Directory
S_IFBLK = 0x6000 # Block device (buffered, arbitrary access to storage)
S_IFREG = 0x8000 # Regular file
S_IFLNK = 0xA000 # Symbolic link
S_IFSOCK = 0xC000 # Socket
# i_flags
EXT4_INDEX_FL = 0x1000 # Uses hash trees
EXT4_EXTENTS_FL = 0x80000 # Uses extents
EXT4_EA_INODE_FL = 0x200000 # Inode stores large xattr
EXT4_INLINE_DATA_FL = 0x10000000 # Has inline data
_fields_ = [
("i_mode", ctypes.c_ushort), # 0x0000
("i_uid_lo", ctypes.c_ushort), # 0x0002, Originally named i_uid
("i_size_lo", ctypes.c_uint), # 0x0004
("i_atime", ctypes.c_uint), # 0x0008
("i_ctime", ctypes.c_uint), # 0x000C
("i_mtime", ctypes.c_uint), # 0x0010
("i_dtime", ctypes.c_uint), # 0x0014
("i_gid_lo", ctypes.c_ushort), # 0x0018, Originally named i_gid
("i_links_count", ctypes.c_ushort), # 0x001A
("i_blocks_lo", ctypes.c_uint), # 0x001C
("i_flags", ctypes.c_uint), # 0x0020
("osd1", ctypes.c_uint), # 0x0024
("i_block", ctypes.c_uint * 15), # 0x0028
("i_generation", ctypes.c_uint), # 0x0064
("i_file_acl_lo", ctypes.c_uint), # 0x0068
("i_size_hi", ctypes.c_uint), # 0x006C, Originally named i_size_high
("i_obso_faddr", ctypes.c_uint), # 0x0070
("i_osd2_blocks_high", ctypes.c_ushort), # 0x0074, Originally named i_osd2.linux2.l_i_blocks_high
("i_file_acl_hi", ctypes.c_ushort), # 0x0076, Originally named i_osd2.linux2.l_i_file_acl_high
("i_uid_hi", ctypes.c_ushort), # 0x0078, Originally named i_osd2.linux2.l_i_uid_high
("i_gid_hi", ctypes.c_ushort), # 0x007A, Originally named i_osd2.linux2.l_i_gid_high
("i_osd2_checksum_lo", ctypes.c_ushort), # 0x007C, Originally named i_osd2.linux2.l_i_checksum_lo
("i_osd2_reserved", ctypes.c_ushort), # 0x007E, Originally named i_osd2.linux2.l_i_reserved
("i_extra_isize", ctypes.c_ushort), # 0x0080
("i_checksum_hi", ctypes.c_ushort), # 0x0082
("i_ctime_extra", ctypes.c_uint), # 0x0084
("i_mtime_extra", ctypes.c_uint), # 0x0088
("i_atime_extra", ctypes.c_uint), # 0x008C
("i_crtime", ctypes.c_uint), # 0x0090
("i_crtime_extra", ctypes.c_uint), # 0x0094
("i_version_hi", ctypes.c_uint), # 0x0098
("i_projid", ctypes.c_uint), # 0x009C
]
class ext4_superblock(ext4_struct):
EXT2_DESC_SIZE = 0x20 # Default value for s_desc_size, if INCOMPAT_64BIT is not set (NEEDS CONFIRMATION)
EXT2_MIN_DESC_SIZE = 0x20
EXT2_MIN_DESC_SIZE_64BIT = 0x40
# s_feature_incompat
INCOMPAT_64BIT = 0x80 # Uses 64-bit features (e.g. *_hi structure fields in ext4_group_descriptor)
INCOMPAT_32BIT = 0x66
INCOMPAT_FILETYPE = 0x2 # Directory entries record file type (instead of inode flags)
_fields_ = [
("s_inodes_count", ctypes.c_uint), # 0x0000
("s_blocks_count_lo", ctypes.c_uint), # 0x0004
("s_r_blocks_count_lo", ctypes.c_uint), # 0x0008
("s_free_blocks_count_lo", ctypes.c_uint), # 0x000C
("s_free_inodes_count", ctypes.c_uint), # 0x0010
("s_first_data_block", ctypes.c_uint), # 0x0014
("s_log_block_size", ctypes.c_uint), # 0x0018
("s_log_cluster_size", ctypes.c_uint), # 0x001C
("s_blocks_per_group", ctypes.c_uint), # 0x0020
("s_clusters_per_group", ctypes.c_uint), # 0x0024
("s_inodes_per_group", ctypes.c_uint), # 0x0028
("s_mtime", ctypes.c_uint), # 0x002C
("s_wtime", ctypes.c_uint), # 0x0030
("s_mnt_count", ctypes.c_ushort), # 0x0034
("s_max_mnt_count", ctypes.c_ushort), # 0x0036
("s_magic", ctypes.c_ushort), # 0x0038, Must be 0xEF53
("s_state", ctypes.c_ushort), # 0x003A
("s_errors", ctypes.c_ushort), # 0x003C
("s_minor_rev_level", ctypes.c_ushort), # 0x003E
("s_lastcheck", ctypes.c_uint), # 0x0040
("s_checkinterval", ctypes.c_uint), # 0x0044
("s_creator_os", ctypes.c_uint), # 0x0048
("s_rev_level", ctypes.c_uint), # 0x004C
("s_def_resuid", ctypes.c_ushort), # 0x0050
("s_def_resgid", ctypes.c_ushort), # 0x0052
("s_first_ino", ctypes.c_uint), # 0x0054
("s_inode_size", ctypes.c_ushort), # 0x0058
("s_block_group_nr", ctypes.c_ushort), # 0x005A
("s_feature_compat", ctypes.c_uint), # 0x005C
("s_feature_incompat", ctypes.c_uint), # 0x0060
("s_feature_ro_compat", ctypes.c_uint), # 0x0064
("s_uuid", ctypes.c_ubyte * 16), # 0x0068
("s_volume_name", ctypes.c_char * 16), # 0x0078
("s_last_mounted", ctypes.c_char * 64), # 0x0088
("s_algorithm_usage_bitmap", ctypes.c_uint), # 0x00C8
("s_prealloc_blocks", ctypes.c_ubyte), # 0x00CC
("s_prealloc_dir_blocks", ctypes.c_ubyte), # 0x00CD
("s_reserved_gdt_blocks", ctypes.c_ushort), # 0x00CE
("s_journal_uuid", ctypes.c_ubyte * 16), # 0x00D0
("s_journal_inum", ctypes.c_uint), # 0x00E0
("s_journal_dev", ctypes.c_uint), # 0x00E4
("s_last_orphan", ctypes.c_uint), # 0x00E8
("s_hash_seed", ctypes.c_uint * 4), # 0x00EC
("s_def_hash_version", ctypes.c_ubyte), # 0x00FC
("s_jnl_backup_type", ctypes.c_ubyte), # 0x00FD
("s_desc_size", ctypes.c_ushort), # 0x00FE
("s_default_mount_opts", ctypes.c_uint), # 0x0100
("s_first_meta_bg", ctypes.c_uint), # 0x0104
("s_mkfs_time", ctypes.c_uint), # 0x0108
("s_jnl_blocks", ctypes.c_uint * 17), # 0x010C
# 64-bit fields
("s_blocks_count_hi", ctypes.c_uint), # 0x0150
("s_r_blocks_count_hi", ctypes.c_uint), # 0x0154
("s_free_blocks_count_hi", ctypes.c_uint), # 0x0158
("s_min_extra_isize", ctypes.c_ushort), # 0x015C
("s_want_extra_isize", ctypes.c_ushort), # 0x015E
("s_flags", ctypes.c_uint), # 0x0160
("s_raid_stride", ctypes.c_ushort), # 0x0164
("s_mmp_interval", ctypes.c_ushort), # 0x0166
("s_mmp_block", ctypes.c_ulonglong), # 0x0168
("s_raid_stripe_width", ctypes.c_uint), # 0x0170
("s_log_groups_per_flex", ctypes.c_ubyte), # 0x0174
("s_checksum_type", ctypes.c_ubyte), # 0x0175
("s_reserved_pad", ctypes.c_ushort), # 0x0176
("s_kbytes_written", ctypes.c_ulonglong), # 0x0178
("s_snapshot_inum", ctypes.c_uint), # 0x0180
("s_snapshot_id", ctypes.c_uint), # 0x0184
("s_snapshot_r_blocks_count", ctypes.c_ulonglong), # 0x0188
("s_snapshot_list", ctypes.c_uint), # 0x0190
("s_error_count", ctypes.c_uint), # 0x0194
("s_first_error_time", ctypes.c_uint), # 0x0198
("s_first_error_ino", ctypes.c_uint), # 0x019C
("s_first_error_block", ctypes.c_ulonglong), # 0x01A0
("s_first_error_func", ctypes.c_ubyte * 32), # 0x01A8
("s_first_error_line", ctypes.c_uint), # 0x01C8
("s_last_error_time", ctypes.c_uint), # 0x01CC
("s_last_error_ino", ctypes.c_uint), # 0x01D0
("s_last_error_line", ctypes.c_uint), # 0x01D4
("s_last_error_block", ctypes.c_ulonglong), # 0x01D8
("s_last_error_func", ctypes.c_ubyte * 32), # 0x01E0
("s_mount_opts", ctypes.c_ubyte * 64), # 0x0200
("s_usr_quota_inum", ctypes.c_uint), # 0x0240
("s_grp_quota_inum", ctypes.c_uint), # 0x0244
("s_overhead_blocks", ctypes.c_uint), # 0x0248
("s_backup_bgs", ctypes.c_uint * 2), # 0x024C
("s_encrypt_algos", ctypes.c_ubyte * 4), # 0x0254
("s_encrypt_pw_salt", ctypes.c_ubyte * 16), # 0x0258
("s_lpf_ino", ctypes.c_uint), # 0x0268
("s_prj_quota_inum", ctypes.c_uint), # 0x026C
("s_checksum_seed", ctypes.c_uint), # 0x0270
("s_reserved", ctypes.c_uint * 98), # 0x0274
("s_checksum", ctypes.c_uint) # 0x03FC
]
@staticmethod
def _from_buffer_copy(raw, platform64=True):
struct = ext4_superblock.from_buffer_copy(raw)
if not platform64:
struct.s_blocks_count_hi = 0
struct.s_r_blocks_count_hi = 0
struct.s_free_blocks_count_hi = 0
struct.s_min_extra_isize = 0
struct.s_want_extra_isize = 0
struct.s_flags = 0
struct.s_raid_stride = 0
struct.s_mmp_interval = 0
struct.s_mmp_block = 0
struct.s_raid_stripe_width = 0
struct.s_log_groups_per_flex = 0
struct.s_checksum_type = 0
struct.s_reserved_pad = 0
struct.s_kbytes_written = 0
struct.s_snapshot_inum = 0
struct.s_snapshot_id = 0
struct.s_snapshot_r_blocks_count = 0
struct.s_snapshot_list = 0
struct.s_error_count = 0
struct.s_first_error_time = 0
struct.s_first_error_ino = 0
struct.s_first_error_block = 0
struct.s_first_error_func = 0
struct.s_first_error_line = 0
struct.s_last_error_time = 0
struct.s_last_error_ino = 0
struct.s_last_error_line = 0
struct.s_last_error_block = 0
struct.s_last_error_func = 0
struct.s_mount_opts = 0
struct.s_usr_quota_inum = 0
struct.s_grp_quota_inum = 0
struct.s_overhead_blocks = 0
struct.s_backup_bgs = 0
struct.s_encrypt_algos = 0
struct.s_encrypt_pw_salt = 0
struct.s_lpf_ino = 0
struct.s_prj_quota_inum = 0
struct.s_checksum_seed = 0
struct.s_reserved = 0
struct.s_checksum = 0
# if (struct.s_feature_incompat & ext4_superblock.INCOMPAT_64BIT) == 0:
# struct.s_desc_size = ext4_superblock.EXT2_DESC_SIZE
if struct.s_desc_size == 0:
if (struct.s_feature_incompat & ext4_superblock.INCOMPAT_64BIT) == 0:
struct.s_desc_size = ext4_superblock.EXT2_MIN_DESC_SIZE
else:
struct.s_desc_size = ext4_superblock.EXT2_MIN_DESC_SIZE_64BIT
return struct
class ext4_xattr_entry(ext4_struct):
_fields_ = [
("e_name_len", ctypes.c_ubyte), # 0x00
("e_name_index", ctypes.c_ubyte), # 0x01
("e_value_offs", ctypes.c_ushort), # 0x02
("e_value_inum", ctypes.c_uint), # 0x04
("e_value_size", ctypes.c_uint), # 0x08
("e_hash", ctypes.c_uint) # 0x0C
# Variable length field "e_name" missing at 0x10
]
def _from_buffer_copy(raw, offset=0, platform64=True):
struct = ext4_xattr_entry.from_buffer_copy(raw, offset)
struct.e_name = raw[offset + 0x10: offset + 0x10 + struct.e_name_len]
return struct
@property
def _size(self): return 4 * ((ctypes.sizeof(type(self)) + self.e_name_len + 3) // 4) # 4-byte alignment
class ext4_xattr_header(ext4_struct):
_fields_ = [
("h_magic", ctypes.c_uint), # 0x0, Must be 0xEA020000
("h_refcount", ctypes.c_uint), # 0x4
("h_blocks", ctypes.c_uint), # 0x8
("h_hash", ctypes.c_uint), # 0xC
("h_checksum", ctypes.c_uint), # 0x10
("h_reserved", ctypes.c_uint * 3), # 0x14
]
class ext4_xattr_ibody_header(ext4_struct):
_fields_ = [
("h_magic", ctypes.c_uint) # 0x0, Must be 0xEA020000
]
class InodeType:
UNKNOWN = 0x0 # Unknown file type
FILE = 0x1 # Regular file
DIRECTORY = 0x2 # Directory
CHARACTER_DEVICE = 0x3 # Character device
BLOCK_DEVICE = 0x4 # Block device
FIFO = 0x5 # FIFO
SOCKET = 0x6 # Socket
SYMBOLIC_LINK = 0x7 # Symbolic link
CHECKSUM = 0xDE # Checksum entry; not really a file type, but a type of directory entry
# ----------------------------- HIGH LEVEL ------------------------------
class MappingEntry:
def __init__(self, file_block_idx, disk_block_idx, block_count=1):
self.file_block_idx = file_block_idx
self.disk_block_idx = disk_block_idx
self.block_count = block_count
def __iter__(self):
yield self.file_block_idx
yield self.disk_block_idx
yield self.block_count
def __repr__(self):
return "{type:s}({file_block_idx!r:s}, {disk_block_idx!r:s}, {blocK_count!r:s})".format(
blocK_count=self.block_count,
disk_block_idx=self.disk_block_idx,
file_block_idx=self.file_block_idx,
type=type(self).__name__
)
def copy(self):
return MappingEntry(self.file_block_idx, self.disk_block_idx, self.block_count)
def create_mapping(*entries):
file_block_idx = 0
result = [None] * len(entries)
for i, entry in enumerate(entries):
disk_block_idx, block_count = entry
result[i] = MappingEntry(file_block_idx, disk_block_idx, block_count)
file_block_idx += block_count
return result
def optimize(entries):
entries.sort(key=lambda entry: entry.file_block_idx)
idx = 0
while idx < len(entries):
while idx + 1 < len(entries) \
and entries[idx].file_block_idx + entries[idx].block_count == entries[idx + 1].file_block_idx \
and entries[idx].disk_block_idx + entries[idx].block_count == entries[idx + 1].disk_block_idx:
tmp = entries.pop(idx + 1)
entries[idx].block_count += tmp.block_count
idx += 1
class Volume:
ROOT_INODE = 2
def __init__(self, stream, offset=0, ignore_flags=False, ignore_magic=False):
self.ignore_flags = ignore_flags
self.ignore_magic = ignore_magic
self.offset = offset
self.platform64 = True # Initial value needed for Volume.read_struct
self.stream = stream
# Superblock
self.superblock = self.read_struct(ext4_superblock, 0x400)
self.platform64 = (self.superblock.s_feature_incompat & ext4_superblock.INCOMPAT_64BIT) != 0
if not ignore_magic and self.superblock.s_magic != 0xEF53:
raise MagicError("Invalid magic value in superblock: 0x{magic:04X} (expected 0xEF53)".format(
magic=self.superblock.s_magic))
# Group descriptors
self.group_descriptors = [None] * (self.superblock.s_inodes_count // self.superblock.s_inodes_per_group)
group_desc_table_offset = (0x400 // self.block_size + 1) * self.block_size # First block after superblock
for group_desc_idx in range(len(self.group_descriptors)):
group_desc_offset = group_desc_table_offset + group_desc_idx * self.superblock.s_desc_size
self.group_descriptors[group_desc_idx] = self.read_struct(ext4_group_descriptor, group_desc_offset)
def __repr__(self):
return "{type_name:s}(volume_name = {volume_name!r:s}, uuid = {uuid!r:s}, last_mounted = {last_mounted!r:s})".format(
last_mounted=self.superblock.s_last_mounted,
type_name=type(self).__name__,
uuid=self.uuid,
volume_name=self.superblock.s_volume_name
)
@property
def block_size(self):
return 1 << (10 + self.superblock.s_log_block_size)
def get_inode(self, inode_idx, file_type=InodeType.UNKNOWN):
group_idx, inode_table_entry_idx = self.get_inode_group(inode_idx)
inode_table_offset = self.group_descriptors[group_idx].bg_inode_table * self.block_size
inode_offset = inode_table_offset + inode_table_entry_idx * self.superblock.s_inode_size
return Inode(self, inode_offset, inode_idx, file_type)
def get_inode_group(self, inode_idx):
group_idx = (inode_idx - 1) // self.superblock.s_inodes_per_group
inode_table_entry_idx = (inode_idx - 1) % self.superblock.s_inodes_per_group
return group_idx, inode_table_entry_idx
def read(self, offset, byte_len):
if self.offset + offset != self.stream.tell():
self.stream.seek(self.offset + offset, io.SEEK_SET)
return self.stream.read(byte_len)
def read_struct(self, structure, offset, platform64=None):
raw = self.read(offset, ctypes.sizeof(structure))
if hasattr(structure, "_from_buffer_copy"):
return structure._from_buffer_copy(raw, platform64=platform64 if platform64 else self.platform64)
else:
return structure.from_buffer_copy(raw)
@property
def root(self):
return self.get_inode(Volume.ROOT_INODE, InodeType.DIRECTORY)
@property
def uuid(self):
uuid = self.superblock.s_uuid
uuid = [uuid[:4], uuid[4: 6], uuid[6: 8], uuid[8: 10], uuid[10:]]
return "-".join("".join("{0:02X}".format(c) for c in part) for part in uuid)
class Inode:
def __init__(self, volume, offset, inode_idx, file_type=InodeType.UNKNOWN):
self.inode_idx = inode_idx
self.offset = offset
self.volume = volume
self.file_type = file_type
self.inode = volume.read_struct(ext4_inode, offset)
def __len__(self):
return self.inode.i_size
def __repr__(self):
if self.inode_idx is not None:
return "{type_name:s}(inode_idx = {inode!r:s}, offset = 0x{offset:X}, volume_uuid = {uuid!r:s})".format(
inode=self.inode_idx,
offset=self.offset,
type_name=type(self).__name__,
uuid=self.volume.uuid
)
else:
return "{type_name:s}(offset = 0x{offset:X}, volume_uuid = {uuid!r:s})".format(
offset=self.offset,
type_name=type(self).__name__,
uuid=self.volume.uuid
)
def _parse_xattrs(self, raw_data, offset, prefix_override: dict = None):
prefixes = {
0: "",
1: "user.",
2: "system.posix_acl_access",
3: "system.posix_acl_default",
4: "trusted.",
6: "security.",
7: "system.",
8: "system.richacl"
}
prefixes.update(prefixes)
# Iterator over ext4_xattr_entry structures
i = 0
while i < len(raw_data):
xattr_entry = ext4_xattr_entry._from_buffer_copy(raw_data, i, platform64=self.volume.platform64)
if (
xattr_entry.e_name_len | xattr_entry.e_name_index | xattr_entry.e_value_offs | xattr_entry.e_value_inum) == 0:
# End of ext4_xattr_entry list
break
if not xattr_entry.e_name_index in prefixes:
raise Ext4Error("Unknown attribute prefix {prefix:d} in inode {inode:d}".format(
inode=self.inode_idx,
prefix=xattr_entry.e_name_index
))
xattr_name = prefixes[xattr_entry.e_name_index] + xattr_entry.e_name.decode("iso-8859-2")
if xattr_entry.e_value_inum != 0:
# external xattr
xattr_inode = self.volume.get_inode(xattr_entry.e_value_inum, InodeType.FILE)
if not self.volume.ignore_flags and (xattr_inode.inode.i_flags & ext4_inode.EXT4_EA_INODE_FL) != 0:
raise Ext4Error(
"Inode {value_indoe:d} associated with the extended attribute {xattr_name!r:s} of inode {inode:d} is not marked as large extended attribute value.".format(
inode=self.inode_idx,
value_inode=xattr_inode.inode_idx,
xattr_name=xattr_name
))
# TODO Use xattr_entry.e_value_size or xattr_inode.inode.i_size?
xattr_value = xattr_inode.open_read().read()
else:
# internal xattr
xattr_value = raw_data[
xattr_entry.e_value_offs + offset: xattr_entry.e_value_offs + offset + xattr_entry.e_value_size]
yield xattr_name, xattr_value
i += xattr_entry._size
@staticmethod
def directory_entry_comparator(dir_a, dir_b):
file_name_a, _, file_type_a = dir_a
file_name_b, _, file_type_b = dir_b
if file_type_a == InodeType.DIRECTORY == file_type_b or file_type_a != InodeType.DIRECTORY != file_type_b:
tmp = wcscmp(file_name_a.lower(), file_name_b.lower())
return tmp if tmp != 0 else wcscmp(file_name_a, file_name_b)
else:
return -1 if file_type_a == InodeType.DIRECTORY else 1
directory_entry_key = functools.cmp_to_key(directory_entry_comparator)
def get_inode(self, *relative_path, decode_name=None):
if not self.is_dir:
raise Ext4Error("Inode {inode:d} is not a directory.".format(inode=self.inode_idx))
current_inode = self
for i, part in enumerate(relative_path):
if not self.volume.ignore_flags and not current_inode.is_dir:
current_path = "/".join(relative_path[:i])
raise Ext4Error("{current_path!r:s} (Inode {inode:d}) is not a directory.".format(
current_path=current_path,
inode=inode_idx
))
file_name, inode_idx, file_type = next(
filter(lambda entry: entry[0] == part, current_inode.open_dir(decode_name)), (None, None, None))
if inode_idx is None:
current_path = "/".join(relative_path[:i])
raise FileNotFoundError("{part!r:s} not found in {current_path!r:s} (Inode {inode:d}).".format(
current_path=current_path,
inode=current_inode.inode_idx,
part=part
))
current_inode = current_inode.volume.get_inode(inode_idx, file_type)
return current_inode
@property
def is_dir(self):
if (self.volume.superblock.s_feature_incompat & ext4_superblock.INCOMPAT_FILETYPE) == 0:
return (self.inode.i_mode & ext4_inode.S_IFDIR) != 0
else:
return self.file_type == InodeType.DIRECTORY
@property
def is_file(self):
if (self.volume.superblock.s_feature_incompat & ext4_superblock.INCOMPAT_FILETYPE) == 0:
return (self.inode.i_mode & ext4_inode.S_IFREG) != 0
else:
return self.file_type == InodeType.FILE
@property
def is_symlink(self):
if (self.volume.superblock.s_feature_incompat & ext4_superblock.INCOMPAT_FILETYPE) == 0:
return (self.inode.i_mode & ext4_inode.S_IFLNK) != 0
else:
return self.file_type == InodeType.SYMBOLIC_LINK
@property
def is_in_use(self):
group_idx, bitmap_bit = self.volume.get_inode_group(self.inode_idx)
inode_usage_bitmap_offset = self.volume.group_descriptors[group_idx].bg_inode_bitmap * self.volume.block_size
inode_usage_byte = self.volume.read(inode_usage_bitmap_offset + bitmap_bit // 8, 1)[0]
return ((inode_usage_byte >> (7 - bitmap_bit % 8)) & 1) != 0
@property
def mode_str(self):
special_flag = lambda letter, execute, special: {
(False, False): "-",
(False, True): letter.upper(),
(True, False): "x",
(True, True): letter.lower()
}[(execute, special)]
try:
if (self.volume.superblock.s_feature_incompat & ext4_superblock.INCOMPAT_FILETYPE) == 0:
device_type = {
ext4_inode.S_IFIFO: "p",
ext4_inode.S_IFCHR: "c",
ext4_inode.S_IFDIR: "d",
ext4_inode.S_IFBLK: "b",
ext4_inode.S_IFREG: "-",
ext4_inode.S_IFLNK: "l",
ext4_inode.S_IFSOCK: "s",
}[self.inode.i_mode & 0xF000]
else:
device_type = {
InodeType.FILE: "-",
InodeType.DIRECTORY: "d",
InodeType.CHARACTER_DEVICE: "c",
InodeType.BLOCK_DEVICE: "b",
InodeType.FIFO: "p",
InodeType.SOCKET: "s",
InodeType.SYMBOLIC_LINK: "l"
}[self.file_type]
except KeyError:
device_type = "?"
return "".join([
device_type,
"r" if (self.inode.i_mode & ext4_inode.S_IRUSR) != 0 else "-",
"w" if (self.inode.i_mode & ext4_inode.S_IWUSR) != 0 else "-",
special_flag("s", (self.inode.i_mode & ext4_inode.S_IXUSR) != 0,
(self.inode.i_mode & ext4_inode.S_ISUID) != 0),
"r" if (self.inode.i_mode & ext4_inode.S_IRGRP) != 0 else "-",
"w" if (self.inode.i_mode & ext4_inode.S_IWGRP) != 0 else "-",
special_flag("s", (self.inode.i_mode & ext4_inode.S_IXGRP) != 0,
(self.inode.i_mode & ext4_inode.S_ISGID) != 0),
"r" if (self.inode.i_mode & ext4_inode.S_IROTH) != 0 else "-",
"w" if (self.inode.i_mode & ext4_inode.S_IWOTH) != 0 else "-",
special_flag("t", (self.inode.i_mode & ext4_inode.S_IXOTH) != 0,
(self.inode.i_mode & ext4_inode.S_ISVTX) != 0),
])
def open_dir(self, decode_name=None):
# Parse args
if decode_name is None:
decode_name = lambda raw: raw.decode("utf8")
if not self.volume.ignore_flags and not self.is_dir:
raise Ext4Error("Inode ({inode:d}) is not a directory.".format(inode=self.inode_idx))
# # Hash trees are compatible with linear arrays
if (self.inode.i_flags & ext4_inode.EXT4_INDEX_FL) != 0:
pass
# Read raw directory content
raw_data = self.open_read().read()
offset = 0
while offset < len(raw_data):
dirent = ext4_dir_entry_2._from_buffer_copy(raw_data, offset, platform64=self.volume.platform64)
if dirent.file_type != InodeType.CHECKSUM:
yield decode_name(dirent.name), dirent.inode, dirent.file_type
offset += dirent.rec_len
def open_read(self):
if (self.inode.i_flags & ext4_inode.EXT4_EXTENTS_FL) != 0:
# Obtain mapping from extents
mapping = [] # List of MappingEntry instances
nodes = queue.Queue()
nodes.put_nowait(self.offset + ext4_inode.i_block.offset)
while nodes.qsize() != 0:
header_offset = nodes.get_nowait()
header = self.volume.read_struct(ext4_extent_header, header_offset)
if not self.volume.ignore_magic and header.eh_magic != 0xF30A:
raise MagicError(
"Invalid magic value in extent header at offset 0x{header_offset:X} of inode {inode:d}: 0x{header_magic:04X} (expected 0xF30A)".format(
header_magic=header.eh_magic,
header_offset=self.inode_idx,
inode=self.inode_idx
))
if header.eh_depth != 0:
indices = self.volume.read_struct(ext4_extent_idx * header.eh_entries,
header_offset + ctypes.sizeof(ext4_extent_header))
for idx in indices: nodes.put_nowait(idx.ei_leaf * self.volume.block_size)
else:
extents = self.volume.read_struct(ext4_extent * header.eh_entries,
header_offset + ctypes.sizeof(ext4_extent_header))
for extent in extents:
mapping.append(MappingEntry(extent.ee_block, extent.ee_start, extent.ee_len))
MappingEntry.optimize(mapping)
return BlockReader(self.volume, len(self), mapping)
else:
# Inode uses inline data
i_block = self.volume.read(self.offset + ext4_inode.i_block.offset, ext4_inode.i_block.size)
return io.BytesIO(i_block[:self.inode.i_size])
@property
def size_readable(self):
if self.inode.i_size < 1024:
return "{0:d} bytes".format(self.inode.i_size) if self.inode.i_size != 1 else "1 byte"
else:
units = ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
unit_idx = min(int(math.log(self.inode.i_size, 1024)), len(units))
return "{size:.2f} {unit:s}".format(
size=self.inode.i_size / (1024 ** unit_idx),
unit=units[unit_idx - 1]
)
def xattrs(self, check_inline=True, check_block=True, force_inline=False, prefix_override: dict = None):
# Inline xattrs
inline_data_offset = self.offset + ext4_inode.EXT2_GOOD_OLD_INODE_SIZE + self.inode.i_extra_isize
inline_data_length = self.offset + self.volume.superblock.s_inode_size - inline_data_offset
if check_inline and inline_data_length > ctypes.sizeof(ext4_xattr_ibody_header):
inline_data = self.volume.read(inline_data_offset, inline_data_length)
xattrs_header = ext4_xattr_ibody_header.from_buffer_copy(inline_data)
# TODO Find way to detect inline xattrs without checking the h_magic field to enable error detection with the h_magic field.
if force_inline or xattrs_header.h_magic == 0xEA020000:
offset = 4 * ((ctypes.sizeof(
ext4_xattr_ibody_header) + 3) // 4) # The ext4_xattr_entry following the header is aligned on a 4-byte boundary
try:
for xattr_name, xattr_value in self._parse_xattrs(inline_data[offset:], 0,
prefix_override=prefix_override):
yield xattr_name, xattr_value
except:
pass
# xattr block(s)
if check_block and self.inode.i_file_acl != 0:
xattrs_block_start = self.inode.i_file_acl * self.volume.block_size
xattrs_block = self.volume.read(xattrs_block_start, self.volume.block_size)
xattrs_header = ext4_xattr_header.from_buffer_copy(xattrs_block)
if not self.volume.ignore_magic and xattrs_header.h_magic != 0xEA020000:
try:
raise MagicError(
"Invalid magic value in xattrs block header at offset 0x{xattrs_block_start:X} of inode {inode:d}: 0x{xattrs_header} (expected 0xEA020000)".format(
inode=self.inode_idx,
xattrs_block_start=xattrs_block_start,
xattrs_header=xattrs_header.h_magic
))
except:
pass
if xattrs_header.h_blocks != 1:
raise Ext4Error(
"Invalid number of xattr blocks at offset 0x{xattrs_block_start:X} of inode {inode:d}: {xattrs_header:d} (expected 1)".format(
inode=self.inode_idx,
xattrs_header=xattrs_header.h_blocks,
xattrs_block_start=xattrs_block_start
))
offset = 4 * ((ctypes.sizeof(
ext4_xattr_header) + 3) // 4) # The ext4_xattr_entry following the header is aligned on a 4-byte boundary
for xattr_name, xattr_value in self._parse_xattrs(xattrs_block[offset:], -offset,
prefix_override=prefix_override):
yield xattr_name, xattr_value
class BlockReader:
# OSError
EINVAL = 22
def __init__(self, volume, byte_size, block_map):
self.byte_size = byte_size
self.volume = volume
self.cursor = 0
block_map = list(map(MappingEntry.copy, block_map))
# Optimize mapping (stich together)
MappingEntry.optimize(block_map)
self.block_map = block_map
def __repr__(self):
return "{type_name:s}(byte_size = {size!r:s}, block_map = {block_map!r:s}, volume_uuid = {uuid!r:s})".format(
block_map=self.block_map,
size=self.byte_size,
type_name=type(self).__name__,
uuid=self.volume.uuid
)
def get_block_mapping(self, file_block_idx):
disk_block_idx = None
# Find disk block
for entry in self.block_map:
if entry.file_block_idx <= file_block_idx < entry.file_block_idx + entry.block_count:
block_diff = file_block_idx - entry.file_block_idx
disk_block_idx = entry.disk_block_idx + block_diff
break
return disk_block_idx
def read(self, byte_len=-1):
# Parse args
if byte_len < -1:
raise ValueError("byte_len must be non-negative or -1")
bytes_remaining = self.byte_size - self.cursor
byte_len = bytes_remaining if byte_len == -1 else max(0, min(byte_len, bytes_remaining))
if byte_len == 0:
return b""
# Reading blocks
start_block_idx = self.cursor // self.volume.block_size
end_block_idx = (self.cursor + byte_len - 1) // self.volume.block_size
end_of_stream_check = byte_len
blocks = [self.read_block(i) for i in range(start_block_idx, end_block_idx - start_block_idx + 1)]
start_offset = self.cursor % self.volume.block_size
if start_offset != 0:
blocks[0] = blocks[0][start_offset:]
byte_len = (byte_len + start_offset - self.volume.block_size - 1) % self.volume.block_size + 1
blocks[-1] = blocks[-1][:byte_len]
result = b"".join(blocks)
# Check read
if len(result) != end_of_stream_check:
raise EndOfStreamError(
"The volume's underlying stream ended {0:d} bytes before EOF.".format(byte_len - len(result)))
self.cursor += len(result)
return result
def read_block(self, file_block_idx):
disk_block_idx = self.get_block_mapping(file_block_idx)
if disk_block_idx is not None:
return self.volume.read(disk_block_idx * self.volume.block_size, self.volume.block_size)
else:
return bytes([0] * self.volume.block_size)
def seek(self, seek, seek_mode=io.SEEK_SET):
if seek_mode == io.SEEK_CUR:
seek += self.cursor
elif seek_mode == io.SEEK_END:
seek += self.byte_size
# elif seek_mode == io.SEEK_SET:
# seek += 0
if seek < 0:
raise OSError(BlockReader.EINVAL, "Invalid argument") # Exception behavior copied from IOBase.seek
self.cursor = seek
return seek
def tell(self):
return self.cursor