Skip to content

Instantly share code, notes, and snippets.

@tosunkaya
Forked from socram8888/relay.py
Created August 14, 2024 09:21
Show Gist options
  • Save tosunkaya/f2143dad19490c297be867d84b9cf00b to your computer and use it in GitHub Desktop.
Save tosunkaya/f2143dad19490c297be867d84b9cf00b to your computer and use it in GitHub Desktop.
Tp-Link Tapo C200 video relaying test
#!/usr/bin/env python3
import uuid
import requests
import ssl
import socket
import http.client
import json
import sys
# E-mail address of the TP-Link cloud account
cloudmail = "--redacted--"
# Password of the TP-Link cloud account
cloudpass = "--redacted--"
# Camera alias exactly as the user named it; used to pick the device below
cloudcamera = "Entrance"
# Requested stream resolution: 'VGA' or 'HD' (the values the author's
# Tapo C200s accept)
resolution = "HD"
# Cloud account gateway; this URL is hardcoded in the application
accountserver = "https://n-wap-gw.tplinkcloud.com"
# Handed to requests as `verify=`. False switches certificate verification
# off; a CA bundle path such as 'tplinkcloud-com.pem' can be used instead.
certs = False
# Random identifier generated once per run and sent to the cloud as the
# client/terminal UUID
clientuuid = str(uuid.uuid4())
# Log in to the TP-Link cloud. A reply with error_code 0 carries the session
# token that the later device-list, passthrough and relay requests present.
loginparams = {
    'appType': 'Tapo_Android',
    'cloudUserName': cloudmail,
    'cloudPassword': cloudpass,
    'terminalUUID': clientuuid,
}
loginbody = {'method': 'login', 'params': loginparams}
loginresponse = requests.post(accountserver, json=loginbody, verify=certs)
loginreply = loginresponse.json()
if loginreply['error_code'] != 0:
    # Dump the whole reply so the failure reason is visible, then give up.
    print('Login error', loginreply, sep='\n', file=sys.stderr)
    sys.exit(1)
token = loginreply['result']['token']
# Collect every IP camera linked to the account, fetching 20 entries per
# request and using the number already collected as the next start index.
devices = []
# Total device count reported by the server; unknown until the first page
# has been fetched, so the loop always runs at least once.
totaldevs = None
devlistparams = {
    'token': token
}
while totaldevs is None or len(devices) < totaldevs:
    devlistbody = {
        'method': 'getDeviceListByPage',
        'params': {
            'deviceTypeList': [
                'SMART.IPCAMERA'
            ],
            'index': len(devices),
            'limit': 20
        }
    }
    devlistreply = requests.post(accountserver, json=devlistbody, params=devlistparams, verify=certs).json()
    if devlistreply['error_code'] != 0:
        print('Listing error', file=sys.stderr)
        print(devlistreply, file=sys.stderr)
        sys.exit(1)
    totaldevs = devlistreply['result']['totalNum']
    devpage = devlistreply['result']['deviceList']
    if not devpage:
        # An empty page means no further progress is possible even if
        # totalNum claims more devices; stop instead of requesting the same
        # index forever.
        break
    devices.extend(devpage)
# Find the device whose alias equals the configured camera name. Exactly one
# match is required; otherwise list what is linked to the account and abort.
matchingcameras = [dev for dev in devices if dev['alias'] == cloudcamera]
if len(matchingcameras) != 1:
    print(
        'Cannot find camera "%s"' % cloudcamera,
        'Linked devices:',
        devices,
        sep='\n',
        file=sys.stderr,
    )
    sys.exit(1)
camerainfo = matchingcameras[0]
# Choose the relay host from a substring of the camera's app server URL:
# 'aps' and 'euw' map to their own relays, anything else falls back to the
# 'use1' relay. (Per the original author, this mirrors the app's own logic.)
appserverurl = camerainfo['appServerUrl']
if 'aps' in appserverurl:
    relayserver = 'aps1-relay-dcipc.i.tplinknbu.com'
elif 'euw' in appserverurl:
    relayserver = 'euw1-relay-dcipc.i.tplinknbu.com'
else:
    relayserver = 'use1-relay-dcipc.i.tplinknbu.com'
# Send a `request_relay` command to the camera through the cloud
# `passthrough` method, naming the relay server and the relay URL to use.
# The reply's `sid` and `elb_cookie` fields are needed afterwards to open
# the relay connection.
ptparams = {
    'token': token
}
ptbody = {
    'method': 'passthrough',
    'params': {
        'deviceId': camerainfo['deviceId'],
        'requestData': {
            'method': 'do',
            'relay': {
                'request_relay': {
                    'token': token,
                    'version': '1.3',
                    'stream_type': 0,
                    'protocol': 0,
                    'relay_server': relayserver,
                    'relay_port': 80,
                    'relays_port': 443,
                    'relay_req_url': '/relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution,
                    'local_req_url': '/stream'
                }
            }
        }
    }
}
ptreply = requests.post(camerainfo['appServerUrl'], params=ptparams, json=ptbody, verify=certs).json()
# Report a rejected request here, in the same way as the login and listing
# calls, rather than failing later with a KeyError when `sid` is looked up.
# A reply without an error_code field is treated as success.
if ptreply.get('error_code', 0) != 0:
    print('Relay request error', file=sys.stderr)
    print(ptreply, file=sys.stderr)
    sys.exit(1)
# Open a TLS connection to the relay server, send a multipart POST whose
# first part is a JSON preview request, then read the multipart reply and
# copy every `video/*` part to standard output. Diagnostics go to stderr.
#
# NOTE(review): hostname checking and certificate verification are both
# disabled for this connection, so the relay server is not authenticated.
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

relaypath = '/relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution
relaysession = ptreply['result']['responseData']['result']

# The buffered file wrapper is managed by the `with` as well, so it is closed
# together with the sockets on every exit path (including sys.exit).
with socket.create_connection((relayserver, 443)) as raw_sock, \
        context.wrap_socket(raw_sock, server_hostname=relayserver) as ssl_sock, \
        ssl_sock.makefile('rwb') as ssl_file:
    # Write relay header. The Content-Length is the largest signed 64-bit
    # value, i.e. the request body is effectively unbounded.
    relayrequestheaders = (
        'POST ' + relaypath + ' HTTP/1.1\r\n' +
        'User-Agent: Client=TP-Link_Tapo_Android Android 2.2.36/1.3\r\n' +
        'Keep-Relay: 3600\r\n' +
        'Accept: */*\r\n' +
        'Host: ' + relayserver + ':443\r\n' +
        'Content-Type: multipart/mixed;boundary=--client-stream-boundary--\r\n' +
        'Content-Length: 9223372036854775807\r\n' +
        'X-token: ' + token + '\r\n' +
        'X-Client-Model: SM-A202F\r\n' +
        'X-Client-UUID: ' + clientuuid + '\r\n' +
        'X-Client-SessionID: ' + relaysession['sid'] + '\r\n' +
        'X-Redirect-Times: 0\r\n' +
        'Cookie: ' + relaysession['elb_cookie'] + '\r\n' +
        '\r\n'
    )
    relayrequestheaders = relayrequestheaders.encode('ascii')
    ssl_file.write(relayrequestheaders)

    # Write boundary marker ('--' followed by the boundary declared above)
    ssl_file.write(b'----client-stream-boundary--\r\n')

    # Write stream header: a JSON part asking for a preview of channel 0
    streamrequestbody = {
        "type": "request",
        "seq": 1,
        "params": {
            "method": "get",
            "preview": {
                "channels": [0],
                "resolutions": [resolution],
                "audio": ["default"]
            }
        }
    }
    streamrequestbody = json.dumps(streamrequestbody).encode('ascii')
    streamrequestheader = (
        'Content-Type: application/json\r\n' +
        'Content-Length: ' + str(len(streamrequestbody)) + '\r\n' +
        '\r\n'
    )
    streamrequestheader = streamrequestheader.encode('ascii')
    ssl_file.write(streamrequestheader)
    ssl_file.write(streamrequestbody)

    # Flush as the virtual file is buffered
    ssl_file.flush()

    # Read relay header. Undecodable bytes are replaced rather than raised so
    # that a garbled line ends up in the diagnostic below, not in a traceback.
    relaystatus = ssl_file.readline().decode('ascii', errors='replace').rstrip('\r\n')
    if relaystatus != 'HTTP/1.1 200 OK':
        print('Unexpected relay status: ' + relaystatus, file=sys.stderr)
        sys.exit(1)
    relayreplyheaders = http.client.parse_headers(ssl_file)

    # The boundary does not change during the stream: resolve it once and
    # fail with a clear message if the reply does not declare one.
    replyboundary = relayreplyheaders.get_boundary()
    if replyboundary is None:
        print('Relay reply does not declare a multipart boundary', file=sys.stderr)
        sys.exit(1)
    expectedboundary = '--' + replyboundary + '\r\n'

    while True:
        boundaryline = ssl_file.readline().decode('ascii', errors='replace')
        if not boundaryline:
            # readline() returned nothing: the server closed the stream
            print('Reached EOL', file=sys.stderr)
            break
        if boundaryline != expectedboundary:
            print('Unexpected boundary: %s' % boundaryline, file=sys.stderr)
            sys.exit(1)

        chunkheaders = http.client.parse_headers(ssl_file)
        chunktype = chunkheaders.get_content_type()
        try:
            chunklength = int(chunkheaders.get('content-length'))
            if chunklength < 0:
                raise ValueError(chunklength)
        except (TypeError, ValueError):
            # Without a usable length the part cannot be framed
            print('Chunk without a valid Content-Length', file=sys.stderr)
            sys.exit(1)

        # The payload is always consumed to stay aligned with the stream;
        # only video parts are forwarded.
        chunk = ssl_file.read(chunklength)
        if chunktype.startswith('video/'):
            sys.stdout.buffer.write(chunk)
        else:
            print('Ignoring ' + chunktype, file=sys.stderr)

        # Every part must be terminated by an empty line
        boundaryline = ssl_file.readline().decode('ascii', errors='replace')
        if boundaryline != '\r\n':
            print('End of chunk new line missing', file=sys.stderr)
            print(boundaryline, file=sys.stderr)
            break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment