Skip to content

Instantly share code, notes, and snippets.

@dmckean
Created April 8, 2016 05:47
Show Gist options
  • Save dmckean/f66ad16f90c2ef139f2bb0e52ac936a3 to your computer and use it in GitHub Desktop.
Save dmckean/f66ad16f90c2ef139f2bb0e52ac936a3 to your computer and use it in GitHub Desktop.
A simple HTTP server for downloading MOBI files from Instapaper, suitable for use from a Kindle browser.
#! /usr/bin/env python
"""
A simple HTTP server for downloading MOBI files from Instapaper
In addition to the username and password, a list of folder names and ids
must be provided to access Instapaper folders. These ids can be
found in Instapaper folder URLs, e.g.
https://www.instapaper.com/u/folder/(id)/(name)
Set the id to `None` to reach the home Instapaper folder.
USAGE: kinstapaper.py [-h] USERNAME PASSWORD ...
"""
import os
import time
from datetime import timedelta, datetime
import BaseHTTPServer
import SimpleHTTPServer
import threading
import logging
import argparse
import requests
# Log everything from DEBUG upwards, prefixing each message with the name
# of the thread that emitted it.
logging.basicConfig(format='(%(threadName)-10s) %(message)s', level=logging.DEBUG)
class InstaQueue(object):
    """Download queue that fetches Instapaper folders as MOBI files.

    Each queued folder is downloaded to ``<name>.mobi`` in the current
    working directory, and ``index.html`` is regenerated whenever the
    queue changes so the page reflects the state of every folder.
    """

    def __init__(self, username, password, folders):
        """Store the credentials and the folder configuration.

        username, password: values posted to the Instapaper login form.
        folders: dict mapping a folder name to a dict of options; the keys
            read by this class are "id", "rank" and "title".
        """
        # time.time() of the last login; None until the first login.
        self.cookie_time = None
        self.session = requests.Session()
        self.folder_queue = []
        # Seconds between automatic reloads of the index page while busy.
        self.timer = 5
        self.auth_data = {
            "username": username,
            "password": password,
        }
        self.folders = folders

    @property
    def queue_not_empty(self):
        """True while at least one folder is waiting to be downloaded."""
        return bool(self.folder_queue)

    @staticmethod
    def _rank_key(item):
        """Sort key for a ``(name, values)`` pair from the folder dict.

        Numeric ranks come first in numeric order (so "10" sorts after "2"
        even when ranks were read from a text file), then non-numeric
        ranks, then folders without a rank ordered by name.
        """
        name, values = item
        rank = values.get("rank")
        if rank is None:
            return (2, 0.0, name)
        try:
            return (0, float(rank), "")
        except (TypeError, ValueError):
            return (1, 0.0, str(rank))

    @property
    def sorted_folder_items(self):
        """List of ``(name, values)`` pairs ordered by rank, then by name."""
        return sorted(self.folders.items(), key=self._rank_key)

    @staticmethod
    def _format_age(seconds):
        """Return ``"<n><unit> ago"`` using the largest non-zero unit.

        Units are d, h, m and s. Negative ages (e.g. from clock skew) are
        reported as ``"0s ago"``.
        """
        remaining = max(int(seconds), 0)
        days, remaining = divmod(remaining, 86400)
        hours, remaining = divmod(remaining, 3600)
        minutes, secs = divmod(remaining, 60)
        if days:
            amount = "%dd" % days
        elif hours:
            amount = "%dh" % hours
        elif minutes:
            amount = "%dm" % minutes
        else:
            amount = "%ds" % secs
        return "%s ago" % amount

    def _resolve(self, name):
        """Return the configured folder key for *name*, or None if unknown.

        Leading "." and "/" characters are stripped before the lookup.
        """
        if not name:
            return None
        folder = name.lstrip("./")
        return folder if folder in self.folders else None

    def update_index(self):
        """Rewrite index.html in the current directory from the folder state."""
        now = time.time()
        rows = []
        for folder, values in self.sorted_folder_items:
            title = values.get("title", folder)
            mobi_path = "%s.mobi" % folder
            if folder in self.folder_queue:
                rows.append("<li>%s - Currently updating</li>" % title)
            elif os.path.exists(mobi_path):
                age = self._format_age(now - os.path.getmtime(mobi_path))
                rows.append(
                    "<li><a href='%s.mobi'>%s</a> - <a href='./?update=true&folder=%s'>%s</a></li>"
                    % (folder, title, folder, age))
            else:
                rows.append(
                    "<li>%s - <a href='./?update=true&folder=%s'>Download</a></li>"
                    % (title, folder))
        output = "<ul>" + "".join(rows) + "</ul>"
        # Only auto-refresh the page while downloads are pending.
        reload_meta = "<meta http-equiv='Refresh' content='%s'>" % self.timer if self.queue_not_empty else ""
        page = ("<html><head><style>* { font-size: 50px; } h1 { font-size: 80px; } a { font-size: inherit; }</style>" +
                "<title>Instapaper Folders</title>%(meta)s</head>" +
                "<body><h1><a href='.'>Instapaper Folders</a></h1>%(body)s" +
                "<p><a href='./?updateAll=true'>Update all</a></p>" +
                "</body></html>") % {"meta": reload_meta, "body": output}
        # Close the file deterministically so the HTTP thread never serves
        # a partially flushed page.
        with open("index.html", "w") as index:
            index.write(page)

    def update_cookie(self):
        """Post the credentials to the login URL and record the login time."""
        self.session.post("https://www.instapaper.com/user/login", data=self.auth_data)
        self.cookie_time = time.time()

    def update_mobi(self, name):
        """Download the MOBI file for folder *name* into ``<folder>.mobi``.

        Unknown or empty names are logged and ignored. The login is
        repeated when the previous one is more than an hour old.
        """
        logging.debug("Updating: %s" % name)
        folder = self._resolve(name)
        if folder is None:
            logging.debug("%s not found" % name)
            return
        if self.cookie_time is None or (time.time() - self.cookie_time) > 3600:
            self.update_cookie()
        folder_url = "https://www.instapaper.com/mobi"
        folder_id = self.folders[folder].get("id")
        # Ids read from a text file arrive as the literal string "None".
        folder_id = None if folder_id == "None" else folder_id
        if folder_id:
            folder_url += "/%s" % folder_id
        folder_response = self.session.get(folder_url)
        if folder_response.status_code == 200:
            with open("%s.mobi" % folder, "wb") as fd:
                for chunk in folder_response:
                    fd.write(chunk)
            logging.debug("%s updated" % name)
        else:
            logging.debug("%s could not be downloaded" % name)

    def push_folder(self, folder):
        """Queue *folder* for download unless it is unknown or already queued."""
        key = self._resolve(folder)
        if key is None:
            # Requests come from the network; never queue names that are
            # not part of the configuration (including a missing name).
            logging.debug("%s not found" % folder)
            return
        if key not in self.folder_queue:
            self.folder_queue.append(key)
            self.update_index()
            logging.debug("%s added to queue" % key)
        else:
            logging.debug("%s is already in the queue" % key)

    def pop_folder(self):
        """Download the folder at the head of the queue, if any.

        The folder stays in the queue while it downloads so the index
        shows it as updating. The page refresh interval grows by 5
        seconds per processed folder up to 30 and is reset to 5 once the
        queue is empty.
        """
        if not self.queue_not_empty:
            self.timer = 5
            return
        folder = self.folder_queue[0]
        try:
            self.update_mobi(folder)
        except Exception:
            # Boundary of the long-running worker loop: one failed download
            # must not stop the queue from being processed.
            logging.exception("%s could not be downloaded" % folder)
        finally:
            self.timer += 5 if self.timer < 30 else 0
            self.folder_queue.pop(0)
            self.update_index()

    def watch_queue(self):
        """Process the queue forever, checking it every 5 seconds."""
        logging.debug("Watching download queue")
        while True:
            self.pop_folder()
            time.sleep(5)
# Shared InstaQueue instance used by InstaRequestHandler; assigned in test().
instaqueue = None
class InstaRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """Static file handler that also accepts update requests.

    A request whose URL carries a query string is treated as a command:
    ``?update=true&folder=<name>`` queues one folder and
    ``?updateAll=true`` queues every configured folder. The client is then
    redirected to ``./``. Requests without a query string are served as
    plain files by the base class.
    """

    def send_head(self):
        """Refresh the index, handle any query command, then serve or redirect.

        Returns the file object from the base class for plain requests and
        None after sending a redirect.
        """
        try:
            from urlparse import parse_qs  # Python 2
        except ImportError:
            from urllib.parse import parse_qs  # Python 3

        instaqueue.update_index()
        path, has_query, query = self.path.partition("?")
        self.path = path
        if not has_query:
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

        # parse_qs percent-decodes values and tolerates parameters without
        # "="; when a key is repeated the last value wins.
        params = dict((key, values[-1]) for key, values in parse_qs(query).items())
        if params.get("update") == "true":
            folder = params.get("folder")
            if folder:
                instaqueue.push_folder(folder)
            else:
                logging.debug("Update requested without a folder")
        elif params.get("updateAll") == "true":
            logging.debug("Updating all")
            for folder, _ in instaqueue.sorted_folder_items:
                instaqueue.push_folder(folder)
        # 302 rather than 301: a permanent redirect is cacheable by default,
        # which could stop later clicks on the same update link from
        # reaching the server.
        self.send_response(302)
        self.send_header('Location', './')
        # The redirect has no body; say so explicitly so keep-alive clients
        # do not wait for one when protocol_version is HTTP/1.1.
        self.send_header('Content-Length', '0')
        self.end_headers()
        return None
def server(port, address, protocol):
    """Serve HTTP requests on (address, port) with InstaRequestHandler.

    protocol is assigned to the handler class's protocol_version before
    the server is created. This call blocks in serve_forever().
    """
    InstaRequestHandler.protocol_version = protocol
    listener = BaseHTTPServer.HTTPServer((address, port), InstaRequestHandler)
    logging.debug("Starting HTTP server")
    listener.serve_forever()
def test(username, password, *args, **kwargs):
    """Build the folder configuration, start the HTTP server and run the queue.

    username, password: Instapaper credentials passed to InstaQueue.
    Keyword arguments:
        folders: an open tab-delimited text file with the columns
            name, id, rank (optional), title (optional). When it is
            missing or None the built-in example configuration is used.
        port, address, protocol: passed to server(); they default to
            8000, "" and "HTTP/1.0" when omitted.

    The HTTP server runs in a daemon thread; the calling thread then
    processes the download queue and does not return.
    """
    global instaqueue
    #
    # List the folders you want to download. Each entry maps a name to a
    # dict of options, e.g.
    #   folders = {
    #       "name": {"rank": 1, "id": 1234567},
    #       "home": {"rank": 2, "id": None, "title": "Instapaper Home"}
    #   }
    # name:  the filename given to the mobi file downloaded from Instapaper
    # rank:  (optional) the preferred position of the folder in the list
    # id:    (optional) found in the address bar when viewing the folder
    #        on the web, e.g.
    #        https://www.instapaper.com/u/folder/1234567/world-news
    #        The ID is usually a 7-digit number.
    #        Use `None` for the primary inbox.
    # title: (optional) the name shown in the browser
    #
    folder_dict = {
        "home": {"rank": 1, "id": None, "title": "Home"},
        "news": {"rank": 2, "id": "1234567", "title": "World News"},
        "editorial": {"rank": 3, "id": "1234568", "title": "Editorial"}
    }
    folders_file = kwargs.get("folders")
    if folders_file:
        folder_list = dict()
        with folders_file as f:
            for line in f:
                cols = line.rstrip().split("\t")
                name = cols[0]
                if not name:
                    # Blank line (or a line without a name): nothing to add.
                    continue
                # A line with only a name refers to the home folder.
                entry = {"id": cols[1] if len(cols) > 1 else None}
                if len(cols) > 2 and cols[2]:
                    # Store numeric ranks as integers so that 10 sorts
                    # after 2; anything else is kept as written.
                    try:
                        entry["rank"] = int(cols[2])
                    except ValueError:
                        entry["rank"] = cols[2]
                if len(cols) > 3 and cols[3]:
                    entry["title"] = cols[3]
                folder_list[name] = entry
    else:
        folder_list = folder_dict
    instaqueue = InstaQueue(username, password, folder_list)
    instaqueue.update_index()
    server_thread = threading.Thread(
        name="InstaServer", target=server,
        args=(kwargs.get("port", 8000),
              kwargs.get("address", ""),
              kwargs.get("protocol", "HTTP/1.0")))
    # Daemon thread: the process exits when the queue loop below stops.
    server_thread.daemon = True
    server_thread.start()
    instaqueue.watch_queue()
if __name__ == '__main__':
    # Command-line entry point: parse the options and hand them to test().
    cli = argparse.ArgumentParser()
    cli.add_argument("username", type=str,
                     help="Instapaper Account username (email)")
    cli.add_argument("password", type=str,
                     help="Instapaper Account password")
    cli.add_argument("-p", "--port", dest="port", type=int, default=8000,
                     help="Server Port (default: %(default)s)")
    cli.add_argument("--address", dest="address", type=str, default="",
                     help="Server Address (default: '%(default)s')")
    cli.add_argument("--protocol", dest="protocol", type=str, default="HTTP/1.0",
                     help="Server Protocol (default: '%(default)s')")
    cli.add_argument("-f", "--folder-list", dest="folders", type=argparse.FileType("r"),
                     help="A tab-delimited file specifying a list of folders, using the fields 'name', 'id', 'rank' (optional), 'title' (optional)" +
                     " e.g. home None 2 Instapaper Home")
    args = cli.parse_args()
    test(args.username, args.password,
         folders=args.folders, protocol=args.protocol,
         address=args.address, port=args.port)
@dmckean
Copy link
Author

dmckean commented Apr 8, 2016

screenshot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment