Skip to content

Instantly share code, notes, and snippets.

@flniu
Created March 27, 2022 01:44
Show Gist options
  • Save flniu/5d7f0b799e7ddb500164c31a27a7cb93 to your computer and use it in GitHub Desktop.
Save flniu/5d7f0b799e7ddb500164c31a27a7cb93 to your computer and use it in GitHub Desktop.
HIPA API Helper
from time import sleep
import requests
from typing import TypedDict, Dict, Any, List
class RecordId(TypedDict):
    """Dictionary shape that carries only a record's identifier."""

    # Identifier of the record (used as the last URL path segment by HipaApi).
    id: str
class RecordData(TypedDict):
    """Dictionary shape for the payload of a record: its ``values`` mapping."""

    # Arbitrary per-record data keyed by string; the value types are not
    # constrained here.
    values: Dict[str, Any]
class Record(RecordId, RecordData):
    """A full record: the ``id`` and ``values`` keys plus two timestamps."""

    # Both timestamps are typed as plain strings; their exact format is not
    # established in this file.
    createdAt: str
    updatedAt: str
class PagedRecords(TypedDict):
    """Dictionary shape for one page of a record listing."""

    # Page number this response describes.
    page: int
    # Page size reported with this response.
    pageSize: int
    # Total number of records across all pages.
    totalCount: int
    # Records contained in this page.
    items: List[Record]
class HipaApiException(Exception):
    """Raised when a HIPA API call returns a non-200 HTTP status.

    The exception message is the raw response body text.
    """
class HipaApi:
    """Small client for the records endpoint of a single HIPA table.

    Every request carries a bearer-token ``Authorization`` header and is
    followed by a short pause so that consecutive calls are spaced out.
    """

    # Target request rate used to derive the pause between calls.
    rate_limit_per_second = 5
    # Pause (seconds) after each request. Computed once, when the class body
    # runs, so changing ``rate_limit_per_second`` later does not update it.
    wait_seconds: float = 1 / rate_limit_per_second
    # Default per-request timeout in seconds. Without a timeout, ``requests``
    # can block indefinitely on a stalled connection.
    timeout_seconds: float = 30

    def __init__(self, hipa_host: str, api_key: str, app_id: str, table_id: str):
        """Build the records URL and the auth header for one app/table.

        Args:
            hipa_host: Base URL of the API host; a trailing slash is tolerated.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            app_id: Application identifier placed in the URL path.
            table_id: Table identifier placed in the URL path.
        """
        # Strip trailing slashes so the joined URL never contains "//v1/...".
        base_url = hipa_host.rstrip("/")
        self.endpoint = f"{base_url}/v1/apps/{app_id}/tables/{table_id}/records"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def _call_api(self, method: str, url: str, **kwargs) -> Any:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP method name passed to ``requests.request``.
            url: Full URL to call.
            **kwargs: Extra keyword arguments forwarded to ``requests.request``.

        Returns:
            The parsed JSON response.

        Raises:
            HipaApiException: If the response status code is not 200; the
                message is the response text.
        """
        # Keep caller-supplied headers, but always send our Authorization.
        kwargs["headers"] = {**(kwargs.get("headers") or {}), **self.headers}
        # An explicit ``timeout=...`` from the caller wins over the default.
        kwargs.setdefault("timeout", self.timeout_seconds)
        r = requests.request(method, url, **kwargs)
        # Pause before inspecting the status so a failed call is throttled
        # too; otherwise an immediate retry would not be spaced out.
        sleep(self.wait_seconds)
        # Only 200 is treated as success.
        if r.status_code != 200:
            raise HipaApiException(r.text)
        return r.json()

    # NOTE: this method name shadows the builtin ``list`` inside the class
    # body; it is kept because it is part of the public interface.
    def list(self, page: int = 1, page_size: int = 100) -> PagedRecords:
        """Fetch a single page of records.

        Args:
            page: Page number to request.
            page_size: Number of records requested per page.

        Returns:
            The paged response for the requested page.
        """
        params = {"page": page, "pageSize": page_size}
        return self._call_api("get", self.endpoint, params=params)

    def list_all(self, page_size: int = 100) -> List[Record]:
        """Fetch every record by walking the pages in order.

        Args:
            page_size: Number of records requested per page.

        Returns:
            A new list holding the records of all pages, in page order.
        """
        # Accumulate into a fresh list so no page's ``items`` list is mutated.
        records: List[Record] = []
        page = 1
        while True:
            paged_records = self.list(page=page, page_size=page_size)
            items = paged_records["items"]
            if not items:
                # An empty page means there is nothing more to read. Stopping
                # here prevents an endless loop when ``totalCount`` still
                # claims that more records exist.
                break
            records.extend(items)
            seen_so_far = paged_records["pageSize"] * paged_records["page"]
            if paged_records["totalCount"] <= seen_so_far:
                break
            page += 1
        return records

    def get(self, record_id: str) -> Record:
        """Fetch one record by its id."""
        return self._call_api("get", f"{self.endpoint}/{record_id}")

    def create(self, data: RecordData) -> Record:
        """Create a record from ``data`` (sent as the JSON body)."""
        return self._call_api("post", self.endpoint, json=data)

    def update(self, record_id: str, data: RecordData) -> Record:
        """Patch the record ``record_id`` with ``data`` (sent as the JSON body)."""
        return self._call_api("patch", f"{self.endpoint}/{record_id}", json=data)

    def delete(self, record_id: str) -> RecordId:
        """Delete the record ``record_id`` and return the API's JSON response."""
        return self._call_api("delete", f"{self.endpoint}/{record_id}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment