Created
March 18, 2023 10:51
-
-
Save neko-neko-nyan/0ddf9563f89a6b9952a459c491069ec3 to your computer and use it in GitHub Desktop.
Simple API client / demo for KeePassXC (simulates the browser plugin) in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import json
import random
import secrets

import nacl.public
import win32file
def _check_nonce_length(nonce: str):
    """Tell whether base64 text *nonce* decodes to ``Box.NONCE_SIZE`` bytes."""
    raw_nonce = base64.decodebytes(nonce.encode())
    expected_size = nacl.public.Box.NONCE_SIZE
    return len(raw_nonce) == expected_size
def _get_nonce() -> str:
    """Return a fresh random 24-byte nonce, base64-encoded as text.

    The bytes come from ``secrets`` (the operating system's CSPRNG) rather
    than the ``random`` module, whose generator is not meant for
    cryptographic use.
    """
    return base64.b64encode(secrets.token_bytes(24)).decode()
def _incremented_nonce(nonce: str) -> str:
    """Return base64 text *nonce* incremented by one.

    The decoded bytes are treated as a little-endian unsigned integer
    (byte 0 is the least significant). The result keeps the same byte
    length and wraps around to all zeros on overflow.

    The previous byte-by-byte loop stored the unmasked carry value into the
    ``bytearray``, which raised ``ValueError`` whenever a byte overflowed
    (for example when the first byte was 0xFF).
    """
    raw = base64.decodebytes(nonce.encode())
    size = len(raw)
    # Modulo 2**(8*size) gives the wrap-around of a fixed-width counter.
    value = (int.from_bytes(raw, 'little') + 1) % (1 << (8 * size))
    return base64.b64encode(value.to_bytes(size, 'little')).decode()
def _get_nonces() -> tuple[str, str]:
    """Return a fresh nonce together with its incremented counterpart.

    The first element comes from ``_get_nonce()``; the second is that same
    nonce passed through ``_incremented_nonce()``.
    """
    request_nonce = _get_nonce()
    return request_nonce, _incremented_nonce(request_nonce)
def _is_valid_response(response: dict, nonce: str):
    """Tell whether *response* reports success and carries the expected nonce.

    Returns True only when ``response['success'] == 'true'``, the response
    nonce equals *nonce*, and that nonce has the NaCl box nonce length.

    The nonce is compared before its length is checked, so a response with
    a missing or non-string ``nonce`` yields False instead of crashing in
    the base64 decoding step.
    """
    if response.get('success') != 'true':
        return False
    if response.get('nonce') != nonce:
        return False
    # The two nonces are equal here, so checking either one is equivalent.
    return _check_nonce_length(nonce)
class KeepassError(Exception):
    """Common base class for the exception types defined in this module."""
class KeepassApiError(KeepassError):
    """Error carrying a code, message, action and optional request id.

    The exception text is *message*; all four constructor arguments are
    also kept as attributes of the same name.
    """

    def __init__(self, code, message, action, request_id=None):
        self.code, self.message = code, message
        self.action, self.request_id = action, request_id
        super().__init__(message)
class InvalidResponseError(KeepassError):
    """Error with the fixed message "Invalid response"."""

    def __init__(self):
        message = "Invalid response"
        super().__init__(message)
class KeepassApi: | |
def __init__(self): | |
self._client_id = None | |
self.generate_client_id() | |
self._handle = win32file.CreateFile( | |
r'\\.\pipe\org.keepassxc.KeePassXC.BrowserServer_neko', | |
win32file.GENERIC_READ | win32file.GENERIC_WRITE, | |
0, | |
None, | |
win32file.OPEN_EXISTING, | |
0, | |
None | |
) | |
self._private_key = nacl.public.PrivateKey.generate() | |
self._server_public_key = None | |
self._server_version = None | |
def generate_client_id(self): | |
self._client_id = base64.b64encode(random.randbytes(24)).decode() | |
def _send_native_message(self, action: str, req: dict, *, enable_timeout=False, trigger_unlock=False) -> dict: | |
req['action'] = action | |
req["clientID"] = self._client_id | |
if trigger_unlock: | |
req['triggerUnlock'] = 'true' | |
req_str = json.dumps(req).encode() | |
win32file.WriteFile(self._handle, req_str) | |
code, resp_str = win32file.ReadFile(self._handle, 64*1024) | |
resp = json.loads(resp_str.decode()) | |
if resp.get('action') != action: | |
raise InvalidResponseError() | |
return resp | |
def _encrypt(self, data: dict, nonce: str) -> str: | |
message_data = json.dumps(data).encode() | |
message_nonce = base64.decodebytes(nonce.encode()) | |
if self._server_public_key is not None: | |
message = nacl.public.Box(self._private_key, self._server_public_key)\ | |
.encrypt(message_data, message_nonce).ciphertext | |
if message: | |
return base64.b64encode(message).decode() | |
return '' | |
def _decrypt(self, data: str, nonce: str) -> str: | |
data = base64.decodebytes(data.encode()) | |
nonce = base64.decodebytes(nonce.encode()) | |
res = nacl.public.Box(self._private_key, self._server_public_key).decrypt(data, nonce) | |
return res.decode() | |
def send_message(self, action: str, *, | |
enable_timeout=False, trigger_unlock=False, **data): | |
data['action'] = action | |
nonce, incremented_nonce = _get_nonces() | |
request = { | |
"nonce": nonce, | |
"message": self._encrypt(data, nonce) | |
} | |
if "requestID" in data: | |
request['requestID'] = data['requestID'] | |
response = self._send_native_message(action, request, enable_timeout=enable_timeout, | |
trigger_unlock=trigger_unlock) | |
return self._process_raw_message(response, incremented_nonce) | |
def _process_raw_message(self, response: dict, nonce: str): | |
if "message" not in response or "nonce" not in response: | |
if "error" in response and "errorCode" in response: | |
raise KeepassApiError(response['errorCode'], response['error'], response['action'], | |
response.get('requestID')) | |
raise InvalidResponseError() | |
res = self._decrypt(response["message"], response["nonce"]) | |
if not res: | |
raise InvalidResponseError() | |
parsed = json.loads(res) | |
return self._process_encrypted_message(parsed, nonce) | |
def _process_encrypted_message(self, parsed: dict, nonce: str): | |
if not _is_valid_response(parsed, nonce): | |
self._connected = False | |
raise InvalidResponseError() | |
del parsed['success'] | |
del parsed['nonce'] | |
version = parsed.get('version') | |
if version: | |
del parsed['version'] | |
self._server_version = version | |
self._connected = True | |
return parsed | |
def change_public_keys(self): | |
self._connected = False | |
nonce, incremented_nonce = _get_nonces() | |
key = base64.b64encode(bytes(self._private_key.public_key)).decode() | |
resp = self._send_native_message("change-public-keys", dict(nonce=nonce, publicKey=key)) | |
server_key = self._process_encrypted_message(resp, incremented_nonce).get('publicKey') | |
if server_key: | |
self._server_public_key = nacl.public.PublicKey(base64.decodebytes(resp.get('publicKey').encode())) | |
class KeepassXCClient: | |
def __init__(self): | |
self.api = KeepassApi() | |
self._db_hash = None | |
self._connected = False | |
self._key_ring = {} | |
def _send_message(self, action: str, *, nonce: str | None = None, | |
enable_timeout=False, trigger_unlock=False, **data): | |
return self.api.send_message(action, nonce=nonce, enable_timeout=enable_timeout, trigger_unlock=trigger_unlock, | |
**data) | |
def get_database_hash(self, connected_keys: list[str] | None = None): | |
if connected_keys is None: | |
hash = self._send_message("get-databasehash").get('hash') | |
else: | |
response = self._send_message("get-databasehash", connectedKeys=connected_keys) | |
old_hash = response.get('oldHash') | |
hash = response.get('hash') | |
self._db_hash = hash | |
if hash == '': | |
return None | |
return hash | |
def associate(self, id_key: str | None = None): | |
key = base64.b64encode(bytes(self.api._private_key.public_key)).decode() | |
if id_key is None: | |
response = self._send_message("associate", key=key) | |
else: | |
response = self._send_message("associate", key=key, idKey=id_key) | |
hash = response.get('hash') | |
id_key = id_key or key | |
self._key_ring[self._db_hash] = (self._db_hash, response['id'], id_key) | |
def test_associate(self, id_key: str | None = None): | |
key = base64.b64encode(bytes(self.api._private_key.public_key)).decode() | |
return self._send_message("test-associate", key=key, id=id_key or key)["id", "hash"] | |
def get_logins(self, url: str, id: str = "", form_url: str = "", http_auth: bool = False): | |
# id unused, returned in response | |
return self._send_message("get-logins", url=url, id=id, submitUrl=form_url, | |
keys=[dict(id=id, key=key) for hash, id, key in self._key_ring.values()], | |
httpAuth="true" if http_auth else "false")['entries', "id", "hash", "count"] | |
# Further action names; nothing else in this file refers to this tuple.
# The note above each entry is the original author's remark about that action.
actions = (
    # async, with requestID
    "generate-password",
    # count=null, entries=null, error="success" or "error", hash
    "set-login",
    # <empty>
    "lock-database",
    # groups
    "get-database-groups",
    # name, uuid
    "create-new-group",
    # totp
    "get-totp",
    # success="true" or "false"
    "delete-entry",
    # <empty>
    "request-autotype",
)
def pipe_client():
    """Demo run: exchange keys, then print the results of three requests.

    Prints, in order, the database hash, the return value of
    ``associate()`` and the result of ``get_logins`` for https://vk.com/.
    """
    demo = KeepassXCClient()
    demo.api.change_public_keys()

    database_hash = demo.get_database_hash()
    print(database_hash)

    association = demo.associate()
    print(association)

    logins = demo.get_logins("https://vk.com/", form_url="https://vk.com/")
    print(logins)


if __name__ == '__main__':
    pipe_client()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment