Skip to content

Instantly share code, notes, and snippets.

@hiway
Created January 29, 2020 16:40
Show Gist options
  • Save hiway/54f688f28035035013019f2fd5dfb490 to your computer and use it in GitHub Desktop.
Save hiway/54f688f28035035013019f2fd5dfb490 to your computer and use it in GitHub Desktop.
Fetch SMS from a TP-Link Archer MR600 4G router using Selenium and Firefox, and save them to an SQLite database.
#!/usr/bin/env python3
import datetime
import logging
import os
import peewee
import time
from collections import namedtuple
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from typing import Iterator
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
# Address at which the router serves its login page
# (override with the ROUTER_URL environment variable).
ROUTER_URL = os.getenv('ROUTER_URL', default='http://192.168.1.1/')
# Router admin password (override with ROUTER_PASSWORD).
ROUTER_PASSWORD = os.getenv('ROUTER_PASSWORD', default='admin')
# Number of messages to fetch and store in one go (override with
# SMS_FETCH_LIMIT; the value is always coerced to int).
SMS_FETCH_LIMIT = int(os.getenv('SMS_FETCH_LIMIT', default=100))
# Debug mode is on when the DEBUG environment variable is TRUE, YES or 1
# (case-insensitive); anything else, or leaving it unset, turns it off.
DEBUG = os.getenv('DEBUG', 'NO').upper() in ('TRUE', 'YES', '1')
# The browser runs headless unless we are debugging.
HEADLESS = not DEBUG
# Log level, and its textual name, follow the DEBUG switch.
if DEBUG:
    log_level, log_level_text = logging.DEBUG, 'DEBUG'
else:
    log_level, log_level_text = logging.INFO, 'INFO'
logging.basicConfig(format='%(levelname)s:%(message)s', level=log_level)
# Seconds to wait for the router UI to refresh
DELAY = 3
# Path to the Firefox executable (default: '/usr/local/bin/firefox' on FreeBSD)
FIREFOX_BINARY = '/usr/local/bin/firefox'
# SQLite file in which SMS are stored (default: 'sms.db')
DATABASE_FILENAME = 'sms.db'
# ----------------------------------------------------------------------
# Data Structures and Models
# ----------------------------------------------------------------------
# Lightweight immutable record describing one SMS: who sent it, when it
# was received and what it says.
Message = namedtuple('Message', 'sender timestamp text')
# Peewee handle for the SQLite file named by DATABASE_FILENAME; the SMS
# model binds to it through its Meta class.
db = peewee.SqliteDatabase(DATABASE_FILENAME)
class SMS(peewee.Model):
    """Peewee model for one stored SMS, kept in the ``sms`` table of ``db``."""

    class Meta:
        database = db
        db_table = 'sms'

    # Number (or name) of the sender, as shown by the router.
    sender = peewee.CharField()
    # Falls back to today's date when no timestamp is supplied.
    # NOTE(review): a DateField presumably keeps only the date part of the
    # router's receive time -- confirm whether DateTimeField was intended.
    timestamp = peewee.DateField(default=datetime.date.today)
    # Message body.
    # NOTE(review): CharField is length-limited by default in peewee;
    # verify long or concatenated SMS are stored intact.
    text = peewee.CharField()
def save_sms(msg) -> None:
    """Log *msg* and insert it as a new row of the SMS table.

    Args:
        msg: Object exposing ``sender``, ``timestamp`` and ``text``
            attributes (for example a ``Message`` tuple).
    """
    logger.info(
        'Saving SMS: %s [%s] %s', msg.sender, msg.timestamp, msg.text)
    # Model.create() instantiates the row AND writes it to the database,
    # so no additional save() call is needed afterwards (it would only
    # issue a second, redundant write for the same row).
    SMS.create(sender=msg.sender, timestamp=msg.timestamp, text=msg.text)
# ----------------------------------------------------------------------
# Browser Automation
# ----------------------------------------------------------------------
class MR600(object):
    """Drive the TP-Link Archer MR600 web admin UI through Firefox.

    Every navigation helper sleeps for ``self.delay`` seconds after
    clicking because the admin pages refresh asynchronously. Explicit
    waits give up after ``self.timeout`` seconds with a
    ``selenium.common.exceptions.TimeoutException``.
    """

    def __init__(self, delay=3, headless=HEADLESS,
                 firefox_binary=FIREFOX_BINARY, timeout=10) -> None:
        """Start a Firefox session.

        Args:
            delay: Seconds to sleep after each UI action.
            headless: Run Firefox without a visible window when true.
            firefox_binary: Path to the Firefox executable; a falsy value
                lets Selenium locate the browser itself.
            timeout: Maximum seconds an explicit wait may take.
        """
        self.delay = delay
        self.timeout = timeout
        options = Options()
        # Configure the browser through Options only: the ``headless``
        # setter and the ``firefox_binary=`` constructor argument were
        # removed in later Selenium 4 releases, while these two calls work
        # on Selenium 3 and 4 alike.
        if headless:
            options.add_argument('-headless')
        if firefox_binary:
            options.binary_location = firefox_binary
        self.driver = webdriver.Firefox(options=options)
        logger.info(
            'Initialized MR600 browser automation '
            'with delay={delay} seconds, '
            'headless={headless} '
            'and firefox_binary={firefox_binary!r}'.format(
                delay=delay,
                headless=headless,
                firefox_binary=firefox_binary,
            ))

    def _wait_for(self, condition):
        """Block until *condition* holds and return the value it yields."""
        return WebDriverWait(self.driver, self.timeout).until(condition)

    def login(self, router_url=ROUTER_URL, router_password=ROUTER_PASSWORD) -> None:
        """Open the login page, type the admin password and submit it.

        Raises:
            TimeoutException: if the password field or the login button
                does not appear within ``self.timeout`` seconds.
        """
        logger.info('Loading router login page at: {}'.format(router_url))
        self.driver.get(router_url)
        # Wait for the field instead of assuming it exists as soon as
        # get() returns, and reuse the element rather than finding it twice.
        password_field = self._wait_for(
            EC.presence_of_element_located((By.ID, 'pc-login-password')))
        password_field.clear()
        password_field.send_keys(router_password)
        logger.info('Entered password, waiting for admin page to load.')
        login_button = self._wait_for(
            EC.presence_of_element_located((By.ID, 'pc-login-btn')))
        login_button.click()
        logger.info('Logged in to admin page, waiting for asynchronous page load.')
        time.sleep(self.delay)

    def click_element_by_id(self, id, require_clickable=True) -> None:
        """Wait for the element with the given DOM id, then click it.

        Args:
            id: DOM id of the element (the name shadows the builtin but is
                kept because callers may pass it by keyword).
            require_clickable: Wait until the element is clickable when
                true; otherwise mere presence in the DOM is enough.

        Raises:
            TimeoutException: if the wait condition is not met in time.
        """
        if require_clickable:
            condition = EC.element_to_be_clickable((By.ID, id))
        else:
            condition = EC.presence_of_element_located((By.ID, id))
        self._wait_for(condition).click()

    def click_element_by_xpath(self, xpath) -> None:
        """Wait for the element matching *xpath* to be present, then click it."""
        self._wait_for(
            EC.presence_of_element_located((By.XPATH, xpath))).click()

    def open_sms_page(self) -> None:
        """Navigate from the admin start page to the SMS page."""
        logger.info('Navigating to SMS page.')
        self.click_element_by_id('map_icon_sms')
        time.sleep(self.delay)

    def open_sms_detail_page(self, index=1) -> None:
        """Open the detail view of the message whose element id is ``msg_<index>``."""
        logger.info('Navigating to SMS detail page.')
        self.click_element_by_id("msg_{}".format(index))
        time.sleep(self.delay)

    def open_sms_detail_next_page(self) -> None:
        """Advance the detail view to the next message."""
        logger.info('Navigating to next SMS detail page.')
        self.click_element_by_id('divNextBtn')
        time.sleep(self.delay)

    def get_sms(self) -> Message:
        """Read the message currently shown on the detail page.

        Returns:
            A ``Message`` built from the innerHTML of the content,
            receive-time and phone-number elements.

        Raises:
            IndexError: if all three values are empty, which is treated as
                "no more messages".
            TimeoutException: if the content element never appears.
        """
        logger.info('Reading SMS details.')
        msg_content = self._wait_for(
            EC.presence_of_element_located((By.ID, "msgContent")))
        text = msg_content.get_attribute('innerHTML')
        # find_element(By.ID, ...) replaces find_element_by_id, which was
        # removed in Selenium 4.3; this spelling works on Selenium 3 too.
        timestamp = self.driver.find_element(By.ID, 'recvTime').get_attribute('innerHTML')
        sender = self.driver.find_element(By.ID, 'phoneNumber').get_attribute('innerHTML')
        if not (text or sender or timestamp):
            logger.info('No more messages.')
            raise IndexError('No more messages.')
        return Message(sender=sender, timestamp=timestamp, text=text)

    def logout(self) -> None:
        """Click the logout link and confirm the dialog that follows."""
        logger.info('Logging out.')
        self.click_element_by_id('topLogout')
        time.sleep(self.delay)
        self.click_element_by_xpath('//button[contains(@class, "btn-msg-ok") and contains(@class, "btn-confirm")]')
        time.sleep(self.delay)

    def close_browser(self) -> None:
        """Quit the Firefox session."""
        logger.info('Closing browser.')
        self.driver.quit()
# ----------------------------------------------------------------------
# Fetch unread messages (iterator)
# ----------------------------------------------------------------------
def unread_messages(router) -> Iterator[Message]:
    """Log in to *router* and yield its SMS one at a time.

    The first message is yielded like every later one, so the caller
    decides what to do with it. After the first message, at most
    ``SMS_FETCH_LIMIT`` further messages are read. When iteration runs to
    completion the generator logs out and closes the browser itself.

    Args:
        router: An ``MR600``-like object providing ``login``,
            ``open_sms_page``, ``open_sms_detail_page``,
            ``open_sms_detail_next_page``, ``get_sms``, ``logout`` and
            ``close_browser``.

    Yields:
        ``Message`` tuples in the order the router presents them.

    Raises:
        IndexError: if no message detail page can be opened, or the first
            detail page is empty. The session is NOT cleaned up in that
            case; the caller must log out and close the browser.
    """
    router.login()
    # Extra settling time after login, before touching the SMS icon.
    time.sleep(DELAY * 2)
    router.open_sms_page()
    try:
        router.open_sms_detail_page()
    except TimeoutException as err:
        logger.info('No new messages.')
        raise IndexError('No new messages.') from err
    # Hand the first message to the consumer instead of saving it here;
    # otherwise anyone using this generator as a library never sees it.
    yield router.get_sms()
    for _ in range(SMS_FETCH_LIMIT):
        router.open_sms_detail_next_page()
        try:
            sms = router.get_sms()
        except IndexError:
            # An empty detail page marks the end of the message list.
            break
        yield sms
    router.logout()
    router.close_browser()
# ----------------------------------------------------------------------
# Main Program
#
# (Import and use this script as a library for different behaviour)
# ----------------------------------------------------------------------
def main():
    """Create the SMS table, fetch messages from the router and store them.

    unread_messages() logs out and closes the browser only when it runs to
    completion, so every early exit has to be cleaned up here.
    """
    SMS.create_table()
    router = MR600()
    try:
        for sms in unread_messages(router):
            save_sms(sms)
    except (KeyboardInterrupt, IndexError):
        # Interrupted by the user, or no (more) messages: end the session.
        try:
            router.logout()
        finally:
            # Close Firefox even if the logout clicks fail, so no browser
            # process is left behind.
            router.close_browser()
    except Exception:
        # Anything else (e.g. a Selenium timeout) used to leak the browser
        # process; close it, then let the error propagate.
        logger.exception('Unexpected error while fetching SMS; closing browser.')
        router.close_browser()
        raise
# Run the fetch-and-store routine only when executed as a script, not
# when the module is imported as a library.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment