Created
September 3, 2021 08:24
-
-
Save datajoely/f4d026dfcea3ad4b1fd9e44a30da56fd to your computer and use it in GitHub Desktop.
Expiring HTTP dataset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module provides custom Kedro datasets for making HTTP requests and caching their responses on disk until they expire.
""" | |
import hashlib | |
import json | |
import logging | |
from pathlib import Path | |
from typing import Any, Dict, Optional, Union | |
from urllib.parse import urlparse | |
import humanize | |
import pandas as pd | |
import requests | |
from kedro.io import AbstractDataSet, DataSetError, MemoryDataSet, PickleLocalDataSet | |
class HTTPRequestDataSet(AbstractDataSet):
    """
    This class uses the requests library to call an API endpoint and return the response.
    """

    # HTTP verbs this dataset knows how to issue
    _SUPPORTED_REQUEST_TYPES = ('GET', 'POST')

    @staticmethod
    def _clean_dict(some_dict: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Return ``some_dict`` unchanged if it is a dictionary, otherwise an empty dictionary.
        """
        return some_dict if isinstance(some_dict, dict) else {}

    @staticmethod
    def _format_request_kwargs(request_kwargs: Dict[str, Any],
                               credentials: Dict[str, Any]) -> Dict[str, Any]:
        """
        Substitute credential placeholders into the nested request keyword arguments.

        Args:
            request_kwargs: Keyword arguments destined for ``requests.get()``/``requests.post()``
            credentials: Values available to ``str.format`` placeholders
        Returns:
            A new dictionary. String values found inside dictionary-valued options
            (e.g. ``headers``) are formatted with ``credentials``; everything else
            (e.g. a numeric ``timeout``) is passed through untouched.
        """
        def _fill(value: Any) -> Any:
            return value.format(**credentials) if isinstance(value, str) else value

        return {
            name: ({key: _fill(value) for key, value in option.items()}
                   if isinstance(option, dict) else option)
            for name, option in request_kwargs.items()
        }

    def __init__(self, url: str, credentials: Optional[Dict[str, Any]] = None,
                 str_kwargs: Optional[Dict[str, Any]] = None, request_type: str = 'GET',
                 request_kwargs: Optional[Dict[str, Any]] = None):
        """
        The constructor initialises the class attributes and formats the API request url
        Args:
            url: The API URL, optionally containing ``str.format`` placeholders
            credentials: The (optional) values needed to connect to the service; they are
                substituted into ``url`` and into the string values of ``request_kwargs``
            str_kwargs: Any additional values to substitute into ``url``
            request_type: Define if GET or POST request (case-insensitive)
            request_kwargs: The (optional) ``kwargs`` dictionary to provide to
                ``requests.get()``/``requests.post()``
        Raises:
            ValueError: if ``request_type`` is neither GET nor POST
        """
        normalised_type = request_type.upper()
        # Explicit check rather than ``assert`` so validation survives ``python -O``
        if normalised_type not in self._SUPPORTED_REQUEST_TYPES:
            raise ValueError(
                'request_type must be one of {}, got {!r}'.format(
                    self._SUPPORTED_REQUEST_TYPES, request_type))
        self.request_type = normalised_type

        formatted_url_args = self._clean_dict(str_kwargs)
        formatted_creds = self._clean_dict(credentials)
        formatted_request_kwargs = self._clean_dict(request_kwargs)

        # Use the cleaned credentials here: the raw argument may be ``None``,
        # which cannot be unpacked with ``**``
        self.request_kwargs = self._format_request_kwargs(formatted_request_kwargs,
                                                          formatted_creds)
        self.url = url.format(**formatted_creds, **formatted_url_args)
        self.api_attrs = credentials

    def _load(self) -> requests.Response:
        """
        The load function issues the configured request and returns the
        ``requests.Response`` object if a HTTP 200 response is returned.
        Returns:
            HTTP Response object
        Raises:
            DataSetError if the request cannot be made or a non-200 status code is returned
        """
        send = requests.get if self.request_type == 'GET' else requests.post
        try:
            # The call itself must sit inside the ``try``: this is where
            # ``requests`` raises (malformed URL, connection failure, ...)
            response = send(url=self.url, **self.request_kwargs)
        except requests.exceptions.RequestException as error:
            raise DataSetError(str(error)) from error

        if response.status_code != 200:
            raise DataSetError(
                'Unable to make requests {} code returned in response'.format(response.status_code))
        return response

    def _save(self, data: Any) -> MemoryDataSet:
        """
        The save function does not send anything to the endpoint; it simply wraps
        ``data`` in a ``MemoryDataSet`` object and returns it
        """
        return MemoryDataSet(data=data)

    def _describe(self) -> Dict[str, Any]:
        """
        The describe function will provide information about the current object
        Returns:
            A dictionary holding the formatted request url
        """
        return dict(url=self.url)

    def exists(self) -> bool:
        """
        HTTP request can never exist before being called, so this is always ``False``
        """
        return False
class ExpiringHTTPRequestDataSet(AbstractDataSet):
    """
    This class wraps a ``HTTPRequestDataSet`` and caches its result on disk. A new
    request is only made when no cached copy exists or the cached copy is older
    than the configured expiry.
    """

    def __init__(self, dataset: Union[HTTPRequestDataSet, Dict],
                 folder_path: str,
                 expiry: str,
                 credentials: Optional[Dict[str, Any]] = None):
        """
        This constructor handles the creation of the expiring request dataset
        Args:
            dataset: The dataset to handle, either a ``HTTPRequestDataSet`` instance or a
                dictionary of keyword arguments used to construct one
            folder_path: The folder in which the cached pickle file is stored
            expiry: The maximum age of the dataset before it is recalled, in any
                form accepted by ``pandas.to_timedelta``
            credentials: The credentials to apply (only used when ``dataset`` is a dictionary)
        Raises:
            ValueError: if ``dataset`` is neither a dictionary nor a ``HTTPRequestDataSet``
        """
        self.expiry = pd.to_timedelta(expiry)

        if isinstance(dataset, dict):
            self._dataset = HTTPRequestDataSet(**dataset, credentials=credentials)
        elif isinstance(dataset, HTTPRequestDataSet):
            self._dataset = dataset
        else:
            raise ValueError("Invalid YAML definition provided")

        self._folder_path = folder_path
        self._name = self._hash_attrs()
        self._host = urlparse(self._dataset.url).netloc
        # Join with pathlib rather than string concatenation so that ``Path``
        # inputs and trailing separators are handled correctly
        self._full_path = Path(self._folder_path) / f'{self._name}.pkl'
        # NOTE(review): the cache is read back through a pickle-based dataset, so
        # ``folder_path`` should only ever point at a trusted location
        self._pickle = PickleLocalDataSet(filepath=self._full_path)

    def _exists(self) -> bool:
        """
        This function checks if the cache file has already been persisted.
        """
        return self._full_path.exists()

    def _load(self) -> Any:
        """
        Return the data, from the cache when possible.

        The wrapped dataset is called (and its result persisted) when there is no
        cache file yet or when the cache file is older than ``expiry``; otherwise
        the cached copy is loaded.
        """
        if not self._exists():
            logging.info('Brand new request for "%s"', self._host)
        elif self._age_seconds > self.expiry.total_seconds():
            logging.info('Refreshing "%s" after expiry', self._host)
        else:
            # Cache file is present and still inside the expiry window
            logging.debug('Loading "%s" from cache', self._host)
            return self._pickle.load()

        data = self._dataset.load()
        self._save(data=data)
        return data

    def _save(self, data: Any) -> None:
        """
        This function persists the dataset in the appropriate pickle
        """
        self._pickle.save(data)

    def _describe(self) -> Dict[str, Any]:
        """
        This function provides a description of the current object
        Returns:
            A dictionary holding the expiry and the host of the wrapped request
        """
        return dict(expiry=self.expiry, host=self._host)

    def get_created_date(self) -> pd.Timestamp:
        """
        Return the last-modified time of the cache file as a timezone-naive UTC
        timestamp or, if the file doesn't exist, the start of unix time
        """
        if self.exists():
            # ``st_mtime`` is seconds since the epoch, so the result is in UTC
            return pd.to_datetime(self._full_path.stat().st_mtime, unit='s')
        return pd.to_datetime('1970-01-01')

    def _hash_attrs(self) -> str:
        """
        Since the name of the catalog entry is not known by the dataset object,
        a hash of the wrapped dataset's attributes is used in place
        """
        attributes = json.dumps(self._dataset.__dict__, sort_keys=True)
        return hashlib.md5(attributes.encode('utf8')).hexdigest()

    @staticmethod
    def _get_now() -> pd.Timestamp:
        """
        This convenience function is used to provide current time as a
        timezone-naive UTC timestamp
        """
        # ``get_created_date`` yields naive UTC, so "now" must be naive UTC too.
        # ``pd.to_datetime('now')`` is local time on recent pandas versions, which
        # would skew the computed age by the machine's UTC offset.
        return pd.Timestamp.now(tz='UTC').tz_localize(None)

    @property
    def _age_delta(self) -> pd.Timedelta:
        """
        Return the age of the dataset since the cache was last written, or a zero
        delta if there is no cache file
        """
        if self._exists():
            return self._get_now() - self.get_created_date()
        return pd.to_timedelta(0)

    @property
    def _age_seconds(self) -> int:
        """
        Return the age of the dataset in whole seconds
        """
        return int(self._age_delta.total_seconds())

    @property
    def _age_human(self) -> str:
        """
        Return the age of the dataset in human friendly format
        """
        return humanize.naturaldelta(self._age_delta)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment