Created
January 12, 2022 21:54
-
-
Save rosmo/29200c1aedb991ce55942c4ae8b54edd to your computer and use it in GitHub Desktop.
TP-Link X90 Deco API example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from requests.api import request | |
from Crypto.Cipher import PKCS1_OAEP, PKCS1_v1_5 | |
from Crypto.Cipher import AES | |
from Crypto.PublicKey import RSA | |
from Crypto.Util.Padding import pad | |
from Crypto.Util.number import bytes_to_long | |
import base64 | |
import requests | |
import string | |
import random | |
from aiohttp.hdrs import ( | |
ACCEPT, | |
COOKIE, | |
PRAGMA, | |
REFERER, | |
CONNECTION, | |
KEEP_ALIVE, | |
USER_AGENT, | |
CONTENT_TYPE, | |
CACHE_CONTROL, | |
ACCEPT_ENCODING, | |
ACCEPT_LANGUAGE | |
) | |
import time | |
import re | |
import hashlib | |
import json | |
import codecs | |
import logging | |
class C6: | |
def __init__(self, host, username, password): | |
self.host = host | |
self.username = username | |
self.password = password | |
self.pubkey = '' | |
self.jsessionId = '' | |
self.token = '' | |
self.stok = '' | |
self.sysauth = '' | |
self.rsa_public_key = [] | |
self.public_key = '' | |
self.password_rsa_public_key = [] | |
self.rsa_seq = '' | |
self.aes_key = '1641928074282809' | |
self.aes_iv = '1641928074282186' | |
self.pad_chars = '' | |
for c in range(16): | |
self.pad_chars += chr(c) | |
hash_str = "{}{}".format(self.username, self.password) | |
self.password_hash = hashlib.md5(hash_str.encode("utf-8")).hexdigest() | |
self.parse_macs_hyphens = re.compile('[0-9A-F]{2}-[0-9A-F]{2}-' + | |
'[0-9A-F]{2}-[0-9A-F]{2}-' + | |
'[0-9A-F]{2}-[0-9A-F]{2}') | |
self.parse_macs_colons = re.compile('[0-9A-F]{2}:[0-9A-F]{2}:' + | |
'[0-9A-F]{2}:[0-9A-F]{2}:' + | |
'[0-9A-F]{2}:[0-9A-F]{2}') | |
def scan_devices(self): | |
self._update_info() | |
self._log_out() | |
print(self.last_results.keys()) | |
def get_device_name(self, device): | |
return self.last_results.get(device) | |
def _get_auth_tokens(self): | |
print("Retrieving auth tokens...") | |
url = 'http://{}/cgi-bin/luci/;stok=/login?form=login' \ | |
.format(self.host) | |
referer = 'http://{}/webpages/login.html'.format(self.host) | |
# If possible implement RSA encryption of password here. | |
response = requests.post( | |
url, params={'operation': 'login', 'username': self.username, | |
'password': self.password}, | |
headers={REFERER: referer}, timeout=4) | |
try: | |
self.stok = response.json().get('data').get('stok') | |
print(self.stok) | |
regex_result = re.search( | |
'sysauth=(.*);', response.headers['set-cookie']) | |
self.sysauth = regex_result.group(1) | |
print(self.sysauth) | |
return True | |
except (ValueError, KeyError, AttributeError) as _: | |
print("Couldn't fetch auth tokens! Response was: %s", | |
response.text) | |
return False | |
def _get_auth_tokens_rsa(self): | |
print("Getting public RSA key") | |
referer = 'http://{}/webpages/login.html'.format(self.host) | |
# Fetch 1024-bit RSA public key used specifically for encrypting | |
# the user's password | |
url = 'http://{}/cgi-bin/luci/;stok=/login' \ | |
.format(self.host) | |
response = requests.post( | |
url, params={'form': 'keys'}, | |
data={'operation': 'read'}, | |
headers={REFERER: referer}, timeout=4) | |
try: | |
self.password_rsa_public_key = response.json().get('result').get('password') | |
except (ValueError, KeyError, AttributeError) as _: | |
print("Couldn't fetch password RSA keys! Response was: %s", | |
response.text) | |
return False | |
# Fetch 512-bit RSA public key used to encrypt the "signature" | |
# (which is actually URL-encoded string with AES key, IV, hash of | |
# username + password and sequence number) | |
url = 'http://{}/cgi-bin/luci/;stok=/login' \ | |
.format(self.host) | |
response = requests.post( | |
url, params={'form': 'auth'}, | |
data={'operation': 'read'}, | |
headers={REFERER: referer}, timeout=4) | |
try: | |
self.rsa_public_key = response.json().get('result').get('key') | |
self.rsa_seq = response.json().get('result').get('seq') | |
except (ValueError, KeyError, AttributeError) as _: | |
print("Couldn't fetch data RSA public key! Response was: %s", | |
response.text) | |
return False | |
# Create MD5 hash of username concatenated with password | |
rsa_modulus_n = bytes_to_long(bytes.fromhex(self.password_rsa_public_key[0])) | |
rsa_public_exp = bytes_to_long(bytes.fromhex(self.password_rsa_public_key[1])) | |
public_key = RSA.construct((rsa_modulus_n, rsa_public_exp)) | |
pkcs1_encryptor = PKCS1_v1_5.new(public_key) | |
password_encrypted = pkcs1_encryptor.encrypt(self.password.encode('utf-8')) | |
password_hex = codecs.encode(password_encrypted, 'hex').decode('utf-8') | |
auth_data = json.dumps({ | |
"params": {"password": password_hex.upper()}, | |
"operation": "login", | |
}) | |
# Encrypt the authentication JSON with 128-bit AES in CBC mode | |
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8')) | |
ciphertext = cipher.encrypt(pad(auth_data.encode('utf-8'), 16)) | |
# Encode ciphertext in base64 | |
ciphertext_base64 = base64.b64encode(ciphertext) | |
# Construct the "sign" parameter | |
encrypt_str = "k={}&i={}&h={}&s={}".format(self.aes_key, self.aes_iv, self.password_hash, self.rsa_seq + len(ciphertext_base64)) | |
rsa_modulus_n = bytes_to_long(bytes.fromhex(self.rsa_public_key[0])) | |
rsa_public_exp = bytes_to_long(bytes.fromhex(self.rsa_public_key[1])) | |
self.public_key = RSA.construct((rsa_modulus_n, rsa_public_exp)) | |
pkcs1_encryptor = PKCS1_v1_5.new(self.public_key) | |
if len(encrypt_str) > 53: | |
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str[0:53].encode('utf-8')) | |
encrypted_str += pkcs1_encryptor.encrypt(encrypt_str[53:].encode('utf-8')) | |
else: | |
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str.encode('utf-8')) | |
# Convert signature to hex characters | |
signature = codecs.encode(encrypted_str, 'hex').decode('utf-8') | |
print("Retrieving auth tokens...") | |
referer = 'http://{}/webpages/index.html'.format(self.host) | |
url = 'http://{}/cgi-bin/luci/;stok=/login'.format(self.host) | |
# If possible implement RSA encryption of password here. | |
response = requests.post( | |
url, params={'form': 'login'}, | |
data={'sign': signature, 'data': ciphertext_base64}, | |
headers={ | |
REFERER: referer, | |
"Content-Type": "application/json", | |
"X-Requested-With": "XMLHttpRequest" # This header is required for sure | |
}) | |
try: | |
result = response.json().get('data') | |
result_decoded = base64.b64decode(result) | |
cipher = AES.new(self.aes_key.encode("utf-8"), AES.MODE_CBC, self.aes_iv.encode("utf-8")) | |
# Result plaintext will be JSON encoded | |
result_plaintext = cipher.decrypt(result_decoded).decode("utf-8") | |
# Decrypted data may be right-padded | |
result_json = json.loads(result_plaintext.rstrip(self.pad_chars)) | |
self.stok = result_json['result']['stok'] | |
regex_result = re.search( | |
'sysauth=(.*);', response.headers['set-cookie']) | |
self.sysauth = regex_result.group(1) | |
return True | |
except (ValueError, KeyError, AttributeError) as e: | |
print("Couldn't fetch auth tokens! Response was %d: %s" % | |
(response.status_code, response.text)) | |
return False | |
def _get_signature_and_data(self, data={}): | |
encoded_data = json.dumps(data) | |
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8')) | |
ciphertext = cipher.encrypt(pad(encoded_data.encode('utf-8'), 16)) | |
ciphertext_base64 = base64.b64encode(ciphertext) | |
encrypt_str = "h={}&s={}".format(self.password_hash, self.rsa_seq + len(ciphertext_base64)) | |
pkcs1_encryptor = PKCS1_v1_5.new(self.public_key) | |
if len(encrypt_str) > 53: | |
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str[0:53].encode('utf-8')) | |
encrypted_str += pkcs1_encryptor.encrypt(encrypt_str[53:].encode('utf-8')) | |
else: | |
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str.encode('utf-8')) | |
# Convert signature to hex characters | |
signature = codecs.encode(encrypted_str, 'hex').decode('utf-8') | |
return signature, ciphertext_base64 | |
def _get_data(self, data): | |
return ciphertext_base64 | |
def _decrypt_json_response(self, data): | |
try: | |
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8')) | |
plaintext = cipher.decrypt(base64.b64decode(data)) | |
return json.loads(plaintext.decode('utf-8').rstrip(self.pad_chars)) | |
except Exception as e: | |
print(e) | |
def _update_info(self): | |
print("[C6] Loading wireless clients...") | |
if (self.stok == '') or (self.sysauth == ''): | |
self._get_auth_tokens_rsa() | |
url = 'http://{}/cgi-bin/luci/;stok={}/admin/client'.format(self.host, self.stok) | |
referer = 'http://{}/webpages/index.html'.format(self.host) | |
origin = 'http://{}'.format(self.host) | |
signature, data = self._get_signature_and_data({ | |
"operation": "read", | |
"params": { "device_mac": "default" }, | |
}) | |
response = requests.post( | |
url, headers={ | |
REFERER: referer, | |
"Origin": origin, | |
"Content-Type": "application/json", | |
"X-Requested-With": "XMLHttpRequest" | |
}, | |
params={"form": "client_list"}, | |
data={ | |
"sign": signature, | |
"data": data, | |
}, | |
cookies={'sysauth': self.sysauth}, timeout=5) | |
try: | |
json_response = self._decrypt_json_response(response.json().get('data')) | |
if json_response.get('error_code') == 0: | |
result = json_response.get('result').get('client_list') | |
else: | |
print( | |
"An unknown error happened while fetching data") | |
return False | |
except ValueError: | |
print(response.text) | |
print("Router didn't respond with JSON. " | |
"Check if credentials are correct") | |
return False | |
if result: | |
self.last_results = { | |
device['mac'].replace('-', ':'): device['mac'] | |
for device in result | |
} | |
return True | |
return False | |
def _log_out(self): | |
print("Logging out of router admin interface...") | |
url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?' | |
'form=logout').format(self.host, self.stok) | |
referer = 'http://{}/webpages/index.html'.format(self.host) | |
requests.post( | |
url, params={'operation': 'write'}, headers={REFERER: referer}, | |
cookies={'sysauth': self.sysauth}) | |
self.stok = '' | |
self.sysauth = '' | |
# Use this to dump HTTP requests: | |
#import http.client as http_client | |
#http_client.HTTPConnection.debuglevel = 1 | |
#logging.basicConfig() | |
#logging.getLogger().setLevel(logging.DEBUG) | |
#requests_log = logging.getLogger("requests.packages.urllib3") | |
#requests_log.setLevel(logging.DEBUG) | |
#requests_log.propagate = True | |
c6 = C6('192.168.1.1', 'admin', 'YOUR-PASSWORD') | |
c6.scan_devices() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment