Created
October 5, 2018 05:20
-
-
Save forensicmatt/5d06acb11986c02cb61e9599fe9625f9 to your computer and use it in GitHub Desktop.
Parse the $O file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Parse $O File | |
# Copyright Matthew Seyer 2018 | |
# Apache License Version 2 | |
# | |
# decode_objfile.py FILE [OUTPUT_TEMPLATE] | |
# | |
# Examples: | |
# Output JSON lines: | |
# python .\decode_objfile.py '$O' | |
# | |
# Output via an output template | |
# python .\decode_objfile.py '$O' "{mft_reference[entry]},{object_id[uuid]},{object_id[timestamp]}" | |
# | |
import sys | |
import json | |
import struct | |
import codecs | |
import logging | |
import binascii | |
import datetime | |
# Only ERROR-and-above records are emitted by default; the debug tracing in
# the parser classes below stays silent unless this level is lowered.
logging.basicConfig(
    level=logging.ERROR
)
class InvalidIndxPageHeader(Exception):
    """Raised when a page buffer does not start with the ``INDX`` magic."""

    def __init__(self, message):
        super().__init__(message)
class FileTime(datetime.datetime):
    """``datetime.datetime`` with an extra ``nanoseconds`` attribute.

    ``datetime.datetime`` is immutable and only resolves microseconds, so this
    subclass carries the sub-microsecond remainder separately.  ``nanoseconds``
    may be an int or a pre-formatted digit string (see ``ObjectId.timestamp``,
    which passes a 9-digit nanosecond string).
    """
    # Class-level default so ``__str__`` also works for instances created
    # directly instead of via ``from_dt_object`` (the original raised
    # AttributeError in that case).
    nanoseconds = 0

    def __new__(cls, *args, **kwargs):
        return datetime.datetime.__new__(cls, *args, **kwargs)

    @staticmethod
    def from_dt_object(dt_object, nanoseconds=0):
        """Copy ``dt_object``'s fields into a FileTime and attach ``nanoseconds``."""
        ft = FileTime(
            dt_object.year,
            dt_object.month,
            dt_object.day,
            dt_object.hour,
            dt_object.minute,
            dt_object.second,
            dt_object.microsecond
        )
        ft.nanoseconds = nanoseconds
        return ft

    def __str__(self):
        # The fractional part is rendered from ``nanoseconds``; the stored
        # microsecond field is intentionally not repeated in the output.
        return "{0.year}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}.{0.nanoseconds}".format(self)
class NtfsReference(object):
    """An 8-byte NTFS file reference: 48-bit MFT entry + 16-bit sequence."""

    def __init__(self, buf):
        # buf: at least 8 bytes, little endian.
        self._buffer = buf

    @property
    def reference(self):
        """The raw 64-bit reference value."""
        return struct.unpack("<Q", self._buffer[0:8])[0]

    @property
    def entry(self):
        """The MFT entry number: the low 48 bits of the reference.

        Bug fix: the original unpacked ``"<IH"`` and kept only the first
        field, silently dropping the upper 16 bits of the 48-bit entry number.
        """
        return self.reference & 0x0000FFFFFFFFFFFF

    @property
    def sequence(self):
        """The 16-bit sequence (reuse) count in the top two bytes."""
        return struct.unpack("<H", self._buffer[6:8])[0]

    def as_dict(self):
        return {
            "reference": self.reference,
            "entry": self.entry,
            "sequence": self.sequence
        }
class ObjectId(object):
    """A 16-byte NTFS object id: an RFC 4122 time-based (version 1) UUID
    stored in Windows GUID mixed-endian layout (the first three fields are
    little endian, the rest byte-for-byte).
    """

    def __init__(self, buf):
        self._buffer = buf

    def raw(self):
        """The raw 16 bytes."""
        return bytes(self._buffer)

    @property
    def timestamp(self):
        """The UUID creation time as a FileTime.

        http://computerforensics.parsonage.co.uk/downloads/TheMeaningofLIFE.pdf
        The file ObjectID is a time based version which means it is created
        using a system time: a 60 bit count of 100 nanosecond intervals of UTC
        since midnight at the start of 15th October 1582.
        """
        # Little-endian read of time_low/time_mid/time_hi reassembles the
        # 60-bit timestamp with the 4 version bits in the top nibble.
        le_timestamp = struct.unpack("<Q", self._buffer[0:8])[0]
        # Strip the 4 version bits.
        le_timestamp = le_timestamp - (le_timestamp & 0xf000000000000000)
        # Re-base from the UUID epoch (1582-10-15) to 1601-01-01:
        # 6653 days = 5748192000000000 hundred-nanosecond intervals.
        le_timestamp = le_timestamp - 5748192000000000
        # Bug fix: integer division.  The original ``/ 10`` produced a float,
        # which loses microsecond precision for modern timestamps.
        dt_object = datetime.datetime(1601, 1, 1) + datetime.timedelta(
            microseconds=le_timestamp // 10
        )
        # Sub-second remainder at 100ns resolution, rendered as a 9-digit
        # nanosecond string.
        nanoseconds = str(le_timestamp % 10000000).zfill(7) + '00'
        filetime = FileTime.from_dt_object(
            dt_object, nanoseconds=nanoseconds
        )
        return filetime

    @property
    def timestamp_uint64(self):
        """The 60-bit timestamp (100ns intervals since 1582-10-15)."""
        le_timestamp = struct.unpack("<Q", self._buffer[0:8])[0]
        le_timestamp = le_timestamp - (le_timestamp & 0xf000000000000000)
        return le_timestamp

    @property
    def version(self):
        """The UUID version (1 for time-based ids).

        Bug fix: time_hi_and_version is stored little endian, and the version
        is its TOP nibble.  The original read the field big endian and masked
        the low nibble, returning an arbitrary timestamp digit.
        """
        return struct.unpack("<H", self._buffer[6:8])[0] >> 12

    @property
    def variant(self):
        # clock_seq bytes are not byte-swapped; the variant is the top two
        # bits of clock_seq_hi_and_reserved (2 for RFC 4122 UUIDs).
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field >> 14

    @property
    def sequence(self):
        """The 14-bit clock sequence."""
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field & 0x3FFF

    @property
    def mac(self):
        """The 6-byte node (MAC address) field."""
        return bytes(self._buffer[10:16])

    def as_dict(self):
        return {
            "uuid": str(self),
            "hex": self.raw(),
            "timestamp": str(self.timestamp),
            "timestamp_uint64": self.timestamp_uint64,
            "version": self.version,
            "variant": self.variant,
            "sequence": self.sequence,
            "mac": self.mac
        }

    def __str__(self):
        # Canonical UUID text: first three fields little endian, rest as-is.
        return "{:08x}-{:04x}-{:04x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
            struct.unpack("<L", self._buffer[0:4])[0],
            struct.unpack("<H", self._buffer[4:6])[0],
            struct.unpack("<H", self._buffer[6:8])[0],
            struct.unpack("<B", self._buffer[8:9])[0],
            struct.unpack("<B", self._buffer[9:10])[0],
            struct.unpack("<B", self._buffer[10:11])[0],
            struct.unpack("<B", self._buffer[11:12])[0],
            struct.unpack("<B", self._buffer[12:13])[0],
            struct.unpack("<B", self._buffer[13:14])[0],
            struct.unpack("<B", self._buffer[14:15])[0],
            struct.unpack("<B", self._buffer[15:16])[0]
        )
class IndexHeader(object):
    """The fixed header plus update-sequence array at the front of an INDX page."""

    def __init__(self, buf):
        # Keep only the fixed header and the update sequence (fixup) array.
        usa_offset, usa_count = struct.unpack_from("<HH", buf, 4)
        self._buffer = bytearray(buf[:usa_offset + usa_count * 2])

    def block_size(self):
        """The block size of the index: one 512-byte sector per fixup word.

        The first array element is the check value, not a sector patch, hence
        ``(size - 1) * 512``.
        """
        return (self.update_sequence_size - 1) * 512

    def get_fixup_array(self):
        """The update sequence array as a list of 2-byte chunks."""
        start = self.update_sequence_offset
        end = start + self.update_sequence_size * 2
        raw = self._buffer[start:end]
        return [raw[pos:pos + 2] for pos in range(0, len(raw), 2)]

    @property
    def signature(self):
        return bytes(self._buffer[0:4])

    @property
    def update_sequence_offset(self):
        return struct.unpack_from("<H", self._buffer, 4)[0]

    @property
    def update_sequence_size(self):
        return struct.unpack_from("<H", self._buffer, 6)[0]

    @property
    def logfile_sequence_number(self):
        return struct.unpack_from("<Q", self._buffer, 8)[0]

    @property
    def vcn(self):
        return struct.unpack_from("<Q", self._buffer, 16)[0]

    @property
    def index_entry_offset(self):
        return struct.unpack_from("<I", self._buffer, 24)[0]

    @property
    def index_entry_size(self):
        return struct.unpack_from("<I", self._buffer, 28)[0]

    @property
    def allocated_index_entry_size(self):
        return struct.unpack_from("<I", self._buffer, 32)[0]

    @property
    def leaf_node(self):
        return struct.unpack_from("<B", self._buffer, 36)[0]

    @property
    def update_sequence(self):
        # The first fixup word (the check value), hex-encoded.
        return binascii.b2a_hex(
            self._buffer[40:42]
        )
class IndexOEntry(object):
    """One $O index entry: 16-byte entry header, 16-byte ObjectId key, then a
    56-byte data payload (MFT reference plus three birth ids)."""

    def __init__(self, buf, offset=None):
        self._offset = offset
        logging.debug("Index Entry at Offset: {}".format(self._offset))
        data_off, data_len = struct.unpack_from("<HH", buf, 0)
        self._buffer = buf[:data_off + data_len]

    def get_offset(self):
        """Absolute file offset this entry was parsed from (may be None)."""
        return self._offset

    @property
    def data_offset(self):
        """This should be 32"""
        return struct.unpack_from("<H", self._buffer, 0)[0]

    @property
    def data_size(self):
        """This should be 56"""
        return struct.unpack_from("<H", self._buffer, 2)[0]

    @property
    def entry_size(self):
        """This should be 88"""
        return struct.unpack_from("<H", self._buffer, 8)[0]

    @property
    def key_size(self):
        """This should be 16"""
        return struct.unpack_from("<H", self._buffer, 10)[0]

    @property
    def flags(self):
        """1 = Entry has subnodes; 2 = Last Entry"""
        return struct.unpack_from("<H", self._buffer, 12)[0]

    @property
    def object_id(self):
        # The key: the file's own object id.
        return ObjectId(self._buffer[16:32])

    @property
    def mft_reference(self):
        # First 8 bytes of the data payload.
        return NtfsReference(
            self._buffer[32:40]
        )

    @property
    def birth_volume(self):
        return ObjectId(self._buffer[40:56])

    @property
    def birth_object(self):
        return ObjectId(self._buffer[56:72])

    @property
    def birth_domain(self):
        return ObjectId(self._buffer[72:88])

    def as_dict(self):
        return {
            "offset": self._offset,
            "flags": self.flags,
            "object_id": self.object_id.as_dict(),
            "mft_reference": self.mft_reference.as_dict(),
            "birth_volume": self.birth_volume.as_dict(),
            "birth_object": self.birth_object.as_dict(),
            "birth_domain": self.birth_domain.as_dict()
        }
class IndexPage(object):
    """A single INDX block read from ``file_handle`` at ``offset``, with the
    NTFS update-sequence (fixup) values restored in place."""

    def __init__(self, file_handle, offset):
        self._offset = offset
        logging.debug("Parsing Index Page at offset: {}".format(self._offset))
        header_buf = file_handle.read(64)
        if bytes(header_buf[0:4]) != b"INDX":
            raise InvalidIndxPageHeader(
                "Invalid Page Header Signature [{}] at offset: {}".format(
                    bytes(header_buf[0:4]),
                    self._offset
                )
            )
        self.header = IndexHeader(
            header_buf
        )
        # Pull in the rest of the block (we already consumed 64 bytes).
        remaining = self.header.block_size() - 64
        self._index_block_buf = bytearray(
            header_buf + file_handle.read(remaining)
        )
        self._fix_raw_block()

    def get_page_size(self):
        """Total size of this block in bytes."""
        return self.header.block_size()

    def _fix_raw_block(self):
        """Apply the update sequence array to their respected offsets.

        Element 0 of the array is the check value; element i+1 holds the two
        real bytes for the end of sector i (offsets 510/511 of each 512-byte
        sector).
        """
        for sector, patch in enumerate(self.header.get_fixup_array()[1:]):
            base = sector * 512
            self._index_block_buf[base + 510] = patch[0]
            self._index_block_buf[base + 511] = patch[1]

    def iter_entries(self):
        """Yield IndexOEntry records from this page.

        Entries start ``index_entry_offset`` bytes past the 24-byte fixed
        header.  Matches the original do/while shape: at least one entry is
        always yielded before the size check.
        """
        cursor = self.header.index_entry_offset + 24
        current = IndexOEntry(
            self._index_block_buf[cursor:],
            offset=self._offset + cursor
        )
        cursor += current.entry_size
        while True:
            yield current
            if cursor >= self.header.index_entry_size:
                break
            current = IndexOEntry(
                self._index_block_buf[cursor:],
                offset=self._offset + cursor
            )
            cursor += current.entry_size
class ObjectIdFile(object):
    """Walks the INDX pages of an $ObjId:$O index file."""

    def __init__(self, file_handle):
        self._file_handle = file_handle
        self._offset = 0
        # Measure the file by seeking to the end, then rewind to the start.
        self._file_handle.seek(0, 2)
        self._file_size = self._file_handle.tell()
        self._file_handle.seek(0, 0)

    def iter_index_pages(self):
        """Yield IndexPage objects until the file is exhausted.

        A page that fails its INDX signature check is logged and iteration
        stops.  The first page is parsed unconditionally, matching the
        original do/while shape.
        """
        page = IndexPage(
            self._file_handle,
            offset=self._offset
        )
        self._offset += page.get_page_size()
        while True:
            yield page
            if self._offset == self._file_size:
                break
            self._file_handle.seek(
                self._offset
            )
            try:
                page = IndexPage(
                    self._file_handle,
                    offset=self._offset
                )
            except InvalidIndxPageHeader as error:
                logging.error(error)
                break
            self._offset += page.get_page_size()
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that renders ``bytes`` values as lowercase hex strings."""

    def default(self, obj):
        if isinstance(obj, bytes):
            return obj.hex()
        return super().default(obj)
def main():
    """Decode an $O index file named on the command line.

    Usage: decode_objfile.py FILE [OUTPUT_TEMPLATE]

    Without a template each entry is printed as a JSON line (bytes rendered
    as hex via ComplexEncoder); with one, the template is formatted with the
    entry's dict, e.g. "{mft_reference[entry]},{object_id[uuid]}".
    """
    # Robustness fix: the original indexed sys.argv[1] unconditionally and
    # crashed with an IndexError when no file argument was supplied.
    if len(sys.argv) < 2:
        sys.exit("usage: decode_objfile.py FILE [OUTPUT_TEMPLATE]")
    filename = sys.argv[1]
    out_template = sys.argv[2] if len(sys.argv) > 2 else None
    with open(filename, 'rb') as fh:
        obj_id_file = ObjectIdFile(
            fh
        )
        for index_page in obj_id_file.iter_index_pages():
            for entry in index_page.iter_entries():
                if out_template:
                    print(
                        out_template.format(
                            **entry.as_dict()
                        )
                    )
                else:
                    print(
                        json.dumps(entry.as_dict(), cls=ComplexEncoder)
                    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment