Created
June 13, 2018 13:35
-
-
Save Killeroid/17c0695017950ca008917fda23270af2 to your computer and use it in GitHub Desktop.
Read and decrypt firefox sync records using https://github.com/mozilla-services/syncclient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import math | |
import math | |
from syncclient.client import SyncClient, FxAClient, TOKENSERVER_URL, six, hexlify, sha256 | |
from pprint import pprint | |
import json | |
import hmac | |
import hashlib | |
import base64 | |
from Crypto.Cipher import AES | |
from Crypto import Random | |
def main(): | |
parser = argparse.ArgumentParser( | |
description="""CLI to interact with Firefox Sync""", | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
) | |
parser.add_argument(dest='login', | |
help='Firefox Accounts login.') | |
parser.add_argument(dest='password', | |
help='Firefox Accounts password.') | |
args, extra = parser.parse_known_args() | |
# Login to FxA and get the keys. | |
fxaSession = FxAClient().login(args.login, args.password, keys=True) | |
fxaSession.fetch_keys() | |
# Connect to sync using FxA browserid assertion | |
bid_assertion_args = get_browserid_assertion(fxaSession) | |
client = SyncClient(*bid_assertion_args) | |
sync_keys = KeyBundle.fromMasterKey(fxaSession.keys[1], "identity.mozilla.com/picl/v1/oldsync") | |
# Fetch the sync bundle keys out of storage. | |
# They're encrypted with the account-level key. | |
keys = decrypt_payload(client.get_record('crypto', 'keys'), sync_keys) | |
pprint(("KEYS:", keys)) | |
# There's some provision for using separate key bundles for separate collections | |
# but I haven't bothered digging through to see what that's about because | |
# it doesn't seem to be in use, at least on my account. | |
if keys["collections"]: | |
raise RuntimeError("no support for per-collection key bundles yet sorry :-(") | |
# Now use those keys to decrypt the records of interest. | |
bulk_keys = KeyBundle(base64.b64decode(keys["default"][0]), base64.b64decode(keys["default"][1])) | |
for record in client.get_records('history'): | |
pprint(("HISTORY:", decrypt_payload(record, bulk_keys))) | |
def decrypt_payload(record, key_bundle): | |
j = json.loads(record["payload"]) | |
# Always check the hmac before decrypting anything. | |
expected_hmac = hmac.new(key_bundle.hmac_key, j['ciphertext'], hashlib.sha256).hexdigest() | |
if j['hmac'] != expected_hmac: | |
raise ValueError("HMAC mismatch: %s != %s" % (j['hmac'], expected_hmac)) | |
ciphertext = base64.b64decode(j['ciphertext']) | |
iv = base64.b64decode(j['IV']) | |
aes = AES.new(key_bundle.encryption_key, AES.MODE_CBC, iv) | |
plaintext = aes.decrypt(ciphertext).strip() | |
# Remove any CBC block padding, assuming it's a well-formed JSON payload. | |
plaintext = plaintext[:plaintext.rfind("}") + 1] | |
return json.loads(plaintext) | |
def get_browserid_assertion(fxaSession, tokenserver_url=TOKENSERVER_URL): | |
bid_assertion = fxaSession.get_identity_assertion(tokenserver_url) | |
_, keyB = fxaSession.keys | |
if isinstance(keyB, six.text_type): # pragma: no cover | |
keyB = keyB.encode('utf-8') | |
return bid_assertion, hexlify(sha256(keyB).digest()[0:16]) | |
class KeyBundle: | |
def __init__(self, encryption_key, hmac_key): | |
self.encryption_key = encryption_key | |
self.hmac_key = hmac_key | |
@classmethod | |
def fromMasterKey(cls, master_key, info): | |
key_material = HKDF(master_key, None, info, 2 * 32) | |
return cls(key_material[:32], key_material[32:]) | |
def HKDF_extract(salt, IKM, hashmod=hashlib.sha256): | |
"""HKDF-Extract; see RFC-5869 for the details.""" | |
if salt is None: | |
salt = b"\x00" * hashmod().digest_size | |
return hmac.new(salt, IKM, hashmod).digest() | |
def HKDF_expand(PRK, info, L, hashmod=hashlib.sha256): | |
"""HKDF-Expand; see RFC-5869 for the details.""" | |
digest_size = hashmod().digest_size | |
N = int(math.ceil(L * 1.0 / digest_size)) | |
assert N <= 255 | |
T = b"" | |
output = [] | |
for i in xrange(1, N + 1): | |
data = T + info + chr(i) | |
T = hmac.new(PRK, data, hashmod).digest() | |
output.append(T) | |
return b"".join(output)[:L] | |
def HKDF(secret, salt, info, size, hashmod=hashlib.sha256): | |
"""HKDF-extract-and-expand as a single function.""" | |
PRK = HKDF_extract(salt, secret, hashmod) | |
return HKDF_expand(PRK, info, size, hashmod) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment