-
-
Save gxfxyz/48072a72be3a169bc43549e676713201 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Basic Dahua RPC wrapper. | |
Example: | |
from dahua_rpc import DahuaRpc | |
dahua = DahuaRpc(host="192.168.1.10", username="admin", password="password") | |
dahua.login() | |
# Get the current time on the device | |
print(dahua.current_time()) | |
# Set display to 4 grids with first view group | |
dahua.set_split(mode=4, view=1) | |
# Make a raw RPC request to get serial number | |
print(dahua.request(method="magicBox.getSerialNo")) | |
Dependencies: | |
pip install requests | |
""" | |
import sys | |
import hashlib | |
import requests | |
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so code that
# does ``isinstance(value, unicode)`` works on both major versions.
if sys.version_info[0] >= 3:
    unicode = str
class DahuaRpc(object):
    """Basic client for the JSON RPC endpoint (``/RPC2``) of a Dahua device.

    All calls go through one ``requests.Session``. After ``login()`` the
    session id returned by the device is attached to every request.

    Args:
        host: Host name or address used to build ``http://<host>/RPC2``.
        username: Account name used by ``login()``.
        password: Plain-text password; it is only sent as an MD5 digest.
        timeout: Optional timeout forwarded to ``requests`` for every HTTP
            call. ``None`` (the default) keeps the previous behaviour of
            waiting indefinitely.
    """

    def __init__(self, host, username, password, timeout=None):
        self.host = host
        self.username = username
        self.password = password
        self.timeout = timeout

        self.s = requests.Session()
        self.session_id = None  # filled in by login()
        self.id = 0  # request counter, incremented before every call

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def request(self, method, params=None, object_id=None, extra=None, url=None):
        """Make a RPC request and return the decoded JSON reply.

        Args:
            method: RPC method name, e.g. ``"magicBox.getSerialNo"``.
            params: Value sent as ``params``; omitted when ``None``.
            object_id: Value sent as ``object``; omitted when falsy.
            extra: Optional dict merged into the top level of the payload.
            url: Full URL to post to; defaults to ``http://<host>/RPC2``.

        Returns:
            The reply body parsed as JSON.
        """
        self.id += 1

        data = {'method': method, 'id': self.id}
        if params is not None:
            data['params'] = params
        if object_id:
            data['object'] = object_id
        if extra is not None:
            data.update(extra)
        # Set after ``extra`` so the current session id always wins.
        if self.session_id:
            data['session'] = self.session_id

        if not url:
            url = "http://{}/RPC2".format(self.host)

        r = self.s.post(url, json=data, timeout=self.timeout)
        return r.json()

    @staticmethod
    def _md5_upper(text):
        """Return the upper-case hex MD5 digest of ``text`` (UTF-8 encoded)."""
        if not isinstance(text, bytes):
            text = text.encode('utf-8')
        return hashlib.md5(text).hexdigest().upper()

    @staticmethod
    def _check_result(response):
        """Raise ``RequestError`` when the reply's ``result`` is ``False``."""
        if response['result'] is False:
            raise RequestError(str(response))
        return response

    def _factory_instance(self, service, params=""):
        """Call ``<service>.factory.instance`` and return its ``result``."""
        method = "{}.factory.instance".format(service)
        r = self.request(method=method, params=params)
        return r['result']

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def login(self):
        """Dahua RPC login.

        Reversed from rpcCore.js (login, getAuth & getAuthByType functions).

        Also referenced:
        https://gist.github.com/avelardi/1338d9d7be0344ab7f4280618930cd0d

        Raises:
            LoginError: If the first reply lacks the session/realm/random
                fields, or the second reply has ``result`` set to ``False``.
        """
        url = 'http://{}/RPC2_Login'.format(self.host)
        method = "global.login"
        client_type = "Dahua3.0-Web3.0"

        # login1: get session, realm & random for real login
        params = {'userName': self.username,
                  'password': "",
                  'clientType': client_type}
        challenge = self.request(method=method, params=params, url=url)
        try:
            session_id = challenge['session']
            realm = challenge['params']['realm']
            nonce = challenge['params']['random']
        except (KeyError, TypeError):
            # Surface the device's reply instead of a bare KeyError.
            raise LoginError(str(challenge))
        self.session_id = session_id

        # Password encryption algorithm
        # Reversed from rpcCore.getAuthByType
        pwd_hash = self._md5_upper(
            self.username + ":" + realm + ":" + self.password)
        pass_hash = self._md5_upper(
            self.username + ':' + nonce + ':' + pwd_hash)

        # login2: the real login
        params = {'userName': self.username,
                  'password': pass_hash,
                  'clientType': client_type,
                  'authorityType': "Default",
                  'passwordType': "Default"}
        r = self.request(method=method, params=params, url=url)
        if r['result'] is False:
            raise LoginError(str(r))

    def set_config(self, params):
        """Set configurations.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        r = self.request(method="configManager.setConfig", params=params)
        self._check_result(r)

    def reboot(self):
        """Reboot the device.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        object_id = self._factory_instance("magicBox")
        r = self.request(method="magicBox.reboot", params="",
                         object_id=object_id)
        self._check_result(r)

    def current_time(self):
        """Get the current time on the device.

        Returns:
            The ``time`` field of the reply's ``params``.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        r = self.request(method="global.getCurrentTime")
        self._check_result(r)
        return r['params']['time']

    def ntp_sync(self, address, port, time_zone):
        """Synchronize time with NTP.

        Args:
            address: Sent as ``Address``.
            port: Sent as ``Port``.
            time_zone: Sent as ``TimeZone``.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        object_id = self._factory_instance("netApp")
        params = {'Address': address, 'Port': port, 'TimeZone': time_zone}
        r = self.request(method="netApp.adjustTimeWithNTP", params=params,
                         object_id=object_id)
        self._check_result(r)

    def get_split(self):
        """Get display split mode.

        Returns:
            A ``(mode, view)`` tuple: ``mode`` is the integer that follows
            the first five characters of the reported mode string (e.g.
            ``"Split4"`` -> ``4``) and ``view`` is the reported group plus
            one.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        object_id = self._factory_instance("split", {'channel': 0})
        r = self.request(method="split.getMode", params="",
                         object_id=object_id)
        self._check_result(r)

        mode = int(r['params']['mode'][5:])
        view = int(r['params']['group']) + 1
        return mode, view

    def set_split(self, mode, view):
        """Set display split mode.

        Args:
            mode: Either an int (sent as ``"Split<mode>"``) or a ready-made
                mode string, which is sent unchanged.
            view: One-based view group; sent to the device as ``view - 1``.

        Raises:
            RequestError: If the reply's ``result`` is ``False``.
        """
        if isinstance(mode, int):
            mode = "Split{}".format(mode)
        group = view - 1

        object_id = self._factory_instance("split", {'channel': 0})
        params = {'displayType': "General",
                  'workMode': "Local",
                  'mode': mode,
                  'group': group}
        r = self.request(method="split.setMode", params=params,
                         object_id=object_id)
        self._check_result(r)
class LoginError(Exception):
    """Raised by ``DahuaRpc.login`` when the device rejects the login."""
class RequestError(Exception):
    """Raised when an RPC reply reports ``result`` as ``False``."""
Hi,
do you have any specs about Dahua RPC2 protocol? Where may I find it?
Thanks in advance.
Hi, the code was reverse-engineered from the web interface. I have no idea where to find the specs.
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code.
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code.
Nice project. I'm also a Home Assistant user and going to use it later. Thanks for your work. 👍
Very very cool, thanks for your work!
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code. Nice
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code. Nice project. I'm also a Home Assistant user and going to use it later. Thanks for your work. 👍
Hi both,
I’m just a beginner with very little programming knowledge. I’ve integrated everything I use with Home Assistant, but I’m stuck with the Ezviz DB2C wireless chime. I’ll be happy to give you access to my China open Ezviz account to get at the API; there are already a few things available, but I really don’t know where to start.
Since I’m from Myanmar, I have very little choice when buying hardware.
I really do hope you two can help me out; I believe both brands are based on a similar system.
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code. Nice
Just wanted to say thank you for providing this. I'm going to use it for parts of https://github.com/rroller/dahua
Thanks again. I'll make sure to credit you in the code. Nice project. I'm also a Home Assistant user and going to use it later. Thanks for your work. 👍
Hi both,
I’m just a beginner with very little programming knowledge. I’ve integrated everything I use with Home Assistant, but I’m stuck with the Ezviz DB2C wireless chime. I’ll be happy to give you access to my China open Ezviz account to get at the API; there are already a few things available, but I really don’t know where to start. Since I’m from Myanmar, I have very little choice when buying hardware.
I really do hope you two can help me out; I believe both brands are based on a similar system.
Just to let you know, it took me weeks to get Debian 11 running on my Pi 4B. All I’m using it for is a local media server for the kids (already done, with some torrent-download automation) and a useful Home Assistant setup for my wife and kids to use through Apple HomeKit. The doorbell is the only thing left for me.
Sorry @nicakp, I can't help you. I'm not familiar with these anymore, and life is too busy.
Hi,
do you have any specs about Dahua RPC2 protocol? Where may I find it?
Thanks in advance.