Created
March 18, 2023 10:52
-
-
Save neko-neko-nyan/82b6119a23eb65cc5b0222bb8448310b to your computer and use it in GitHub Desktop.
ROFL file parser (Riot Games / League of Legends replay file)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import dataclasses
import enum
import gzip
import io
import json
import struct
from pprint import pprint

from Crypto.Cipher import Blowfish
# Path of the replay file that is opened and parsed when this script runs.
ROFL_FILE = "C:\\Users\\neko\\Documents\\League of Legends\\Replays\\RU-428503030.rofl"
@dataclasses.dataclass
class Metadata:
    """Replay metadata decoded from the JSON section of a ROFL file.

    The field names are camelCase on purpose: an instance is created by
    unpacking the decoded JSON object as keyword arguments, so every field
    name has to match a JSON key exactly.
    """

    gameLength: int
    gameVersion: str
    lastGameChunkId: int
    lastKeyFrameId: int
    # Decoded from the nested JSON string stored under the ``statsJson`` key.
    players: list[dict[str, str]]
@dataclasses.dataclass
class Header:
    """Fixed-size header stored at the very beginning of a ROFL file.

    Every integer field is an unsigned little-endian value. The three
    ``*_offset`` fields are used by ``ReplayFile`` as absolute positions to
    seek to in the file.
    """

    magic: bytes
    # 256 opaque bytes following the magic; never inspected by this module.
    # NOTE(review): presumably a signature -- confirm against the format.
    magic2: bytes
    header_length: int
    file_length: int
    metadata_offset: int
    metadata_length: int
    payload_header_offset: int
    payload_header_length: int
    payload_offset: int

    @classmethod
    def read(cls, f):
        """Read a header from the current position of binary stream ``f``.

        No validation is done here: a short read simply produces shorter
        byte strings / smaller integers. Call :meth:`check` afterwards.
        """

        def read_uint(size):
            # Unsigned little-endian integer of ``size`` bytes.
            return int.from_bytes(f.read(size), 'little')

        magic = f.read(6)
        magic2 = f.read(256)
        header_length = read_uint(2)
        file_length = read_uint(4)
        metadata_offset = read_uint(4)
        metadata_length = read_uint(4)
        payload_header_offset = read_uint(4)
        payload_header_length = read_uint(4)
        payload_offset = read_uint(4)
        return cls(
            magic,
            magic2,
            header_length,
            file_length,
            metadata_offset,
            metadata_length,
            payload_header_offset,
            payload_header_length,
            payload_offset,
        )

    def check(self):
        """Validate the magic number and the header length.

        Raises:
            RuntimeError: if either value is not the expected one.
        """
        if self.magic != b'RIOT\0\0':
            raise RuntimeError("Invalid magic number")
        # 288 = 6 (magic) + 256 (magic2) + 2 + 6 * 4, i.e. exactly the
        # number of bytes consumed by read().
        if self.header_length != 288:
            raise RuntimeError("Invalid header length")
@dataclasses.dataclass
class PayloadHeader:
    """Header of the payload section; also carries the chunk decryption key.

    Integer fields are unsigned little-endian values read in declaration
    order.
    """

    game_id: int
    game_length: int
    keyframe_count: int
    chunk_count: int
    end_startup_chunk_id: int
    start_game_chunk_id: int
    keyframe_interval: int
    # Length of the base64 text as stored in the file, not of the decoded key.
    encryption_key_length: int
    # Already decrypted and unpadded; passed to Chunk as the Blowfish key.
    encryption_key: bytes

    @classmethod
    def read(cls, f):
        """Read and decode a payload header from binary stream ``f``.

        The stored key is base64 text whose decoded bytes are Blowfish-ECB
        encrypted; the key for that outer layer is the decimal game id
        encoded as ASCII.
        """

        def read_uint(size):
            # Unsigned little-endian integer of ``size`` bytes.
            return int.from_bytes(f.read(size), 'little')

        game_id = read_uint(8)
        game_length = read_uint(4)
        keyframe_count = read_uint(4)
        chunk_count = read_uint(4)
        end_startup_chunk_id = read_uint(4)
        start_game_chunk_id = read_uint(4)
        keyframe_interval = read_uint(4)
        key_length = read_uint(2)

        encrypted_key = base64.b64decode(f.read(key_length))
        cipher = Blowfish.new(str(game_id).encode('ascii'), Blowfish.MODE_ECB)
        padded_key = cipher.decrypt(encrypted_key)
        # The final byte states how many trailing padding bytes to drop.
        key = padded_key[:-padded_key[-1]]

        return cls(
            game_id,
            game_length,
            keyframe_count,
            chunk_count,
            end_startup_chunk_id,
            start_game_chunk_id,
            keyframe_interval,
            key_length,
            key,
        )
@dataclasses.dataclass
class ChunkHeader:
    """One 17-byte record of the chunk header table (4 + 1 + 4 + 4 + 4).

    Integer fields are unsigned little-endian values.
    """

    id: int
    is_keyframe: bool
    length: int
    next_id: int
    # ReplayFile adds this to the position just past the header table.
    offset: int

    @classmethod
    def read(cls, f):
        """Read a single record from the current position of stream ``f``."""

        def read_uint32():
            return int.from_bytes(f.read(4), 'little')

        chunk_id = read_uint32()
        # Only the exact byte 0x01 marks a keyframe; anything else is False.
        is_keyframe = f.read(1) == b'\x01'
        length = read_uint32()
        next_id = read_uint32()
        offset = read_uint32()
        return cls(chunk_id, is_keyframe, length, next_id, offset)
class Chunk:
    """A payload chunk or keyframe after decryption and decompression.

    Attributes:
        header: the ChunkHeader describing this chunk.
        data: the gunzipped plaintext bytes.
    """

    def __init__(self, header: ChunkHeader, data: bytes, key: bytes):
        """Decrypt ``data`` with Blowfish-ECB using ``key``, then gunzip it.

        ``data`` must be non-empty: the last decrypted byte is read to find
        out how many padding bytes to strip.
        """
        cipher = Blowfish.new(key, Blowfish.MODE_ECB)
        padded = cipher.decrypt(data)
        # The final byte states how many trailing padding bytes to drop.
        compressed = padded[:-padded[-1]]
        plain = gzip.decompress(compressed)
        self.header = header
        self.data = plain
class ReplayFile:
    """Lazy reader for the sections of a ROFL file.

    The instance keeps a reference to the open binary file and seeks in it
    on demand, so the file must stay open while the getters are used.
    """

    # Size in bytes of one chunk header record as read by ChunkHeader.read.
    _CHUNK_HEADER_SIZE = 17

    def __init__(self, header: Header, file: io.IOBase):
        self._header = header
        self._f = file
        self._metadata = None
        self._payload_header = None
        self._payload = None

    @classmethod
    def read(cls, f):
        """Read and validate the file header from ``f`` and wrap the file.

        Raises:
            RuntimeError: if the header fails ``Header.check``.
        """
        header = Header.read(f)
        header.check()
        return cls(header, f)

    def get_metadata(self):
        """Return the parsed Metadata, decoding it on first use."""
        if self._metadata is None:
            self._f.seek(self._header.metadata_offset)
            raw = self._f.read(self._header.metadata_length)
            fields = json.loads(raw.decode("utf-8"))
            # ``statsJson`` is itself a JSON document stored as a string;
            # it is decoded and exposed under the ``players`` field.
            fields['players'] = json.loads(fields.pop('statsJson'))
            self._metadata = Metadata(**fields)
        return self._metadata

    def get_payload_header(self):
        """Return the parsed PayloadHeader, decoding it on first use."""
        if self._payload_header is None:
            self._f.seek(self._header.payload_header_offset)
            raw = self._f.read(self._header.payload_header_length)
            with io.BytesIO(raw) as buf:
                self._payload_header = PayloadHeader.read(buf)
        return self._payload_header

    def get_data(self):
        """Read, decrypt and decompress every chunk and keyframe.

        The result is not cached: each call re-reads the file and returns
        a new list of Chunk objects in header-table order.
        """
        plh = self.get_payload_header()
        record_count = plh.chunk_count + plh.keyframe_count

        # The payload starts with a table of fixed-size header records.
        self._f.seek(self._header.payload_offset)
        headers = [ChunkHeader.read(self._f) for _ in range(record_count)]

        # Chunk offsets are counted from the first byte after that table.
        base = self._header.payload_offset + record_count * self._CHUNK_HEADER_SIZE
        chunks = []
        for h in headers:
            self._f.seek(base + h.offset)
            data = self._f.read(h.length)
            chunks.append(Chunk(h, data, plh.encryption_key))
        return chunks
class BlockFlag(enum.IntFlag):
    """Flag bits taken from the high nibble of a block's marker byte.

    Each flag selects the shorter encoding of one field when a block is
    read; see ``Block.read``.
    """

    ONE_BYTE_CONTENT_LENGTH = 1 << 0  # content length is 1 byte, not 4
    ONE_BYTE_PARAM = 1 << 1           # param is 1 byte, not 4
    SAME_TYPE = 1 << 2                # type is omitted; reuse the previous block's
    RELATIVE_TIME = 1 << 3            # time is a 1-byte integer, not a 4-byte float
@dataclasses.dataclass
class Block:
    """One variable-length record parsed from decompressed chunk data."""

    flags: BlockFlag
    channel: int
    # A 32-bit float normally, but a raw one-byte integer when RELATIVE_TIME
    # is set; the relative value is NOT resolved against the previous block.
    timestamp: float
    length: int
    type: int
    param: bytes
    content: bytes

    @classmethod
    def read(cls, f, prev: 'Block | None'):
        """Read the next block from binary stream ``f``.

        Args:
            f: stream positioned at a block's marker byte.
            prev: the block read just before this one, or None for the
                first block; only its ``type`` is used, and only when the
                SAME_TYPE flag is set.

        Returns:
            The parsed Block, or None when the stream is exhausted.

        Raises:
            ValueError: if SAME_TYPE is set but ``prev`` is None.
        """
        marker = f.read(1)
        if not marker:
            return None
        marker = int.from_bytes(marker, 'little')
        # High nibble: encoding flags. Low nibble: channel number.
        flags = BlockFlag(marker >> 4)
        channel = marker & 0xF

        if flags & BlockFlag.RELATIVE_TIME:
            timestamp = int.from_bytes(f.read(1), 'little')
        else:
            timestamp = struct.unpack('<f', f.read(4))[0]

        if flags & BlockFlag.ONE_BYTE_CONTENT_LENGTH:
            content_length = int.from_bytes(f.read(1), 'little')
        else:
            content_length = int.from_bytes(f.read(4), 'little')

        if flags & BlockFlag.SAME_TYPE:
            if prev is None:
                # Previously surfaced as an opaque AttributeError on None.
                raise ValueError(
                    "block reuses the previous block's type, "
                    "but there is no previous block"
                )
            block_type = prev.type
        else:
            block_type = int.from_bytes(f.read(2), 'little')

        if flags & BlockFlag.ONE_BYTE_PARAM:
            param = f.read(1)
        else:
            param = f.read(4)

        content = f.read(content_length)
        return cls(flags, channel, timestamp, content_length, block_type, param, content)
def parse_chunk(f):
    """Yield every Block in binary stream ``f`` until it is exhausted.

    Each block is handed to the next ``Block.read`` call as ``prev`` so a
    block can reuse its predecessor's type.
    """
    previous = None
    while (block := Block.read(f, previous)) is not None:
        previous = block
        yield block
# Script driver: print every channel-3 block of the replay at ROFL_FILE.
with open(ROFL_FILE, "rb") as f:
    rf = ReplayFile.read(f)
    # print(rf.get_payload_header())
    # get_data() seeks and reads in the open file, so the loop has to stay
    # inside this ``with`` block.
    for i in rf.get_data():
        with io.BytesIO(i.data) as f2:
            for x in parse_chunk(f2):
                if x.channel == 3:
                    print(x)
                    # NOTE(review): the original indentation was lost; the
                    # prompt is assumed to pause after each printed block.
                    input("?")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment