|
#!/usr/bin/env python3 |
|
from time import sleep |
|
from subprocess import Popen, PIPE |
|
from tempfile import TemporaryDirectory |
|
import sys |
|
from datetime import datetime |
|
import re |
|
import os |
|
import shutil |
|
import tarfile |
|
import bz2 |
|
sys.path.append("TiBUdecrypter") |
|
import tibudecrypt |
|
import logging |
|
|
|
logging.basicConfig(level=logging.DEBUG) |
|
|
|
class Root: |
|
def __init__(self, su_command=("su",)): |
|
logging.info("Starting %s...", su_command) |
|
self.su = Popen(su_command, stdin=PIPE, stdout=PIPE) |
|
logging.info("Checking, whether uid is 0...") |
|
assert self.run("id -u") == ["0"] |
|
logging.info("We are root.") |
|
|
|
def run(self, command): |
|
logging.debug("Running command %s...", command) |
|
command += "\n" |
|
command = command.encode() |
|
self.su.stdin.write(command) |
|
self.su.stdin.write(b"echo ...END...\n") |
|
self.su.stdin.flush() |
|
lines = list() |
|
while True: |
|
line = self.su.stdout.readline() |
|
logging.debug("Received line: %s", line) |
|
line = line.decode().strip() |
|
if line != "...END...": |
|
lines.append(line) |
|
else: |
|
break |
|
logging.debug("Finished command.") |
|
return lines |
|
|
|
def exit(self): |
|
self.su.stdin.write(b"exit\n") |
|
self.su.stdin.flush() |
|
self.su.stdout.readline() |
|
logging.info("Root shell exited with %s.", self.su.poll()) |
|
|
|
def parse_argv(argv): |
|
logging.info("Parsing file name...") |
|
assert len(argv) == 2 |
|
file_path = os.path.abspath(argv[1]) |
|
file_name = file_path.split("/")[-1] |
|
match = re.match(r"(\S+)-(\d+)-(\d+)[.]tar[.]bz2", file_name) |
|
if not match: |
|
print("The filename has a weird format.") |
|
exit(-1) |
|
date = datetime.strptime("{}-{}".format(*match.groups()[1:3]), "%Y%m%d-%H%M%S") |
|
return { |
|
"path": file_path, |
|
"name": file_name, |
|
"package_name": match.groups()[0], |
|
"date": date, |
|
} |
|
|
|
def search_data_dir(root, package_name): |
|
logging.info("Searching data dir for app %s...", package_name) |
|
all_dirs = root.run("ls /data/data") |
|
if not package_name in all_dirs: |
|
print("This package wasn't found.") |
|
exit(-1) |
|
path = os.path.join("/data/data", package_name) |
|
assert os.path.exists(path) |
|
return path |
|
|
|
def decompress(filename): |
|
logging.info("Decompressing...") |
|
logging.debug("Opening...") |
|
with bz2.BZ2File("decrypted-{}".format(filename)) as bz: |
|
with tarfile.TarFile(fileobj=bz) as tar: |
|
logging.debug("Files in this archive: %s", tar.getnames()) |
|
tar.extractall() |
|
logging.info("Done decompressing.") |
|
logging.info("Deleting archive...") |
|
os.remove("decrypted-{}".format(filename)) |
|
|
|
def kill_app(root, user): |
|
logging.info("Killing all processes for user %s...", user) |
|
for pid in root.run("find /proc -maxdepth 1 -user {} | cut -d/ -f3".format(user)): |
|
logging.debug("Killing %s...", pid) |
|
root.run("kill {}".format(pid)) |
|
|
|
def move_and_setup(root, file_info): |
|
logging.info("Gathering information about the old directory...") |
|
package_path = os.path.join("/data/data", file_info["package_name"]) |
|
stat_output = root.run("stat -c '%a;%u;%U;%g;%G' {}".format(package_path))[0] |
|
stat_info = { |
|
"mod": stat_output.split(";")[0], |
|
"uid": stat_output.split(";")[1], |
|
"user": stat_output.split(";")[2], |
|
"gid": stat_output.split(";")[3], |
|
"group": stat_output.split(";")[4], |
|
} |
|
logging.debug("stat: %s", stat_info) |
|
kill_app(root, stat_info["user"]) |
|
logging.info("Deleting the old directory...") |
|
root.run("rm -rf {}".format(package_path)) |
|
logging.info("Moving the new directory...") |
|
extracted_path = os.path.abspath(os.path.join(os.path.join("data", "data"), file_info["package_name"])) |
|
root.run("mv {} {}".format(extracted_path, package_path)) |
|
logging.info("Setting permissions...") |
|
root.run("chown -R {}:{} {}".format(stat_info["uid"], stat_info["gid"], package_path)) |
|
root.run("chmod -R {} {}".format(stat_info["mod"], package_path)) |
|
|
|
def main(): |
|
file_info = parse_argv(sys.argv) |
|
root = Root() |
|
data_dir = search_data_dir(root, file_info["package_name"]) |
|
old_pwd = os.path.abspath(os.curdir) |
|
with TemporaryDirectory() as tempdir: |
|
logging.debug("Got temporary directory: %s", tempdir) |
|
os.chdir(tempdir) |
|
logging.info("Making a copy of the file (%s to %s)...", file_info["path"], os.path.abspath(".")) |
|
shutil.copyfile(file_info["path"], file_info["name"]) |
|
logging.info("Decrypting...") |
|
tibudecrypt.main({ |
|
"<file>": file_info["name"], |
|
}) |
|
assert os.path.exists("decrypted-{}".format(file_info["name"])) |
|
logging.info("Done decrypting.") |
|
logging.info("Deleting copy...") |
|
os.remove(file_info["name"]) |
|
decompress(file_info["name"]) |
|
move_and_setup(root, file_info) |
|
os.chdir(old_pwd) |
|
logging.info("Done.") |
|
root.exit() |
|
|
|
if __name__ == '__main__': |
|
main() |