Last active
March 29, 2023 12:47
-
-
Save Caligatio/b975005f26248575aba4709d093cfe4f to your computer and use it in GitHub Desktop.
Dynamic DNS script to detect and update A, AAAA, and HTTPS (with ipv4hint and ipv6hint) records at Cloudflare
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
dyndns_cloudflare | |
This is a Python 3.8+ script that will attempt to identify a computer's external IPv4 and "static" (non RFC 4941) IPv6 | |
addresses to then update Cloudflare DNS entries. It currently supports A, AAAA, and the new HTTPS DNS record types. For | |
HTTPS records, it will embed the "ipv4hint" and "ipv6hint" values into the record along with the ALPN specified in the | |
config. It has optional support for saving the IPv4 and IPv6 addresses from the last run of the program to avoid | |
unnecessary updates. | |
**Note that it only updates DNS records: the records must already exist and it will not delete any records. If you add more | |
records to the config, make sure you delete the "last_run" file as there is no logic to detect unchanging IPs but a new | |
config.** | |
It was written by Brian Turek (https://github.com/Caligatio) and released under the Unlicense (https://unlicense.org/). | |
""" | |
import argparse | |
import ipaddress | |
import logging | |
import pathlib | |
import subprocess | |
import sys | |
from typing import Dict, Final, Iterable, Literal, Optional | |
import xml.etree.ElementTree as ET | |
from yaml import load, dump | |
try: | |
from yaml import CLoader as Loader, CDumper as Dumper | |
except ImportError: | |
from yaml import Loader, Dumper | |
import requests | |
logger = logging.getLogger(__name__) | |
def get_ipv4_by_fritz_upnp(
    hostname: str = "fritz.box", port: int = 49000, timeout: float = 10.0
) -> Optional[ipaddress.IPv4Address]:
    """Ask a Fritz router for the external IPv4 address over its UPnP SOAP endpoint.

    Args:
        hostname: Host name or IP address of the router.
        port: TCP port of the router's UPnP control endpoint.
        timeout: Seconds to wait for the router before giving up.

    Returns:
        The external IPv4 address, or None if the router could not be reached, answered with an
        error status, or returned something that is not a valid IPv4 address.
    """
    # This is some black magic copied from https://wiki.ubuntuusers.de/FritzBox/Skripte/
    try:
        req = requests.post(
            f"http://{hostname}:{port}/igdupnp/control/WANIPConn1",
            headers={
                "Content-Type": 'text/xml; charset="utf-8"',
                "SoapAction": "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress",
            },
            data=b"""
                <?xml version='1.0' encoding='utf-8'?>
                <s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/' xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>
                    <s:Body>
                        <u:GetExternalIPAddress xmlns:u='urn:schemas-upnp-org:service:WANIPConnection:1' />
                    </s:Body>
                </s:Envelope>
            """,
            timeout=timeout,
        )
        req.raise_for_status()
    except requests.RequestException:
        # RequestException also covers connection errors and timeouts (not only HTTP error statuses),
        # so an absent or unreachable router lets the caller fall through to its next method.
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    try:
        root = ET.fromstring(req.content)
    except ET.ParseError:
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    node = root.find(".//NewExternalIPAddress")
    raw_addr = (node.text or "").strip() if node is not None else ""
    try:
        addr = ipaddress.IPv4Address(raw_addr)
    except ValueError:
        # Missing element, empty text, or garbage: report failure instead of crashing.
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    logger.debug("Found IPv4 address via Fritz UPnP")
    return addr
def get_ipv4_by_ipify(timeout: float = 10.0) -> Optional[ipaddress.IPv4Address]:
    """Look up the external IPv4 address by querying the ipify web service.

    Args:
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The external IPv4 address, or None if the request failed (connection error, timeout, or
        HTTP error status) or the response body is not a valid IPv4 address.
    """
    try:
        req = requests.get("http://api.ipify.org", timeout=timeout)
        req.raise_for_status()
    except requests.RequestException:
        # Covers connection failures and timeouts as well as HTTP error statuses.
        logger.debug("Failed to find IPv4 address via ipify")
        return None

    try:
        # Both a failed decode (UnicodeDecodeError) and a malformed address are ValueError subclasses.
        addr = ipaddress.IPv4Address(req.content.decode().strip())
    except ValueError:
        logger.debug("Failed to find IPv4 address via ipify")
        return None

    logger.debug("Found IPv4 address via ipify")
    return addr
def get_static_ipv6(iface: str) -> Optional[ipaddress.IPv6Address]:
    """Return the first non-deprecated global "mngtmpaddr" IPv6 address of an interface.

    Runs ``/sbin/ip addr show dev <iface>`` and scans its output line by line.

    Args:
        iface: Name of the network interface to inspect.

    Returns:
        The address from the first matching line, or None when no line matches.
    """
    result = subprocess.run(["/sbin/ip", "addr", "show", "dev", iface], capture_output=True, encoding="utf-8")

    for entry in result.stdout.splitlines():
        # Only lines that are global in scope, flagged "mngtmpaddr", and not deprecated qualify.
        if "scope global" not in entry or "mngtmpaddr" not in entry or "deprecated" in entry:
            continue

        # The second whitespace-separated field has the form "<address>/<prefix length>".
        cidr = entry.split()[1]
        host_part, _prefix_len = cidr.split("/")
        found = ipaddress.IPv6Address(host_part)
        logger.debug("Found IPv6 address via static interface")
        return found

    logger.debug("Failed to find IPv6 address via static interface")
    return None
def update_cloudflare_a_aaaa_record(
    api_token: str,
    zone_id: str,
    record_id: str,
    record_type: Literal["A", "AAAA"],
    name: str,
    content: str,
    proxied: bool,
    ttl: int = 1,
    timeout: float = 30.0,
) -> None:
    """Overwrite an existing A or AAAA record at Cloudflare with a new address.

    Args:
        api_token: Cloudflare API token, sent as a Bearer token.
        zone_id: ID of the zone that contains the record.
        record_id: ID of the DNS record to overwrite.
        record_type: Either "A" or "AAAA".
        name: DNS name of the record.
        content: The IP address to store, as a string.
        proxied: Value of the record's "proxied" flag.
        ttl: TTL value sent with the record.
        timeout: Seconds to wait for the API before giving up, so an unattended run cannot hang forever.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    logger.info(
        "Setting %s record for %s to %s with ttl %d and proxy flag set to %s", record_type, name, content, ttl, proxied
    )
    req = requests.put(
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}",
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        json={"type": record_type, "name": name, "content": content, "ttl": ttl, "proxied": proxied},
        timeout=timeout,
    )
    req.raise_for_status()
def update_cloudflare_https_record(
    api_token: str,
    zone_id: str,
    record_id: str,
    name: str,
    target: str,
    value: str,
    priority: int,
    ttl: int = 1,
    timeout: float = 30.0,
) -> None:
    """Overwrite an existing HTTPS record at Cloudflare.

    Args:
        api_token: Cloudflare API token, sent as a Bearer token.
        zone_id: ID of the zone that contains the record.
        record_id: ID of the DNS record to overwrite.
        name: DNS name of the record.
        target: Target name stored in the record's data.
        value: Parameter string stored in the record's data (e.g. the alpn and address hint entries).
        priority: Priority stored in the record's data.
        ttl: TTL value sent with the record.
        timeout: Seconds to wait for the API before giving up, so an unattended run cannot hang forever.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    logger.info(
        "Setting HTTPS record for %s to %s with value '%s', ttl %d, and priority %d", name, target, value, ttl, priority
    )
    req = requests.put(
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}",
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        json={
            "type": "HTTPS",
            "name": name,
            "ttl": ttl,
            "data": {"target": target, "value": value, "priority": priority},
        },
        timeout=timeout,
    )
    req.raise_for_status()
def _parse_saved_addr(raw, addr_cls):
    """Turn one value from the last_run file into an address object, or None if it is unusable."""
    # Earlier versions stored str(None), i.e. the text "None", for a family without an address.
    if not raw or raw == "None":
        return None
    try:
        return addr_cls(raw)
    except ValueError:
        logger.warning("Ignoring invalid saved address %r in last_run file", raw)
        return None


def _read_last_ips(last_run: Optional[pathlib.Path]) -> Dict:
    """Load the addresses saved by the previous run; a missing or empty file yields no addresses."""
    last_ips = {"ipv4": None, "ipv6": None}
    if not last_run:
        return last_ips

    try:
        with last_run.open("r") as f_in:
            data = load(f_in, Loader=Loader)
    except FileNotFoundError:
        return last_ips

    # An empty file loads as None; anything that is not a mapping carries no usable state.
    if not isinstance(data, dict):
        return last_ips

    last_ips["ipv4"] = _parse_saved_addr(data.get("ipv4"), ipaddress.IPv4Address)
    last_ips["ipv6"] = _parse_saved_addr(data.get("ipv6"), ipaddress.IPv6Address)
    logger.info("Last ran found IPv4 of %s and IPv6 of %s", last_ips["ipv4"], last_ips["ipv6"])
    return last_ips


def main(
    api_token: str,
    zone_id: str,
    ipv4_methods: Iterable[Dict],
    ipv6_methods: Iterable[Dict],
    a_records: Iterable[Dict],
    aaaa_records: Iterable[Dict],
    https_records: Iterable[Dict],
    last_run: Optional[pathlib.Path] = None,
) -> int:
    """Detect the current external addresses and push any changes to Cloudflare.

    Args:
        api_token: Cloudflare API token.
        zone_id: ID of the Cloudflare zone holding the records.
        ipv4_methods: Detection methods to try, in order, for IPv4. Each dict needs a "name" key
            ("fritz_upnp", "ipify" or "static_iface"); remaining keys are passed to the method as keyword arguments.
        ipv6_methods: Same as ``ipv4_methods`` but for IPv6.
        a_records: A records to update; each needs "id", "name" and "proxied", with optional "ttl".
        aaaa_records: AAAA records to update; same keys as ``a_records``.
        https_records: HTTPS records to update; each needs "id", "name", "target" and "alpn",
            with optional "priority" and "ttl".
        last_run: Optional file used to remember the addresses between runs.

    Returns:
        0 on success, 1 if no detection method is configured, 2 if no record is configured,
        3 if every configured method for an address family failed.
    """
    METHODS_MAPPING: Final = {
        "fritz_upnp": get_ipv4_by_fritz_upnp,
        "ipify": get_ipv4_by_ipify,
        "static_iface": get_static_ipv6,
    }

    last_ips = _read_last_ips(last_run)

    if not any([ipv4_methods, ipv6_methods]):
        logger.critical("At least one of ipv4_methods or ipv6_methods must be defined")
        return 1

    if not any([a_records, aaaa_records, https_records]):
        logger.critical("At least one of A, AAAA, or HTTPS records must be defined")
        return 2

    addrs = {"ipv4": None, "ipv6": None}
    for addr_type, methods in (("ipv4", ipv4_methods), ("ipv6", ipv6_methods)):
        if not methods:
            continue

        for method in methods:
            # Work on a copy so the caller's configuration dicts are left untouched.
            method_kwargs = dict(method)
            method_name = method_kwargs.pop("name")
            addr = METHODS_MAPPING[method_name](**method_kwargs)
            if addr:
                addrs[addr_type] = addr
                logger.info("Found %s address of %s", addr_type, addr)
                break
        else:
            # The loop finished without a break: none of the configured methods produced an address.
            logger.critical("All %s address methods failed", addr_type)
            return 3

    if addrs["ipv4"] and last_ips["ipv4"] != addrs["ipv4"]:
        logger.info("IPv4 address changed!")
        for record in a_records:
            update_cloudflare_a_aaaa_record(
                api_token,
                zone_id,
                record["id"],
                "A",
                record["name"],
                str(addrs["ipv4"]),
                record["proxied"],
                record.get("ttl", 1),
            )
    else:
        logger.info("No IPv4 change")

    if addrs["ipv6"] and last_ips["ipv6"] != addrs["ipv6"]:
        logger.info("IPv6 address changed!")
        for record in aaaa_records:
            update_cloudflare_a_aaaa_record(
                api_token,
                zone_id,
                record["id"],
                "AAAA",
                record["name"],
                str(addrs["ipv6"]),
                record["proxied"],
                record.get("ttl", 1),
            )
    else:
        logger.info("No IPv6 change")

    # HTTPS records embed both hints, so a change in either family requires rewriting them.
    if last_ips["ipv4"] != addrs["ipv4"] or last_ips["ipv6"] != addrs["ipv6"]:
        for record in https_records:
            contents = [f'alpn="{record["alpn"]}"']
            contents.extend(f'{addr_type}hint="{addr}"' for addr_type, addr in addrs.items() if addr)

            update_cloudflare_https_record(
                api_token,
                zone_id,
                record["id"],
                record["name"],
                record["target"],
                " ".join(contents),
                record.get("priority", 1),
                record.get("ttl", 1),
            )

    if last_run:
        # Store a real null for a missing address; the text "None" cannot be parsed back into an address.
        saved = {addr_type: (str(addr) if addr else None) for addr_type, addr in addrs.items()}
        with last_run.open("w") as f_out:
            f_out.write(dump(saved, Dumper=Dumper))

    return 0
def use_config(config_file: pathlib.Path) -> int:
    """Read the YAML configuration file and run ``main`` with its contents.

    Args:
        config_file: Path to the YAML configuration file.

    Returns:
        The exit code returned by ``main``.
    """
    with config_file.open("rb") as handle:
        config = load(handle, Loader=Loader)

    # "last_run" is optional; only a non-empty value is converted to a Path.
    state_file = config.get("last_run")
    state_file = pathlib.Path(state_file) if state_file else state_file

    api_token = config["api_token"]
    zone_id = config["zone_id"]
    ipv4_methods = config.get("ipv4_methods", [])
    ipv6_methods = config.get("ipv6_methods", [])
    records = config["records"]

    return main(
        api_token,
        zone_id,
        ipv4_methods,
        ipv6_methods,
        records.get("A", []),
        records.get("AAAA", []),
        records.get("HTTPS", []),
        state_file,
    )
def cli() -> None:
    """Parse the command line, configure logging, run the update and exit with its return code."""
    level_names = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    parser = argparse.ArgumentParser(description="Updates dynamic DNS entries at Cloudflare")
    parser.add_argument("--config", required=True, type=pathlib.Path, help="Path to configuration file")
    parser.add_argument("--logging", choices=level_names, default="INFO", help="Desired logging level")
    options = parser.parse_args()

    chosen_level = getattr(logging, options.logging)
    logging.basicConfig(level=chosen_level, format="%(asctime)s - %(levelname)s - %(message)s")

    # urllib3 is never allowed to log below INFO, even when DEBUG was requested for this script.
    urllib3_level = chosen_level if chosen_level > logging.INFO else logging.INFO
    logging.getLogger("urllib3").setLevel(urllib3_level)

    exit_code = use_config(options.config)
    sys.exit(exit_code)


if __name__ == "__main__":
    cli()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
---
api_token: REDACTED_API_TOKEN
zone_id: REDACTED_ZONE_ID
ipv4_methods:
  - name: fritz_upnp
    hostname: 192.168.178.1
  - name: ipify
ipv6_methods:
  - name: static_iface
    iface: enp1s0
last_run: /var/run/dyndns_cloudflare
records:
  A:
    - name: srv.example.com
      id: REDACTED_RECORD_ID
      proxied: false
  AAAA:
    - name: srv.example.com
      id: REDACTED_RECORD_ID
      proxied: false
  HTTPS:
    - name: test1.example.com
      id: REDACTED_RECORD_ID
      target: srv.example.com
      alpn: h3,h2
    - name: test2.example.com
      id: REDACTED_RECORD_ID
      target: srv.example.com
      alpn: h3,h2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment