Skip to content

Instantly share code, notes, and snippets.

@ninenine
Last active January 14, 2018 20:22
Show Gist options
  • Save ninenine/acfacbd0f033b7280d71a192af120ed1 to your computer and use it in GitHub Desktop.
Save ninenine/acfacbd0f033b7280d71a192af120ed1 to your computer and use it in GitHub Desktop.
xkcd Async Downloader
import asyncio
import pathlib
import os
import logging
import aiohttp
import uvloop
# xkcd JSON API endpoints. IMG_URL takes the comic number as its single
# positional format field; LATEST_COMIC_URL describes the newest comic.
LATEST_COMIC_URL = 'http://xkcd.com/info.0.json'
IMG_URL = 'http://xkcd.com/{}/info.0.json'

# Root folder for downloaded images (one sub-folder per year is created below
# it by the consumer).
DOWNLOAD_DIR = './comics/'

# Module logger: pass everything from DEBUG upwards to a stream handler
# (logging.StreamHandler() writes to sys.stderr when given no stream).
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
async def produce(queue, n):
    """Put the metadata URL of every comic from 1 to ``n`` on ``queue``.

    Args:
        queue: Queue that receives one formatted ``IMG_URL`` per comic.
        n: Number of the last comic to enqueue (inclusive).
    """
    for offset in range(n):
        comic_number = offset + 1
        logger.info('producing {}/{}'.format(comic_number, n))
        metadata_url = IMG_URL.format(comic_number)
        await queue.put(metadata_url)
async def _download_comic(session, url):
    """Fetch one comic's metadata from ``url`` and save its image if missing.

    Args:
        session: Client session used for both the metadata and image requests.
        url: Address of the comic's JSON metadata document.

    Raises:
        Exception: Any HTTP, JSON, key-lookup or file-system error is left
            to the caller to handle.
    """
    async with session.get(url, timeout=10) as resp:
        resp.raise_for_status()
        data = await resp.json()

    img_url = data['img']
    # Use the last path segment as the file name instead of a fixed index, so
    # image URLs with a different directory depth still yield the file name.
    filename = img_url.rsplit('/', 1)[-1]
    if not filename:
        logger.warning('No image file name in %r (from %s); skipping',
                       img_url, url)
        return

    year_dir = os.path.join(DOWNLOAD_DIR, str(data['year']))
    img_path = os.path.join(year_dir, filename)
    # Check before downloading so an existing file costs no image request.
    if os.path.exists(img_path):
        return

    logger.info('Downloading {}...'.format(img_path))
    async with session.get(img_url) as resp2:
        # Without this an HTTP error page would be written to disk as an image.
        resp2.raise_for_status()
        comic = await resp2.read()

    pathlib.Path(year_dir).mkdir(parents=True, exist_ok=True)
    with open(img_path, "wb") as f:
        f.write(comic)
    logger.info("{} done.".format(img_path))


async def consume(queue, session):
    """Process metadata URLs from ``queue`` forever, downloading each comic.

    Runs until the task is cancelled. A failure on one URL is logged and
    does not stop the worker; every fetched item is marked done exactly once.

    Args:
        queue: Queue of comic metadata URLs.
        session: Client session shared by all workers.
    """
    while True:
        # Wait for work outside the try block: if the task is cancelled here
        # no item was fetched, so task_done() must not run (it raises
        # ValueError when called more often than items were retrieved).
        url = await queue.get()
        try:
            await _download_comic(session, url)
        except asyncio.CancelledError:
            # On Python 3.7 and earlier CancelledError derives from Exception;
            # re-raise so the handler below cannot swallow a cancellation.
            raise
        except Exception as e:
            logger.error("%s: %s", url, e)
        finally:
            queue.task_done()
async def get_latest(session):
    """Return the number of the most recent comic.

    Requests ``LATEST_COMIC_URL`` and reads the ``num`` field of the JSON
    reply.

    Args:
        session: Client session used to perform the request.

    Returns:
        The value stored under ``'num'`` in the response document.

    Raises:
        Exception: If the request fails, the server answers with an error
            status, or the reply is not JSON containing ``'num'``.
    """
    logger.info('Getting latest...')
    async with session.get(LATEST_COMIC_URL, timeout=10) as resp:
        # Fail on HTTP errors here, as consume() does for per-comic requests,
        # rather than with a confusing JSON or KeyError failure further down.
        resp.raise_for_status()
        data = await resp.json()
    return data['num']
async def run():
    """Download every comic up to the latest one.

    Starts 50 ``consume`` workers sharing a single client session, asks
    ``get_latest`` for the newest comic number, queues one metadata URL per
    comic with ``produce`` and waits until the queue has been fully
    processed. The workers are cancelled and awaited on every exit path.
    """
    queue = asyncio.Queue()
    connector = aiohttp.TCPConnector(limit=1000)
    async with aiohttp.ClientSession(connector=connector) as session:
        consumers = [
            asyncio.ensure_future(consume(queue, session)) for _ in range(50)
        ]
        try:
            num = await get_latest(session)
            await produce(queue, num)
            await queue.join()
        finally:
            # Stop the workers even if a step above failed, so no task is
            # left pending when the loop shuts down.
            for consumer in consumers:
                consumer.cancel()
            # Let the cancellations complete while the session is still
            # open; return_exceptions keeps their outcomes from propagating.
            await asyncio.gather(*consumers, return_exceptions=True)
    # Backslash escaped explicitly: "\o" is an invalid escape sequence.
    logger.info("Download Completed \\o/")
if __name__ == '__main__':
    # Run the downloader on a uvloop event loop. To use the standard asyncio
    # loop instead, replace the next line with:
    #     loop = asyncio.get_event_loop()
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run())
    finally:
        # Close the loop even when run() raises.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment