MIO-KITCHEN-SOURCE/utils.py

494 lines
17 KiB
Python

from __future__ import print_function
import errno
import os
import struct
import tempfile
from os.path import exists
import blockimgdiff
DataImage = blockimgdiff.DataImage
import sparse_img
from threading import Thread
from random import randint, choice
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
# -----
# ====================================================
# FUNCTION: sdat2img img2sdat
# AUTHORS: xpirt - luxi78 - howellzhu
# DATE: 2018-10-27 10:33:21 CEST | 2018-05-25 12:19:12 CEST
# ====================================================
# -----
# ----VALUES
from os import getcwd
from lpunpack import SparseImage
elocal = getcwd()
dn = None
formats = ([b'PK', "zip"], [b'OPPOENCRYPT!', "ozip"], [b'7z', "7z"], [b'\x53\xef', 'ext', 1080],
[b'\x3a\xff\x26\xed', "sparse"], [b'\xe2\xe1\xf5\xe0', "erofs", 1024], [b"CrAU", "payload"],
[b"AVB0", "vbmeta"], [b'\xd7\xb7\xab\x1e', "dtbo"],
[b'\xd0\x0d\xfe\xed', "dtb"], [b"MZ", "exe"], [b".ELF", 'elf'],
[b"ANDROID!", "boot"], [b"VNDRBOOT", "vendor_boot"],
[b'AVBf', "avb_foot"], [b'BZh', "bzip2"],
[b'CHROMEOS', 'chrome'], [b'\x1f\x8b', "gzip"],
[b'\x1f\x9e', "gzip"], [b'\x02\x21\x4c\x18', "lz4_legacy"],
[b'\x03\x21\x4c\x18', 'lz4'], [b'\x04\x22\x4d\x18', 'lz4'],
[b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\x03', "zopfli"], [b'\xfd7zXZ', 'xz'],
[b']\x00\x00\x00\x04\xff\xff\xff\xff\xff\xff\xff\xff', 'lzma'], [b'\x02!L\x18', 'lz4_lg'],
[b'\x89PNG', 'png'], [b"LOGO!!!!", 'logo', 4000])
# ----DEFS
class aesencrypt:
@staticmethod
def encrypt(key, file_path, outfile):
cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB)
with open(outfile, "wb") as f, open(file_path, 'rb') as fd:
f.write(cipher.encrypt(pad(fd.read(), AES.block_size)))
@staticmethod
def decrypt(key, file_path, outfile):
cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB)
with open(file_path, "rb") as f:
data = cipher.decrypt(f.read())
data = data[:-data[-1]]
with open(outfile, "wb") as f:
f.write(data)
class sdat2img:
def __init__(self, TRANSFER_LIST_FILE, NEW_DATA_FILE, OUTPUT_IMAGE_FILE):
print('sdat2img binary - version: 1.3\n')
self.TRANSFER_LIST_FILE = TRANSFER_LIST_FILE
self.NEW_DATA_FILE = NEW_DATA_FILE
self.OUTPUT_IMAGE_FILE = OUTPUT_IMAGE_FILE
self.list_file = self.parse_transfer_list_file()
block_size = 4096
version = next(self.list_file)
next(self.list_file)
show = "Android {} detected!\n"
if version == 1:
print(show.format("Lollipop 5.0"))
elif version == 2:
print(show.format("Lollipop 5.1"))
elif version == 3:
print(show.format("Marshmallow 6.x"))
elif version == 4:
print(show.format("Nougat 7.x / Oreo 8.x / Pie 9.x"))
else:
print(show.format('Unknown Android version {version}!\n'))
# Don't clobber existing files to avoid accidental data loss
try:
output_img = open(self.OUTPUT_IMAGE_FILE, 'wb')
except IOError as e:
if e.errno == errno.EEXIST:
print('Error: the output file "{}" already exists'.format(e.filename))
print('Remove it, rename it, or choose a different file name.')
return
else:
raise
new_data_file = open(self.NEW_DATA_FILE, 'rb')
max_file_size = 0
for command in self.list_file:
max_file_size = max(pair[1] for pair in [i for i in command[1]]) * block_size
if command[0] == 'new':
for block in command[1]:
begin = block[0]
block_count = block[1] - begin
print('Copying {} blocks into position {}...'.format(block_count, begin))
# Position output file
output_img.seek(begin * block_size)
# Copy one block at a time
while block_count > 0:
output_img.write(new_data_file.read(block_size))
block_count -= 1
else:
print('Skipping command {}...'.format(command[0]))
# Make file larger if necessary
if output_img.tell() < max_file_size:
output_img.truncate(max_file_size)
output_img.close()
new_data_file.close()
print('Done! Output image: {}'.format(os.path.realpath(output_img.name)))
@staticmethod
def rangeset(src):
src_set = src.split(',')
num_set = [int(item) for item in src_set]
if len(num_set) != num_set[0] + 1:
print('Error on parsing following data to rangeset:\n{}'.format(src))
return
return tuple([(num_set[i], num_set[i + 1]) for i in range(1, len(num_set), 2)])
def parse_transfer_list_file(self):
with open(self.TRANSFER_LIST_FILE, 'r') as trans_list:
# First line in transfer list is the version number
# Second line in transfer list is the total number of blocks we expect to write
if (version := int(trans_list.readline())) >= 2 and (new_blocks := int(trans_list.readline())):
# Third line is how many stash entries are needed simultaneously
trans_list.readline()
# Fourth line is the maximum number of blocks that will be stashed simultaneously
trans_list.readline()
# Subsequent lines are all individual transfer commands
yield version
yield new_blocks
for line in trans_list:
line = line.split(' ')
cmd = line[0]
if cmd in ['erase', 'new', 'zero']:
yield [cmd, self.rangeset(line[1])]
else:
# Skip lines starting with numbers, they are not commands anyway
if not cmd[0].isdigit():
print('Command "{}" is not valid.'.format(cmd))
return
def gettype(file) -> str:
if not os.path.exists(file):
return "fne"
def compare(header: bytes, number: int = 0) -> int:
with open(file, 'rb') as f:
f.seek(number)
return f.read(len(header)) == header
def is_super(fil) -> any:
with open(fil, 'rb') as file_:
buf = bytearray(file_.read(4))
if len(buf) < 4:
return False
file_.seek(0, 0)
while buf[0] == 0x00:
buf = bytearray(file_.read(1))
try:
file_.seek(-1, 1)
except:
return False
buf += bytearray(file_.read(4))
return buf[1:] == b'\x67\x44\x6c\x61'
try:
if is_super(file):
return 'super'
except IndexError:
pass
for f_ in formats:
if len(f_) == 2:
if compare(f_[0]):
return f_[1]
elif len(f_) == 3:
if compare(f_[0], f_[2]):
return f_[1]
try:
if LOGODUMPER(file, str(None)).chkimg(file):
return 'logo'
except AssertionError:
pass
except struct.error:
pass
return "unknow"
def dynamic_list_reader(path):
data = {}
with open(path, 'r', encoding='utf-8') as l_f:
for p in l_f.readlines():
if p[:1] == '#':
continue
tmp = p.strip().split()
if tmp[0] == 'remove_all_groups':
data.clear()
elif tmp[0] == 'add_group':
data[tmp[1]] = {}
data[tmp[1]]['size'] = tmp[2]
data[tmp[1]]['parts'] = []
elif tmp[0] == 'add':
data[tmp[2]]['parts'].append(tmp[1])
else:
print(f"Skip {tmp}")
return data
def generate_dynamic_list(dbfz, size, set_, lb, work):
data = ['# Remove all existing dynamic partitions and groups before applying full OTA', 'remove_all_groups']
with open(work + "dynamic_partitions_op_list", 'w', encoding='utf-8', newline='\n') as d_list:
if set_ == 1:
data.append(f'# Add group {dbfz} with maximum size {size}')
data.append(f'add_group {dbfz} {size}')
elif set_ in [2, 3]:
data.append(f'# Add group {dbfz}_a with maximum size {size}')
data.append(f'add_group {dbfz}_a {size}')
data.append(f'# Add group {dbfz}_b with maximum size {size}')
data.append(f'add_group {dbfz}_b {size}')
for part in lb:
if set_ == 1:
data.append(f'# Add partition {part} to group {dbfz}')
data.append(f'add {part} {dbfz}')
elif set_ in [2, 3]:
data.append(f'# Add partition {part}_a to group {dbfz}_a')
data.append(f'add {part}_a {dbfz}_a')
data.append(f'# Add partition {part}_b to group {dbfz}_b')
data.append(f'add {part}_b {dbfz}_b')
for part in lb:
if set_ == 1:
data.append(f'# Grow partition {part} from 0 to {os.path.getsize(work + part + ".img")}')
data.append(f'resize {part} {os.path.getsize(work + part + ".img")}')
elif set_ in [2, 3]:
data.append(f'# Grow partition {part}_a from 0 to {os.path.getsize(work + part + ".img")}')
data.append(f'resize {part}_a {os.path.getsize(work + part + ".img")}')
d_list.writelines([key + "\n" for key in data])
data.clear()
def v_code(num=6) -> str:
ret = ""
for i in range(num):
num = randint(0, 9)
# num = chr(random.randint(48,57))#ASCII表示数字
letter = chr(randint(97, 122)) # 取小写字母
Letter = chr(randint(65, 90)) # 取大写字母
s = str(choice([num, letter, Letter]))
ret += s
return ret
def qc(file_) -> None:
if not exists(file_):
return
with open(file_, 'r+', encoding='utf-8', newline='\n') as f:
data = f.readlines()
data = sorted(set(data), key=data.index)
f.seek(0)
f.truncate()
f.writelines(data)
del data
def cz(func, *args):
Thread(target=func, args=args, daemon=True).start()
def simg2img(path):
with open(path, 'rb') as fd:
if SparseImage(fd).check():
print('Sparse image detected.')
print('Process conversion to non sparse image...')
unsparse_file = SparseImage(fd).unsparse()
print('Result:[ok]')
else:
print(f"{path} not Sparse.Skip!")
try:
if os.path.exists(unsparse_file):
os.remove(path)
os.rename(unsparse_file, path)
except Exception as e:
print(e)
def img2sdat(input_image, out_dir='.', version=None, prefix='system'):
print('img2sdat binary - version: 1.7\n')
if not os.path.isdir(out_dir):
os.makedirs(out_dir)
'''
1. Android Lollipop 5.0
2. Android Lollipop 5.1
3. Android Marshmallow 6.0
4. Android Nougat 7.0/7.1/8.0/8.1
'''
blockimgdiff.BlockImageDiff(sparse_img.SparseImage(input_image, tempfile.mkstemp()[1], '0'), None, version).Compute(
out_dir + '/' + prefix)
def findfile(file, dir_) -> str:
for root, dirs, files in os.walk(dir_, topdown=True):
if file in files:
if os.name == 'nt':
return (root + os.sep + file).replace("\\", '/')
else:
return root + os.sep + file
else:
pass
def findfolder(dir__, folder_name):
for root, dirnames, filenames in os.walk(dir__):
for dirname in dirnames:
if dirname == folder_name:
return os.path.join(root, dirname).replace("\\", '/')
return None
# ----CLASSES
class jzxs(object):
def __init__(self, master):
self.master = master
self.set()
def set(self):
self.master.geometry('+{}+{}'.format(int(self.master.winfo_screenwidth() / 2 - self.master.winfo_width() / 2),
int(self.master.winfo_screenheight() / 2 - self.master.winfo_height() / 2)))
class vbpatch:
def __init__(self, file_):
self.file = file_
def checkmagic(self):
if os.access(self.file, os.F_OK):
magic = b'AVB0'
with open(self.file, "rb") as f:
buf = f.read(4)
return magic == buf
else:
print("File dose not exist!")
def readflag(self):
if not self.checkmagic():
return False
if os.access(self.file, os.F_OK):
with open(self.file, "rb") as f:
f.seek(123, 0)
flag = f.read(1)
if flag == b'\x00':
return 0 # Verify boot and dm-verity is on
elif flag == b'\x01':
return 1 # Verify boot but dm-verity is off
elif flag == b'\x02':
return 2 # All verity is off
else:
return flag
else:
print("File does not exist!")
def patchvb(self, flag):
if not self.checkmagic():
return False
if os.access(self.file, os.F_OK):
with open(self.file, 'rb+') as f:
f.seek(123, 0)
f.write(flag)
print("Done!")
else:
print("File not Found")
def restore(self):
self.patchvb(b'\x00')
def disdm(self):
self.patchvb(b'\x01')
def disavb(self):
self.patchvb(b'\x02')
class DUMPCFG:
blksz = 0x1 << 0xc
headoff = 0x4000
magic = b"LOGO!!!!"
imgnum = 0
imgblkoffs = []
imgblkszs = []
class BMPHEAD(object):
def __init__(self, buf: bytes = None): # Read bytes buf and use this struct to parse
assert buf is not None, f"buf Should be bytes not {type(buf)}"
# print(buf)
self.structstr = "<H6I"
(
self.magic,
self.fsize,
self.reserved,
self.hsize,
self.dib,
self.width,
self.height,
) = struct.unpack(self.structstr, buf)
class XIAOMI_BLKSTRUCT(object):
def __init__(self, buf: bytes):
self.structstr = "2I"
(
self.imgoff,
self.blksz,
) = struct.unpack(self.structstr, buf)
class LOGODUMPER(object):
def __init__(self, img: str, out: str, dir__: str = "pic"):
self.out = out
self.img = img
self.dir = dir__
self.structstr = "<8s"
self.cfg = DUMPCFG()
self.chkimg(img)
def chkimg(self, img: str):
assert os.access(img, os.F_OK), f"{img} does not found!"
with open(img, 'rb') as f:
f.seek(self.cfg.headoff, 0)
self.magic = struct.unpack(
self.structstr, f.read(struct.calcsize(self.structstr))
)[0]
while True:
m = XIAOMI_BLKSTRUCT(f.read(8))
if m.imgoff != 0:
self.cfg.imgblkszs.append(m.blksz << 0xc)
self.cfg.imgblkoffs.append(m.imgoff << 0xc)
self.cfg.imgnum += 1
else:
break
assert self.magic == b"LOGO!!!!", "File does not match xiaomi logo magic!"
return True
def unpack(self):
with open(self.img, 'rb') as f:
print("Unpack:\n"
"BMP\tSize\tWidth\tHeight")
for i in range(self.cfg.imgnum):
f.seek(self.cfg.imgblkoffs[i], 0)
bmph = BMPHEAD(f.read(26))
f.seek(self.cfg.imgblkoffs[i], 0)
print("%d\t%d\t%d\t%d" % (i, bmph.fsize, bmph.width, bmph.height))
with open(os.path.join(self.out, "%d.bmp" % i), 'wb') as o:
o.write(f.read(bmph.fsize))
print("\tDone!")
def repack(self):
with open(self.out, 'wb') as o:
off = 0x5
for i in range(self.cfg.imgnum):
print("Write BMP [%d.bmp] at offset 0x%X" % (i, off << 0xc))
with open(os.path.join(self.dir, "%d.bmp" % i), 'rb') as b:
bhead = BMPHEAD(b.read(26))
b.seek(0, 0)
self.cfg.imgblkszs[i] = (bhead.fsize >> 0xc) + 1
self.cfg.imgblkoffs[i] = off
o.seek(off << 0xc)
o.write(b.read(bhead.fsize))
off += self.cfg.imgblkszs[i]
o.seek(self.cfg.headoff)
o.write(self.magic)
for i in range(self.cfg.imgnum):
o.write(struct.pack("<I", self.cfg.imgblkoffs[i]))
o.write(struct.pack("<I", self.cfg.imgblkszs[i]))
print("\tDone!")