@alastairparagas
Last active August 28, 2018 17:04
Multithreading requests in Python
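import json
from concurrent.futures import ThreadPoolExecutor
from http.client import HTTPException
from typing import Any, Dict, List, Optional
from urllib import request
from urllib.error import URLError


def get_request_task(url: str) -> Optional[List[Dict[str, Any]]]:
    """Fetch a URL and return its parsed JSON body, or None on failure."""
    try:
        with request.urlopen(url) as response:
            # Parse the body so the return value matches the annotation.
            return json.loads(response.read())
    # urlopen reports network and HTTP errors as URLError (HTTPError is a
    # subclass); ValueError covers a body that is not valid JSON.
    except (URLError, HTTPException, ValueError):
        return None


# executor.map runs the requests concurrently and yields results in input order.
with ThreadPoolExecutor() as executor:
    for result in executor.map(get_request_task, [
        "https://jsonplaceholder.typicode.com/posts",
        "https://jsonplaceholder.typicode.com/comments",
        "https://jsonplaceholder.typicode.com/albums"
    ]):
        if result is None:
            print("Something terrible has happened!")
        else:
            print(result)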
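
The loop above gets its results back in input order, but a `None` does not say which URL failed. One possible variation, sketched here with a hypothetical helper `fetch_all` and an injectable `fetch` callable (neither is part of the original gist), uses `submit` and `as_completed` so each URL stays paired with its result:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Iterable, Optional


def fetch_all(
    urls: Iterable[str],
    fetch: Callable[[str], Any],
    max_workers: int = 4,
) -> Dict[str, Optional[Any]]:
    """Run fetch(url) for each URL on a thread pool; map each URL to its result.

    Hypothetical helper, not part of the original gist. A URL whose fetch
    raises is recorded as None instead of aborting the whole batch.
    """
    results: Dict[str, Optional[Any]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which future belongs to which URL.
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        # as_completed yields futures as they finish, not in submission order.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception:
                results[url] = None
    return results
```

Passing the gist's `get_request_task` as `fetch` would give a dictionary keyed by URL, so a failed request can be reported by name. Taking `fetch` as a parameter also lets the helper be exercised without network access.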