Created
October 8, 2023 15:44
-
-
Save irsl/e6b6e635dc8dfba9ea3e8b6263f119c1 to your computer and use it in GitHub Desktop.
Find tapo devices over LAN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
root@host:/# python3 tapo_scanner.py | |
{ | |
"device_id": "36612bba[redacted]9005d5f", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOPLUG", | |
"device_model": "P110(EU)", | |
"ip": "10.6.8.113", | |
"mac": "28-87-BA-48-80-14", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "2409a2167[redacted]dcc8ab2faf6", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOHUB", | |
"device_model": "H100(EU)", | |
"ip": "10.6.8.134", | |
"mac": "48-22-54-2D-89-02", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "59cb47973[redacted]1522ae5fd16ca", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOPLUG", | |
"device_model": "P110(EU)", | |
"ip": "10.6.8.139", | |
"mac": "30-DE-4B-7A-C1-93", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "4c03fd8[redacted]270d69dee1394", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOPLUG", | |
"device_model": "P110(EU)", | |
"ip": "10.6.8.189", | |
"mac": "AC-15-A2-E4-3E-16", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "300484c6[redacted]bdc5decb7ac7f", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOPLUG", | |
"device_model": "P110(EU)", | |
"ip": "10.6.8.249", | |
"mac": "30-DE-4B-7A-D4-E2", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "1eee33eb8[redacted]8fa78ce7cf64", | |
"owner": "E4483[redacted]BCBD1", | |
"device_type": "SMART.TAPOPLUG", | |
"device_model": "P110(EU)", | |
"ip": "10.6.8.160", | |
"mac": "30-DE-4B-37-0C-AC", | |
"is_support_iot_cloud": true, | |
"obd_src": "tplink", | |
"factory_default": false, | |
"mgt_encrypt_schm": { | |
"is_support_https": false, | |
"encrypt_type": "AES", | |
"http_port": 80, | |
"lv": 2 | |
} | |
} | |
{ | |
"device_id": "68FCDD103[redacted]A86DF4865F41", | |
"device_name": "nappali", | |
"device_type": "SMART.IPCAMERA", | |
"device_model": "C110", | |
"ip": "10.6.8.216", | |
"mac": "9C-A2-F4-F4-D5-91", | |
"hardware_version": "1.0", | |
"firmware_version": "1.3.0 Build 220830 Rel.69880n(4555)", | |
"factory_default": false, | |
"is_support_iot_cloud": true, | |
"mgt_encrypt_schm": { | |
"is_support_https": true | |
}, | |
"encrypt_info": { | |
[redacted] | |
} | |
} | |
{ | |
"device_id": "CEC1A3789[redacted]8F2EFB29", | |
"device_name": "bej\u00e1rat (bent)", | |
"device_type": "SMART.IPCAMERA", | |
"device_model": "C110", | |
"ip": "10.6.8.229", | |
"mac": "9C-A2-F4-F4-CA-F7", | |
"hardware_version": "1.0", | |
"firmware_version": "1.3.0 Build 220830 Rel.69880n(4555)", | |
"factory_default": false, | |
"is_support_iot_cloud": true, | |
"mgt_encrypt_schm": { | |
"is_support_https": true | |
}, | |
"encrypt_info": { | |
[redacted] | |
} | |
} | |
{ | |
"device_id": "C745C38[redacted]BC70533", | |
"device_name": "Levi", | |
"device_type": "SMART.IPCAMERA", | |
"device_model": "C110", | |
"ip": "10.6.8.165", | |
"mac": "5C-62-8B-89-B7-5C", | |
"hardware_version": "2.0", | |
"firmware_version": "1.3.2 Build 221208 Rel.40424n(4555)", | |
"factory_default": false, | |
"is_support_iot_cloud": true, | |
"mgt_encrypt_schm": { | |
"is_support_https": true | |
}, | |
"encrypt_info": { | |
[redacted] | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
from a ubuntu env: | |
docker run --rm -it --network host ubuntu | |
apt update ; apt install -y python3 python3-pip | |
pip3 install pycryptodome | |
""" | |
import struct | |
import sys | |
import zlib | |
import json | |
import base64 | |
import socket | |
from Crypto.Cipher import PKCS1_v1_5 | |
from Crypto.Cipher import PKCS1_OAEP | |
from Crypto.PublicKey import RSA | |
from Crypto.Cipher import AES | |
import select | |
import base64 | |
import http.server, ssl | |
import threading | |
import os | |
import time | |
DEBUG = int(os.getenv("DEBUG") or "0") | |
PKT_ONBOARD_REQUEST = b'\x11\x00' # \x02\x0D\x87\x23' | |
PKT_ONBOARD_RESPONSE = b'"\x01' # \x02\r\x87#' | |
OUR_KEY = RSA.generate(2048) # if preferred, hardcoded key could be used: RSA.import_key("-----BEGIN PRIVATE KEY-----...") | |
OUR_PUBLIC_KEY = OUR_KEY.public_key().export_key('PEM').decode() | |
OUR_CIPHER = PKCS1_OAEP.new(OUR_KEY) | |
def eprint(*args, **kwargs): | |
if not DEBUG: return | |
print(*args, **kwargs, file=sys.stderr) | |
# note: pkcs7.PKCS7Encoder().encode is broken | |
# https://stackoverflow.com/questions/43199123/encrypting-with-aes-256-and-pkcs7-padding | |
def pkcs7_pad(input_str, block_len=16): | |
return input_str + chr(block_len-len(input_str)%block_len)*(block_len-len(input_str)%16) | |
def pkcs7_unpad(ct): | |
return ct[:-ord(ct[-1])] | |
class TpLinkCipher: | |
def __init__(self, b_arr: bytearray, b_arr2: bytearray): | |
self.iv = b_arr2 | |
self.key = b_arr | |
def encrypt(self, data): | |
data = pkcs7_pad(data) | |
cipher = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) | |
encrypted = cipher.encrypt(data.encode()) | |
return base64.b64encode(encrypted).decode().replace("\r\n","") | |
def decrypt(self, data: str): | |
aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) | |
pad_text = aes.decrypt(base64.b64decode(data.encode())).decode() | |
return pkcs7_unpad(pad_text) | |
def extract_pkt_id(packet): | |
return packet[8:12] | |
def extract_payload_from_package(packet): | |
return packet[16:] | |
def extract_payload_from_package_json(packet): | |
return json.loads(packet[16:]) | |
def build_packet_for_payload(payload, pkt_type, pkt_id=b"\x01\x02\x03\x04"): | |
len_bytes = struct.pack(">h", len(payload)) | |
skeleton = b'\x02\x00\x00\x01'+len_bytes+pkt_type+pkt_id+b'\x5A\x6B\x7C\x8D'+payload | |
calculated_crc32 = zlib.crc32(skeleton) & 0xffffffff | |
calculated_crc32_bytes = struct.pack(">I", calculated_crc32) | |
re = skeleton[0:12] + calculated_crc32_bytes + skeleton[16:] | |
return re | |
def build_packet_for_payload_json(payload, pkt_type, pkt_id=b"\x01\x02\x03\x04"): | |
return build_packet_for_payload(json.dumps(payload).encode(), pkt_type, pkt_id) | |
def process_encrypted_handshake(response): | |
encryptedSessionKey = response["result"]["encrypt_info"]["key"] | |
encryptedSessionKeyBytes = base64.b64decode(encryptedSessionKey.encode()) | |
clearSessionKeyBytes = OUR_CIPHER.decrypt(encryptedSessionKeyBytes) | |
if not clearSessionKeyBytes: | |
raise ValueError("Decryption failed!") | |
b_arr = bytearray() | |
b_arr2 = bytearray() | |
for i in range(0, 16): | |
b_arr.insert(i, clearSessionKeyBytes[i]) | |
for i in range(0, 16): | |
b_arr2.insert(i, clearSessionKeyBytes[i + 16]) | |
cipher = TpLinkCipher(b_arr, b_arr2) | |
cleartextDataBytes = cipher.decrypt(response["result"]["encrypt_info"]["data"]) | |
eprint("handshake payload decrypted as", cleartextDataBytes) | |
return json.loads(cleartextDataBytes) | |
def find_tapo_devices(timeout=3): | |
packet = build_packet_for_payload_json({"params":{"rsa_key": OUR_PUBLIC_KEY}}, PKT_ONBOARD_REQUEST) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, 5); | |
# sock.settimeout(2) | |
sock.sendto(packet, ("255.255.255.255", 20002)) | |
eprint("packet sent", packet) | |
pollerObject = select.poll() | |
pollerObject.register(sock, select.POLLIN) | |
before = time.time() | |
while True: | |
fdVsEvent = pollerObject.poll(100) # 0.1 sec | |
if fdVsEvent: | |
handshake_packet, addr = sock.recvfrom(2048) | |
eprint("received", addr, handshake_packet) | |
try: | |
handshake_json = extract_payload_from_package_json(handshake_packet) | |
if handshake_json["error_code"]: | |
continue | |
result = handshake_json["result"] | |
yield result | |
except: | |
pass | |
now = time.time() | |
if now - before > timeout: | |
break | |
if __name__ == "__main__": | |
for td in find_tapo_devices(): | |
print(json.dumps(td, indent=3)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment