Created
December 5, 2023 13:24
-
-
Save udf/62c24984b89de206fc7ae725cbb9b738 to your computer and use it in GitHub Desktop.
Scripts from https://blog.withsam.org/blog/exfat-data-recovery/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Code to recursively extract a directory from a damaged exFAT filesystem | |
from collections import OrderedDict | |
from io import BytesIO | |
import os | |
from pathlib import Path | |
from dissect.fat.c_exfat import ( | |
DIR_ENTRY_SIZE, | |
FILE_ENTRY, | |
c_exfat, | |
) | |
from dissect.fat.exfat import RootDirectory | |
# Borrow dissect.fat's private filename assembler for use on raw entries.
_construct_filename = RootDirectory._construct_filename
# Raw image of the damaged exFAT volume; kept open for the module's lifetime.
IMAGE_HANDLE = open('/home/sam/vita.img', 'rb')
# Byte offset in the image where the cluster heap (data area) begins.
DATA_START = 0x43F0000
# Bytes per cluster on this volume (0x8000 = 32 KiB).
CLUSTER_SIZE = 0x8000
# Clusters below this number lie in a missing/damaged region of the image
# (see parse_file_entries, which refuses to read them).
FIRST_VALID_CLUSTER = 499394
def cluster_to_address(cluster_no):
    """Translate a cluster number to its absolute byte offset in the image."""
    offset = cluster_no * CLUSTER_SIZE
    return DATA_START + offset
def address_to_cluster(address):
    """Inverse of cluster_to_address.

    Raises ValueError if `address` does not land exactly on a cluster boundary.
    """
    offset = address - DATA_START
    if offset % CLUSTER_SIZE:
        raise ValueError(f'{address=:X} is not at a cluster boundary!')
    return offset // CLUSTER_SIZE
def read_cluster(cluster_no, length=CLUSTER_SIZE):
    """Return `length` bytes starting at the given cluster's address."""
    IMAGE_HANDLE.seek(cluster_to_address(cluster_no))
    return IMAGE_HANDLE.read(length)
def iter_clusters(start_cluster_no=0, window_size=1):
    """Yield (cluster_no, data) pairs from `start_cluster_no` to the image end.

    Each read spans `window_size` clusters, but the iterator still advances
    one cluster at a time (overlapping windows), matching the original scan.
    """
    span = window_size * CLUSTER_SIZE
    cluster_no = start_cluster_no
    while cluster_no < NUM_CLUSTERS:
        yield cluster_no, read_cluster(cluster_no, span)
        cluster_no += 1
def parse_file_entries(cluster_no, record_size=CLUSTER_SIZE):
    """Parse the directory records stored in `cluster_no`.

    Clusters in the known-missing region produce a warning and an empty
    result instead of garbage entries.
    """
    if cluster_no >= FIRST_VALID_CLUSTER:
        return _parse_file_entries(read_cluster(cluster_no, record_size))
    print(f'warning: tried to read missing records from {cluster_no=}')
    return OrderedDict()
# adapted from https://github.com/fox-it/dissect.fat/blob/b9c8dbe0f81c3377022c5bfe49f1099faba94dc3/dissect/fat/exfat.py#L248
def _parse_file_entries(data):
    """Decode exFAT directory records from raw bytes.

    Returns an OrderedDict mapping filename -> c_exfat.FILE, preserving
    on-disk order.  Only FILE_ENTRY records are parsed; everything else is
    skipped one entry at a time.
    """
    result = OrderedDict()
    total = len(data)
    fh = BytesIO(data)
    try:
        while fh.tell() < total:
            record = c_exfat.FILE_DIRECTORY_ENTRY(fh.read(DIR_ENTRY_SIZE))
            if record.entry_type != FILE_ENTRY:  # Not parsing any other types
                continue
            # subentry_count includes the stream directory entry, hence -1
            # to get the number of filename entries that follow it.
            name_entry_count = record.subentry_count - 1
            stream_entry = c_exfat.STREAM_DIRECTORY_ENTRY(fh.read(DIR_ENTRY_SIZE))
            name_entries = [
                c_exfat.FILENAME_DIRECTORY_ENTRY(fh.read(DIR_ENTRY_SIZE))
                for _ in range(name_entry_count)
            ]
            parsed = c_exfat.FILE(
                metadata=record, stream=stream_entry, fn_entries=name_entries
            )
            result[_construct_filename(parsed.fn_entries)] = parsed
    except EOFError:
        # Ran off the end of the record buffer; keep whatever parsed cleanly.
        pass
    return result
def recursive_extract(cluster_no, path, record_size=CLUSTER_SIZE):
    """Walk the directory records at `cluster_no`, dumping files under `path`.

    Fragmented files are skipped with a warning (no FAT available to follow
    the cluster chain); subdirectories recurse with their stream length as
    the record size.
    """
    path = Path(path) if isinstance(path, str) else path
    for filename, file_ in parse_file_entries(cluster_no, record_size).items():
        new_path = path / filename
        file_addr = cluster_to_address(file_.stream.location)
        file_len = file_.stream.data_length
        if not file_.stream.flags.not_fragmented:
            # Can't assemble fragmented files without FAT
            print(
                f'warning: skipping fragmented file {new_path} '
                f'({file_len} bytes at 0x{file_addr:X})'
            )
        elif file_.metadata.attributes.directory:
            # A subdirectory's records occupy exactly file_len bytes.
            recursive_extract(
                file_.stream.location,
                path=new_path,
                record_size=file_len,
            )
        else:
            print('writing', new_path, file_len)
            path.mkdir(parents=True, exist_ok=True)
            IMAGE_HANDLE.seek(file_addr)
            with open(new_path, 'wb') as out:
                out.write(IMAGE_HANDLE.read(file_len))
# Derive the total cluster count from the image size.  NOTE(review): this
# assumes the image length sits exactly on a cluster boundary relative to
# DATA_START, otherwise address_to_cluster raises ValueError — confirm.
IMAGE_HANDLE.seek(0, os.SEEK_END)
NUM_CLUSTERS = address_to_cluster(IMAGE_HANDLE.tell())
# NOTE(review): seeking 0 bytes from the current position is a no-op;
# presumably a rewind (os.SEEK_SET) was intended — verify before relying
# on the handle's position here.
IMAGE_HANDLE.seek(0, os.SEEK_CUR)
if __name__ == '__main__':
    # FFX
    recursive_extract(5463310, 'PCSB00395')
    # sdslot.dat fragmented (looks somewhat contiguous, 4 + NULL + 5)
    # sure???
    # Manual reassembly of one fragmented file: copy 4 clusters from a fixed
    # offset, skip one cluster, then copy the next 5.  Offsets were found by
    # hand — treat as unverified.
    with open('PCSB00395/sce_sys/sdslot.dat', 'wb') as f:
        IMAGE_HANDLE.seek(0x29B2CD8000)
        f.write(IMAGE_HANDLE.read(CLUSTER_SIZE * 4))
        IMAGE_HANDLE.seek(CLUSTER_SIZE, os.SEEK_CUR)
        f.write(IMAGE_HANDLE.read(CLUSTER_SIZE * 5))
    # Cold steel (some folders)
    recursive_extract(address_to_cluster(0x16C2090000), 'PCSB01016/sce_pfs')
    recursive_extract(address_to_cluster(0x16D9EE8000), 'PCSB01016/sce_sys')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tries every candidate block to find the second half of a missing thumbnail block | |
from concurrent.futures import ThreadPoolExecutor | |
from pathlib import Path | |
import shutil | |
import tempfile | |
import subprocess | |
from bitarray import bitarray | |
import extract | |
# Scratch area for candidate trials; the recovered source tree is copied in
# once and re-copied per trial by try_data.
workdir = tempfile.mkdtemp()
srcdir = str(Path(workdir) / 'src')
shutil.copytree('PCSB01016', srcdir, dirs_exist_ok=True)
print(f'{workdir=}')
# First half of the two-cluster block being reconstructed — presumably the
# known-good cluster preceding the missing one (see try_data's filename).
initial_data = extract.read_cluster(5463311)
def try_data(data, cluster_no):
    """Splice `data` after the known first half and run psvpfsparser on it.

    Builds a throwaway copy of the source tree inside `workdir`, writes
    `initial_data + data` as the candidate two-cluster block, and returns the
    parser's stdout decoded as UTF-8.

    Raises subprocess.CalledProcessError if the parser exits non-zero.  The
    per-trial work tree is removed even on failure (the original leaked it
    whenever check_output raised); the parser's `_out` directory is kept so a
    successful decryption can be inspected afterwards.
    """
    tmpdir = tempfile.mkdtemp(dir=workdir)
    try:
        shutil.copytree(srcdir, tmpdir, dirs_exist_ok=True)
        with open(Path(tmpdir) / f'5463311+{cluster_no}.64k', 'wb') as f:
            f.write(initial_data + data)
        p = subprocess.check_output(
            [
                '/home/sam/proj/p/vitarec/psvpfstools/cmake/output/Release/psvpfsparser',
                '-i', tmpdir,
                '-o', tmpdir + '_out'
            ]
        )
    finally:
        shutil.rmtree(tmpdir)
    return p.decode('utf-8')
# Precomputed per-cluster bitmaps (one '0'/'1' character per cluster),
# produced by earlier analysis passes.  Semantics below are inferred from
# the filenames — confirm against whatever generated them.
with open('metadata.txt', 'r') as f:
    non_metadata = bitarray(f.read())
    # Inverted so set bits mark clusters that are NOT filesystem metadata.
    non_metadata.invert()
with open('orphans.txt', 'r') as f:
    # presumably: clusters not referenced by any intact directory entry
    orphans = bitarray(f.read())
with open('unique.txt', 'r') as f:
    # presumably: clusters whose 32 KiB contents are unique in the image
    unique_32k = bitarray(f.read())
with open('unknown.txt', 'r') as f:
    # presumably: clusters whose contents were not classified
    unknown = bitarray(f.read())
def iter_candidates():
    """Yield (cluster_no, data) for each cluster passing every filter bitmap."""
    mask = non_metadata & orphans & unique_32k & unknown
    one = bitarray('1')
    for cluster_no in mask.itersearch(one):
        yield cluster_no, extract.read_cluster(cluster_no)
def process_candidate(candidate):
    """Run one (cluster_no, data) pair through the parser.

    Returns (cluster_no, parser stdout) so results stay paired with their
    candidate when mapped across a pool.
    """
    cluster_no, payload = candidate
    return cluster_no, try_data(payload, cluster_no)
def main():
    """Fan candidates across a thread pool; stop at the first valid tree."""
    with ThreadPoolExecutor(max_workers=64) as pool:
        results = pool.map(process_candidate, iter_candidates())
        for cluster_no, output in results:
            if 'Merkle tree is invalid' in output:
                continue
            print(cluster_no)
            print(output)
            break


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment