Skip to content

Instantly share code, notes, and snippets.

@ledil
Created November 26, 2020 17:06
Show Gist options
  • Save ledil/f799b513557e4593cbdcabc096a50d57 to your computer and use it in GitHub Desktop.
Save ledil/f799b513557e4593cbdcabc096a50d57 to your computer and use it in GitHub Desktop.
asyncio + django
def _fetchImageSizes(self, images: list):
TIMEOUT_CONNECT = 2
async def fetchImage(image: dict):
ctx = ssl.create_default_context()
ctx.set_ciphers('DEFAULT@SECLEVEL=0')
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
timeout = aiohttp.ClientTimeout(total=TIMEOUT_CONNECT)
async with aiohttp.ClientSession(timeout=timeout, connector=aiohttp.TCPConnector(ssl=ctx), headers={
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.110 Safari/537.36'}) as session:
async with session.get(image.get("src"), ssl=ctx) as response:
if response.status == 200:
image = await response.read()
img = Image.open(image)
print("fetched!")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [fetchImage(image) for image in images]
try:
loop.run_until_complete(asyncio.wait(tasks))
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment