Skip to content

Instantly share code, notes, and snippets.

@jtarang
Last active September 6, 2019 04:51
Show Gist options
  • Save jtarang/8b4fd94ec85905924ce3d723b1093e1c to your computer and use it in GitHub Desktop.
Save jtarang/8b4fd94ec85905924ce3d723b1093e1c to your computer and use it in GitHub Desktop.
Populate Cache on CDN for EpgReady
from aiohttp import ( ClientSession, TCPConnector)
from aiohttp.client_exceptions import ContentTypeError
from ssl import create_default_context
# allows us to customize the session with cert files
def custom_session(path_to_cert=None, custom_headers=None):
    """Build an aiohttp ``ClientSession``, optionally verifying with a CA file.

    Args:
        path_to_cert: Path to a CA certificate file used to verify the
            server. When falsy, certificate verification is disabled.
        custom_headers: Default headers passed to the session.

    Returns:
        The configured ``ClientSession``, or the string
        ``'Cert file not Found!'`` when the cert file cannot be loaded.
        The string return is kept for backward compatibility; callers
        should check for it before using the result as a session.
    """
    if path_to_cert:
        try:
            # Loading the CA file already fails for a missing or unreadable
            # path, so no separate existence probe is needed.
            sslcontext = create_default_context(cafile=path_to_cert)
        except OSError:
            # OSError covers FileNotFoundError/PermissionError as well as
            # ssl.SSLError (a subclass) for a file that is not a valid cert.
            # Anything else is a programming error and should surface.
            return 'Cert file not Found!'
        # `ssl=` replaces the deprecated `ssl_context=` keyword.
        conn = TCPConnector(ssl=sslcontext)
    else:
        # `ssl=False` replaces the deprecated `verify_ssl=False`.
        conn = TCPConnector(ssl=False)
    return ClientSession(connector=conn, headers=custom_headers)
class Requests:
    """Small async HTTP client built on a session from ``custom_session``.

    Meant to be used as ``async with Requests(...) as client``; the
    session is closed when that block exits.

    NOTE: every ``do_*`` call also enters ``self.session`` as an async
    context manager, and an aiohttp ``ClientSession`` closes itself when
    that block exits. An instance is therefore good for a single request.
    """

    def __init__(self, ca_cert=None, proxy='', headers=None, session=None):
        """Store the connection settings and create the session.

        Args:
            ca_cert: Path to a CA certificate file, forwarded to
                ``custom_session``.
            proxy: Proxy URL passed to every request.
            headers: Default headers for the session.
            session: Currently ignored; a new session is always created.
        """
        self.proxy = proxy
        self.cert = ca_cert
        self.headers = headers
        self.session = self.init_session()

    def init_session(self):
        """Create the session from the stored cert path and headers."""
        return custom_session(path_to_cert=self.cert, custom_headers=self.headers)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    @staticmethod
    async def get_response_from_request(request=None):
        """Extract the payload of a response as JSON, falling back to text.

        Args:
            request: The response object to read (named ``request`` for
                historical reasons).

        Returns:
            The decoded JSON body, or the body text when the response's
            content type is not JSON.

        Raises:
            ValueError: If no response object is passed.
        """
        if request is None:
            raise ValueError('No request object passed!')
        try:
            # https://docs.aiohttp.org/en/stable/client_advanced.html#disabling-content-type-validation-for-json-responses
            return await request.json()
        except ContentTypeError:
            # Raised when the Content-Type is not JSON. `text` is a
            # coroutine method, so it must be called and awaited.
            return await request.text()

    async def _request(self, method, url, **kwargs):
        """Send one request through the session and return its payload."""
        async with self.session as session:
            async with session.request(method, url, proxy=self.proxy, **kwargs) as response:
                return await self.get_response_from_request(request=response)

    async def do_delete(self, url):
        """Send a DELETE request to ``url`` and return the payload."""
        return await self._request('DELETE', url)

    async def do_get(self, url, data=None):
        """Send a GET request; ``data`` is sent as query parameters."""
        return await self._request('GET', url, params=data)

    async def do_patch(self, url, data=None):
        """Send a PATCH request; ``data`` is sent as a JSON body."""
        return await self._request('PATCH', url, json=data)

    async def do_post(self, url, data=None):
        """Send a POST request; ``data`` is sent as a JSON body."""
        return await self._request('POST', url, json=data)

    async def do_put(self, url, data=None):
        """Send a PUT request; ``data`` is sent as a JSON body."""
        return await self._request('PUT', url, json=data)
79,1 All
from async_client import Requests
from syncer import sync
class CacheReactor:
    """Request every country's channel mappings from the EpgReady API.

    Presumably used to warm a cache in front of the API; this class only
    issues the GET requests and discards the mapping responses.
    """

    def __init__(self, host="https://api.epgready.com"):
        """Remember the API base URL.

        Args:
            host: Scheme and host of the API, without a trailing slash.
        """
        self.host = host

    async def get_countries(self):
        """Fetch and return the payload of ``/api/countries``."""
        async with Requests() as client:
            return await client.do_get(self.host + '/api/countries')

    async def populate_cache(self):
        """Request the channel mappings of each country, one at a time.

        Expects the countries payload to hold a ``'countries'`` list whose
        items carry a ``'country_value'`` key.
        """
        countries_dict = await self.get_countries()
        for country in countries_dict['countries']:
            # A new client per request: a Requests instance closes its
            # session once a do_* call has completed.
            async with Requests() as client:
                await client.do_get(
                    self.host + '/api/channel-mappings',
                    data={'country': country['country_value']},
                )
if __name__ == "__main__":
    # Script entry point: drive the async cache population to completion
    # through syncer's `sync` wrapper.
    sync(CacheReactor().populate_cache())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment