Created
January 29, 2020 16:40
-
-
Save hiway/54f688f28035035013019f2fd5dfb490 to your computer and use it in GitHub Desktop.
Fetch SMS from a TP-Link Archer MR600 4G router using Selenium and Firefox, and save them to an SQLite database.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import datetime | |
import logging | |
import os | |
import peewee | |
import time | |
from collections import namedtuple | |
from selenium import webdriver | |
from selenium.common.exceptions import TimeoutException | |
from selenium.webdriver.firefox.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from typing import Iterator | |
# Module-level logger used by every function and class in this script
logger = logging.getLogger(__name__)

# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------

# Address at which the router serves its login page
ROUTER_URL = os.environ.get('ROUTER_URL', 'http://192.168.1.1/')

# Password of the router's admin account
ROUTER_PASSWORD = os.environ.get('ROUTER_PASSWORD', 'admin')

# Number of messages to fetch and store into the db per run
SMS_FETCH_LIMIT = int(os.environ.get('SMS_FETCH_LIMIT', 100))

# Debug mode (verbose runtime information) is on when the DEBUG
# environment variable is TRUE, YES or 1, compared case-insensitively
DEBUG = os.environ.get('DEBUG', 'NO').upper() in ('TRUE', 'YES', '1')

# The browser runs headless unless debug mode is on
HEADLESS = not DEBUG

# Log level, both as the logging constant and as its text name
if DEBUG:
    log_level, log_level_text = logging.DEBUG, 'DEBUG'
else:
    log_level, log_level_text = logging.INFO, 'INFO'
logging.basicConfig(level=log_level, format='%(levelname)s:%(message)s')

# Seconds to wait for the UI to refresh
DELAY = 3

# Path to the Firefox app (default: '/usr/local/bin/firefox' on FreeBSD)
FIREFOX_BINARY = '/usr/local/bin/firefox'

# SQLite file in which SMS are stored (default: 'sms.db')
DATABASE_FILENAME = 'sms.db'
# ---------------------------------------------------------------------- | |
# Data Structures and Models | |
# ---------------------------------------------------------------------- | |
# Lightweight immutable record for one SMS as read from the router UI;
# fields are, in order: sender, timestamp, text.
Message = namedtuple('Message', 'sender timestamp text')
# SQLite database handle; the Peewee model below is bound to it
db = peewee.SqliteDatabase(DATABASE_FILENAME)


# SQLite table that stores fetched SMS, modeled with the Peewee library
class SMS(peewee.Model):
    """One stored SMS: who sent it, when it was received, and its text."""

    sender = peewee.CharField()
    # DateTimeField rather than DateField: Peewee's DateField reduces any
    # timestamp string it can parse to a bare date, which silently drops
    # the time of day at which the message was received.
    # NOTE(review): the exact timestamp format the router UI emits is not
    # visible in this file -- confirm it is one Peewee recognises.
    timestamp = peewee.DateTimeField(default=datetime.datetime.now)
    text = peewee.CharField()

    class Meta:
        database = db
        db_table = 'sms'
# Helper function to save SMS to database | |
def save_sms(msg) -> None:
    """Log *msg* and insert it as a new row of the SMS table.

    *msg* is any object exposing ``sender``, ``timestamp`` and ``text``
    attributes, such as the ``Message`` named tuple.
    """
    logger.info('Saving SMS: %s [%s] %s', msg.sender, msg.timestamp, msg.text)
    # Model.create() instantiates the row AND writes it to the database,
    # so no extra save() is needed; a second save() would only re-issue a
    # redundant write of the same values.
    SMS.create(sender=msg.sender, timestamp=msg.timestamp, text=msg.text)
# ---------------------------------------------------------------------- | |
# Browser Automation | |
# ---------------------------------------------------------------------- | |
class MR600(object):
    """Browser automation for the router's web admin UI via Selenium/Firefox.

    Exposes the page interactions this script needs: log in, open the SMS
    pages, read the message currently displayed, log out and close the
    browser. Elements are located by the ids / xpath used by the router's
    pages.
    """

    # Class-level fallback so that instances which did not go through
    # __init__ (e.g. subclasses overriding it) still have a wait timeout.
    timeout = 10

    def __init__(self, delay=3, headless=HEADLESS, firefox_binary=FIREFOX_BINARY, timeout=10) -> None:
        """Start a Firefox session.

        delay: seconds to sleep after each navigation so the asynchronous
            UI can refresh.
        headless: run Firefox without a visible window when true.
        firefox_binary: path to the Firefox executable; a falsy value lets
            Selenium locate Firefox itself.
        timeout: seconds to wait for an element before Selenium raises
            TimeoutException.
        """
        self.delay = delay
        self.timeout = timeout
        options = Options()
        # Configure through the Options object only: the `options.headless`
        # setter and the `firefox_binary=` keyword of webdriver.Firefox were
        # removed in Selenium 4, whereas these forms work on 3.x and 4.x.
        if headless:
            options.add_argument('-headless')
        if firefox_binary:
            options.binary_location = firefox_binary
        self.driver = webdriver.Firefox(options=options)
        logger.info(
            'Initialized MR600 browser automation '
            'with delay=%s seconds, '
            'headless=%s '
            'and firefox_binary=%r',
            delay, headless, firefox_binary,
        )

    def _wait_for(self, condition):
        """Block until *condition* holds and return the element it yields.

        Raises selenium's TimeoutException after ``self.timeout`` seconds.
        """
        return WebDriverWait(self.driver, self.timeout).until(condition)

    def login(self, router_url=ROUTER_URL, router_password=ROUTER_PASSWORD) -> None:
        """Load the login page at *router_url* and submit *router_password*."""
        logger.info('Loading router login page at: %s', router_url)
        self.driver.get(router_url)
        # Wait for the password field instead of assuming it is already in
        # the DOM right after the page load returns.
        password_field = self._wait_for(
            EC.presence_of_element_located((By.ID, 'pc-login-password'))
        )
        password_field.clear()
        password_field.send_keys(router_password)
        logger.info('Entered password, waiting for admin page to load.')
        self.click_element_by_id('pc-login-btn', require_clickable=False)
        logger.info('Logged in to admin page, waiting for asynchronous page load.')
        time.sleep(self.delay)

    def click_element_by_id(self, id, require_clickable=True) -> None:
        """Wait for the element with the given id, then click it.

        With require_clickable (the default) the wait lasts until the
        element is clickable; otherwise only until it is present in the DOM.
        Raises TimeoutException if the wait expires.
        """
        locator = (By.ID, id)
        if require_clickable:
            condition = EC.element_to_be_clickable(locator)
        else:
            condition = EC.presence_of_element_located(locator)
        self._wait_for(condition).click()

    def click_element_by_xpath(self, xpath) -> None:
        """Wait until an element matching *xpath* is present, then click it."""
        self._wait_for(EC.presence_of_element_located((By.XPATH, xpath))).click()

    def open_sms_page(self) -> None:
        """Click the SMS icon and give the page time to refresh."""
        logger.info('Navigating to SMS page.')
        self.click_element_by_id('map_icon_sms')
        time.sleep(self.delay)

    def open_sms_detail_page(self, index=1) -> None:
        """Open the detail view of the message whose element id is msg_<index>."""
        logger.info('Navigating to SMS detail page.')
        self.click_element_by_id('msg_{}'.format(index))
        time.sleep(self.delay)

    def open_sms_detail_next_page(self) -> None:
        """Click the 'next' button of the SMS detail view."""
        logger.info('Navigating to next SMS detail page.')
        self.click_element_by_id('divNextBtn')
        time.sleep(self.delay)

    def get_sms(self) -> Message:
        """Read the message shown in the SMS detail view.

        Returns a Message whose fields hold the raw innerHTML of the
        corresponding page elements, so any markup the page places there is
        included as-is.

        Raises IndexError when sender, timestamp and text are all empty,
        which this script treats as "no more messages".
        """
        logger.info('Reading SMS details.')
        msg_content = self._wait_for(
            EC.presence_of_element_located((By.ID, 'msgContent'))
        )
        text = msg_content.get_attribute('innerHTML')
        # find_element(By.ID, ...) replaces find_element_by_id, which was
        # removed in Selenium 4; it is available on Selenium 3 as well.
        timestamp = self.driver.find_element(By.ID, 'recvTime').get_attribute('innerHTML')
        sender = self.driver.find_element(By.ID, 'phoneNumber').get_attribute('innerHTML')
        if not (text or sender or timestamp):
            logger.info('No more messages.')
            raise IndexError('No more messages.')
        return Message(sender=sender, timestamp=timestamp, text=text)

    def logout(self) -> None:
        """Click the logout link, then confirm in the dialog that follows."""
        logger.info('Logging out.')
        self.click_element_by_id('topLogout')
        time.sleep(self.delay)
        self.click_element_by_xpath('//button[contains(@class, "btn-msg-ok") and contains(@class, "btn-confirm")]')
        time.sleep(self.delay)

    def close_browser(self) -> None:
        """Quit the Firefox session."""
        logger.info('Closing browser.')
        self.driver.quit()
# ---------------------------------------------------------------------- | |
# Fetch unread messages (iterator) | |
# ---------------------------------------------------------------------- | |
def unread_messages(router) -> Iterator[Message]:
    """Log in to *router* and yield the SMS it shows, one Message at a time.

    Opens the SMS page, then the first message's detail view, and walks
    forward with the 'next' button. The first message plus up to
    SMS_FETCH_LIMIT further messages are yielded. Iteration ends early when
    router.get_sms() signals "no more messages" with IndexError.

    On normal completion the generator logs out and closes the browser.
    When an exception escapes, cleanup is left to the caller.

    Raises:
        IndexError: when the first message cannot be opened (no messages),
            or when the first detail view is empty.
    """
    router.login()
    time.sleep(DELAY * 2)
    router.open_sms_page()
    try:
        router.open_sms_detail_page()
    except TimeoutException as exc:
        logger.info('No new messages.')
        raise IndexError('No new messages.') from exc
    # Yield the first message like every other one. It used to be written
    # to the database here and never yielded, so consumers of this iterator
    # silently missed it; persisting is the consumer's decision.
    yield router.get_sms()
    for _ in range(SMS_FETCH_LIMIT):
        router.open_sms_detail_next_page()
        try:
            sms = router.get_sms()
        except IndexError:
            break
        yield sms
    router.logout()
    router.close_browser()
# ---------------------------------------------------------------------- | |
# Main Program | |
# | |
# (Import and use this script as a library for different behaviour) | |
# ---------------------------------------------------------------------- | |
def main():
    """Create the SMS table, then fetch messages from the router and save each.

    unread_messages() logs out and closes the browser itself when it runs
    to completion; this function only cleans up when iteration is cut
    short by an exception.
    """
    SMS.create_table()
    router = MR600()
    try:
        for sms in unread_messages(router):
            save_sms(sms)
    except (KeyboardInterrupt, IndexError):
        # Interrupted by the user, or the router had nothing (more) to
        # show: end the session politely. The browser must be closed even
        # if logging out fails, otherwise the Firefox process is leaked.
        try:
            router.logout()
        finally:
            router.close_browser()
    except Exception:
        # Unexpected failure (e.g. a Selenium timeout): the session state
        # is unknown, so skip the logout clicks but do not leave a stray
        # Firefox process behind; then let the error surface.
        router.close_browser()
        raise


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment