Skip to content

Instantly share code, notes, and snippets.

@lyc8503
Last active November 3, 2023 10:28
Show Gist options
  • Save lyc8503/a90141f5965b516a8e7c3e17a233ca5e to your computer and use it in GitHub Desktop.
Save lyc8503/a90141f5965b516a8e7c3e17a233ca5e to your computer and use it in GitHub Desktop.
阿里云 DDNS Python
#!/usr/bin/python3
import base64
import datetime
import hmac
import json
import os
import urllib.parse
import uuid
from hashlib import sha1
from urllib.error import HTTPError
from urllib.request import urlopen
# Credentials and target record for the updater.
# Each value can be supplied through the environment so that secrets do not
# have to be committed to the script; the literals below are the fallbacks
# and are placeholders that must be replaced when no environment variable is set.
ACCESS_KEY = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", "XXX")
ACCESS_SECRET = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "XXX")
# Fully-qualified sub-domain whose single DNS record is kept in sync.
DOMAIN = os.environ.get("ALIDDNS_DOMAIN", "qwe.example.com")
# This method may need changes
def get_ip():
    """Return the first IPv6 address reported for the ``wan_6`` interface.

    Runs ``ubus call network.interface.wan_6 status`` and parses its JSON
    output. Adapt the command/interface name to the local system if needed.

    Returns:
        The ``address`` field of the first entry in ``ipv6-address``.

    Raises:
        ValueError: if the command output is not valid JSON (for example
            when the command fails and prints nothing).
        LookupError: if the interface reports no IPv6 address.
    """
    # The with-block closes the pipe (and waits for the child process)
    # instead of leaving that to garbage collection.
    with os.popen("ubus call network.interface.wan_6 status") as pipe:
        info = json.loads(pipe.read().strip())
    addresses = info.get('ipv6-address') or []
    if not addresses:
        raise LookupError("interface wan_6 reports no IPv6 address")
    return addresses[0]['address']
def push_msg(msg):
    """Emit a notification message; currently it is only printed to stdout.

    Args:
        msg: Text of the notification (must be a ``str``).
    """
    # Implement your notification push here
    line = "Push: " + msg
    print(line)
def sign(s):
    """Return the Base64-encoded HMAC-SHA1 digest of ``s``.

    The HMAC key is ``ACCESS_SECRET`` followed by a literal ``"&"``.

    Args:
        s: ASCII string to sign.

    Returns:
        The digest as an ASCII Base64 string.
    """
    key = (ACCESS_SECRET + "&").encode("ascii")
    mac = hmac.new(key, msg=s.encode("ascii"), digestmod=sha1)
    digest_b64 = base64.b64encode(mac.digest())
    return digest_b64.decode("ascii")
def send_requests(params):
    """Sign and send one GET request to ``https://alidns.aliyuncs.com/``.

    The common request parameters (AccessKeyId, Format, SignatureMethod,
    SignatureNonce, SignatureVersion, Timestamp, Version) and the computed
    Signature are added to a copy of ``params``; the caller's dict is not
    modified.

    Args:
        params: Mapping with the action-specific parameters
            (e.g. ``{"Action": ..., ...}``).

    Returns:
        The decoded JSON response body. When the server answers with an HTTP
        error status, the JSON error body is returned instead of raising.
    """
    def percent_encode(value):
        # safe="" leaves only the RFC 3986 unreserved characters unescaped;
        # quote()'s default would let "/" through and break the signature
        # for any value containing it.
        return urllib.parse.quote(str(value), safe="")

    query = dict(params)
    query['AccessKeyId'] = ACCESS_KEY
    query['Format'] = 'json'
    query['SignatureMethod'] = 'HMAC-SHA1'
    # A random nonce stays unique even for requests issued in the same instant.
    query['SignatureNonce'] = uuid.uuid4().hex
    query['SignatureVersion'] = '1.0'
    # UTC timestamp in the YYYY-MM-DDThh:mm:ssZ form.
    query['Timestamp'] = datetime.datetime.now(
        datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    query['Version'] = '2015-01-09'

    # Canonical query string: parameters sorted by name, each side encoded.
    canonical = "&".join(
        percent_encode(name) + "=" + percent_encode(query[name])
        for name in sorted(query))
    query['Signature'] = sign("GET&%2F&" + percent_encode(canonical))

    encoded = urllib.parse.urlencode(query)
    # NOTE(review): this prints the AccessKeyId and signature to stdout;
    # kept for parity with the existing output, consider removing it.
    print(encoded)
    try:
        with urlopen("https://alidns.aliyuncs.com/?" + encoded) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except HTTPError as e:
        # The error response body is parsed and handed back to the caller.
        return json.loads(e.read().decode("utf-8"))
def UpdateDomainRecord(rec_id, rr, ttl, type_, value):
    """Send an ``UpdateDomainRecord`` request and return its response.

    Args:
        rec_id: Sent as ``RecordId``.
        rr: Sent as ``RR``.
        ttl: Sent as ``TTL``.
        type_: Sent as ``Type``.
        value: Sent as ``Value``.

    Returns:
        Whatever ``send_requests`` returns for this request.
    """
    request = dict(
        Action="UpdateDomainRecord",
        RecordId=rec_id,
        RR=rr,
        TTL=ttl,
        Type=type_,
        Value=value,
    )
    return send_requests(request)
def DescribeSubDomainRecords(sub_domain):
    """Send a ``DescribeSubDomainRecords`` request for ``sub_domain``.

    Args:
        sub_domain: Sent as the ``SubDomain`` parameter.

    Returns:
        Whatever ``send_requests`` returns for this request.
    """
    query = {}
    query["SubDomain"] = sub_domain
    query["Action"] = "DescribeSubDomainRecords"
    return send_requests(query)
def main():
    """Update the DNS record of ``DOMAIN`` when the local address changed.

    Looks up the existing record, compares its value with ``get_ip()``, and
    issues an update (followed by a push notification) only when they differ.

    Raises:
        Exception: if the record lookup did not succeed, or did not return
            exactly one record.
    """
    records = DescribeSubDomainRecords(DOMAIN)
    # send_requests hands back the parsed error body on HTTP errors; report
    # that response instead of failing with a bare KeyError on 'TotalCount'.
    if 'TotalCount' not in records:
        raise Exception(f"DescribeSubDomainRecords failed: {records}")
    if records['TotalCount'] != 1:
        raise Exception("TotalCount != 1, please make sure there's EXACTLY one record.")
    record = records['DomainRecords']['Record'][0]

    ip = get_ip()
    print("Local ip: " + ip)
    print("Remote ip: " + record['Value'])
    if record['Value'] == ip:
        # An unchanged address is the normal case, not an error: report it
        # and finish cleanly rather than raising (traceback + exit status 1).
        print("No ip change, pass.")
        return

    result = UpdateDomainRecord(record['RecordId'], record['RR'], record['TTL'], record['Type'], ip)
    if 'RecordId' in result and result['RecordId'] == record['RecordId']:
        push_msg(f"[AliDDNS] 域名 {DOMAIN} IP 更新成功: {ip}")
    else:
        push_msg(f"[AliDDNS] 域名 {DOMAIN} IP 更新**失败**: {result}")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment