Created
November 21, 2022 08:49
-
-
Save duketwo/f445de3a7d23b93e06efdbde3b91f437 to your computer and use it in GitHub Desktop.
OpenWRT 4G LTE WAN connection failover handling python Wireguard VPN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ifaddr | |
import ipaddress | |
import logging | |
import threading | |
import time | |
import platform | |
import subprocess | |
# Setup: one WireGuard service running on 100.100.100.100.
# There is one DSL line and one LTE/4G line; the goal is to switch automatically between them while keeping current connections ACTIVE.
# Set a static route to 100.100.100.100.
# Set the default route via the WireGuard device.
# Set a static route to 1.1.1.1 via the DSL connection.
# Set a static route to 1.1.1.2 via the LTE connection.
# Ping both 1.1.1.1 and 1.1.1.2 constantly to measure the uptime of each connection.
# Change the device of the route to 100.100.100.100 depending on which connection is available.
# Send every record at INFO or above both to "debug.log" (relative to the
# working directory) and to the console stream.
_log_handlers = [
    logging.FileHandler("debug.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Shared state between the ping threads and the main loop.
# route_N_down is 1 while the matching probe host has failed three pings in a row.
route_1_down = route_2_down = 0

# Values derived from the wwan0 interface; they stay None until it has an IPv4 address.
gateway = ip = networkPrefix = network = None

# Last state line written to the log, used to log only on change.
prev_state_log = None

# Number of main-loop iterations completed so far.
ticks = 0
def get_ip(name):
    """Return the first IPv4 address of the adapter called *name*.

    Args:
        name: Adapter name, compared against ``adapter.nice_name`` as
            reported by ``ifaddr.get_adapters()``.

    Returns:
        An ``ipaddress.IPv4Address``, or ``None`` when no adapter has that
        name or none of its addresses parses as IPv4.
    """
    for adapter in ifaddr.get_adapters():
        if adapter.nice_name != name:
            continue
        for candidate in adapter.ips:
            try:
                return ipaddress.IPv4Address(candidate.ip)
            except ValueError:
                # ipaddress.AddressValueError is a ValueError: this entry is
                # not an IPv4 address (presumably IPv6), so try the next one.
                # Catching only ValueError keeps KeyboardInterrupt and real
                # programming errors visible.
                continue
    return None
def has_interface_an_ip_address(name):
    """Return True when ``get_ip(name)`` finds an IPv4 address, else False."""
    address = get_ip(name)
    if address is None:
        return False
    return True
def get_network_prefix(name):
    """Return the prefix length of the first IPv4 address on adapter *name*.

    Args:
        name: Adapter name, compared against ``adapter.nice_name`` as
            reported by ``ifaddr.get_adapters()``.

    Returns:
        The ``network_prefix`` attribute of the first address entry that
        parses as IPv4, or ``None`` when no adapter has that name or it has
        no IPv4 address.
    """
    for adapter in ifaddr.get_adapters():
        if adapter.nice_name != name:
            continue
        for candidate in adapter.ips:
            try:
                # Only used as a validity check; the parsed value is not needed.
                ipaddress.IPv4Address(candidate.ip)
            except ValueError:
                # Not an IPv4 entry (AddressValueError is a ValueError); skip it.
                continue
            return candidate.network_prefix
    return None
def ping(host):
    """Probe *host* once with ``fping`` and report whether it answered.

    Runs ``fping -c 1 -t 1200 <host>`` with all output discarded.

    Returns:
        True when fping exits with status 0, otherwise False.
    """
    fping_argv = ['fping', '-c', '1', '-t', '1200', host]
    exit_status = subprocess.call(
        fping_argv,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    return exit_status == 0
def thread_function(ip, id):
    """Ping *ip* forever and publish whether its route looks down.

    The last three ping results are kept in a ring buffer. The route is
    flagged down (1) only while all three are failures; any success, or a
    slot that has not been filled yet, keeps it flagged up (0).

    Args:
        ip: Host to probe with ``ping``.
        id: Monitor number. 1 writes ``route_1_down`` and probes every
            second; 2 writes ``route_2_down`` and probes every 10 seconds.
            Any other value probes every 10 seconds and writes no flag.
            (The name shadows the builtin but is kept for caller
            compatibility.)

    This function never returns.
    """
    global route_1_down
    global route_2_down

    logging.info(f'Thread Id [{id}] Ip [{ip}]: starting')
    pause_seconds = 1 if id == 1 else 10
    slot = 0
    past_three_pings = [None] * 3
    while True:
        past_three_pings[slot] = ping(ip)
        slot = (slot + 1) % 3

        # Publish the new state BEFORE sleeping so the main loop sees a
        # change immediately instead of one probe interval late.
        is_down = 1 if all(result is False for result in past_three_pings) else 0
        if id == 1:
            route_1_down = is_down
        elif id == 2:
            route_2_down = is_down

        time.sleep(pause_seconds)
def ensure_routes():
    """Make sure the two probe-host routes exist in the routing table.

    Reads the module-level ``gateway`` and the output of ``ip route``.
    Each of the following routes is (re)created when its exact text is not
    found in that output:

    * ``1.1.1.2 via <gateway> dev wwan0``
    * ``1.1.1.1 via 192.168.10.1 dev br-WAN``
    """
    output = subprocess.getoutput('ip route')
    wanted_routes = (
        f'1.1.1.2 via {gateway} dev wwan0',
        '1.1.1.1 via 192.168.10.1 dev br-WAN',
    )
    for route in wanted_routes:
        if route in output:
            continue
        # Log the route exactly as it is added; the old message for the
        # 1.1.1.1 route named wwan0 although the route goes out via br-WAN.
        logging.info(f'Adding route to {route}')
        # Delete first so a stale variant of the route cannot block the add;
        # a failing delete is ignored.
        subprocess.getoutput(f'ip route del {route}')
        subprocess.getoutput(f'ip route add {route}')
def enable_route_wwan():
    """Send traffic for 100.100.100.100 via the LTE gateway on wwan0.

    Reads the module-level ``gateway``. The delete and the add are chained
    with ``;`` rather than ``&&`` so the route is still added when there is
    no existing route to delete (``ip route del`` fails in that case and
    ``&&`` would skip the add).
    """
    subprocess.getoutput(
        'ip route del 100.100.100.100; '
        f'ip route add 100.100.100.100 via {gateway} dev wwan0'
    )
def enable_route_dsl():
    """Send traffic for 100.100.100.100 via 192.168.10.1 on br-WAN.

    The delete and the add are chained with ``;`` rather than ``&&`` so the
    route is still added when there is no existing route to delete
    (``ip route del`` fails in that case and ``&&`` would skip the add).
    """
    subprocess.getoutput(
        'ip route del 100.100.100.100; '
        'ip route add 100.100.100.100 via 192.168.10.1 dev br-WAN'
    )
def is_route_dsl_active():
    """Report whether the route to 100.100.100.100 currently uses DSL.

    Returns:
        True when the text ``100.100.100.100 via 192.168.10.1`` occurs in
        the output of ``ip route``, otherwise False.
    """
    dsl_route_marker = '100.100.100.100 via 192.168.10.1'
    routing_table = subprocess.getoutput('ip route')
    return dsl_route_marker in routing_table
# Start one monitor thread per probe host. The threads are daemons because
# thread_function never returns: with non-daemon threads the process would
# stay alive (pinging, but no longer switching routes) after the main loop
# stops on Ctrl-C or an unhandled exception.
logging.info("Adding 1.1.1.1 thread.")
h1 = threading.Thread(target=thread_function, args=('1.1.1.1', 1), daemon=True)
h1.start()
logging.info("Adding 1.1.1.2 thread.")
h2 = threading.Thread(target=thread_function, args=('1.1.1.2', 2), daemon=True)
h2.start()
def reset_mobile_device():
    """Restart the mobile connection.

    Runs, as one ``&&`` chain so a failing step aborts the rest:
    ``ifdown 4G``, ``AT^NDISDUP=1,0`` written to /dev/ttyUSB1,
    ``AT^NDISDUP=1,1,"internet"`` written to /dev/ttyUSB1, ``ifup 4G``,
    with one-second pauses in between.
    NOTE(review): presumably the two AT commands drop and re-establish the
    modem's data session - confirm against the modem's AT reference.

    Change the interface name '4G' and the APN 'internet' to match the
    local setup.
    """
    logging.info('Resetting the mobile device.')
    # Raw strings: the backslash escapes must reach the shell untouched.
    # In a normal Python string \" collapses to a bare quote, which makes
    # the shell strip the quotes around the APN before echo ever sees them.
    steps = (
        'ifdown 4G',
        'sleep 1',
        r'echo -ne "AT^NDISDUP=1,0\r\n" > /dev/ttyUSB1',
        'sleep 1',
        r'echo -ne "AT^NDISDUP=1,1,\"internet\"\r\n" > /dev/ttyUSB1',
        'sleep 1',
        'ifup 4G',
    )
    subprocess.getoutput(' && '.join(steps))
# Main control loop, one pass per second:
#   * while wwan0 has an IPv4 address, keep the probe routes in place and
#     point the 100.100.100.100 route at whichever uplink is reachable
#     (DSL is preferred whenever its probe host answers);
#   * while wwan0 has no address, fall back to the DSL route;
#   * while the LTE probe is down, reset the modem on every 60th tick.
while True:
    ip = get_ip("wwan0")
    if ip is not None:
        networkPrefix = get_network_prefix("wwan0")
        network = ipaddress.IPv4Network(f'{ip}/{networkPrefix}', strict=False)
        # NOTE(review): assumes the LTE gateway is the last host address of
        # the wwan0 subnet - confirm for the carrier in use.
        gateway = network[network.num_addresses - 2]
        ensure_routes()

        # Only touch the route while at least one uplink is reachable.
        if not route_1_down or not route_2_down:
            if route_1_down and is_route_dsl_active() and not route_2_down:
                logging.info('Enabling mobile route.')
                enable_route_wwan()
            if not route_1_down and not is_route_dsl_active():
                logging.info('Enabling DSL route.')
                enable_route_dsl()

        # Log the combined state only when it changes.
        current_state_log = f'Route 1.1.1.1 down [{route_1_down}] Route 1.1.1.2 down [{route_2_down}] Is DSL route active [{is_route_dsl_active()}]'
        if current_state_log != prev_state_log:
            logging.info(current_state_log)
            prev_state_log = current_state_log
    else:
        # The function must be CALLED here: the bare name is a function
        # object, which is always truthy, so this branch could never run.
        if not is_route_dsl_active():
            logging.info('Device wwan0 has no ip address, but the default DSL route is not active. Activating the default DSL route.')
            enable_route_dsl()

    if route_2_down and ticks % 60 == 0:
        reset_mobile_device()

    time.sleep(1)
    ticks = ticks + 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment