Skip to content

Instantly share code, notes, and snippets.

@joaodaher
Created September 14, 2021 18:03
Show Gist options
  • Save joaodaher/cac4bbc3de64ca3e36dbbffca9d19e2c to your computer and use it in GitHub Desktop.
Save joaodaher/cac4bbc3de64ca3e36dbbffca9d19e2c to your computer and use it in GitHub Desktop.
from dataclasses import dataclass
from typing import Any, Dict, Optional
from urllib.parse import urlparse

import requests
from django.core.cache import cache
from rest_framework import status
from tenacity import retry, retry_if_exception_type, stop_after_delay
# Presumably the page size for paginated listings; not referenced by the
# code visible in this module -- TODO confirm it is still needed.
DEFAULT_PAGE_SIZE: int = 50
# Alias used for JSON bodies sent to / returned by the API.
ResourceType = Dict[str, Any]
# Alias used for query-string parameters passed to the HTTP helpers.
QueryParamType = Dict[str, Any]
class BotmakerAuthException(Exception):
    """Signal that Botmaker rejected the credentials in use.

    Raised when the credentials endpoint answers with a non-200 status or
    when an API call comes back as 401 Unauthorized.
    """
def reauthenticate(retry_state):
    """Force a credential renewal on the client whose call just failed.

    Written to be used as a tenacity ``after`` hook: the first positional
    argument of the retried call (``retry_state.args[0]``) is taken to be
    the client instance, i.e. the ``self`` of the wrapped method.
    """
    retry_state.args[0].update_credentials(force_renew=True)
# Retry policy applied to the client's HTTP helpers.
# - Only BotmakerAuthException triggers a retry; any other exception propagates.
# - After each attempt that failed that way, `reauthenticate` forces a token renewal.
# - No wait is configured, so the next attempt starts right after the hook returns.
# - Retrying stops once 60 seconds have elapsed since the first attempt.
botmaker_auth = retry(
    stop=stop_after_delay(60),
    retry=retry_if_exception_type(BotmakerAuthException),
    after=reauthenticate,
)
@dataclass
class BotmakerClient:
    """Client for the Botmaker HTTP API, used to talk to WhatsApp contacts.

    The access token is obtained from the credentials endpoint and shared
    through the Django cache. ``_get`` and ``_post`` are wrapped with
    ``botmaker_auth`` so a 401 response forces a token renewal and a retry.

    Attributes:
        client_id: Botmaker client identifier.
        secret_id: Botmaker secret identifier.
        refresh_token: Token exchanged for a new access token; it is replaced
            every time a new access token is issued.
        api_endpoint: Base URL of the API.
        api_version: Path prefix appended to ``api_endpoint``.
    """

    client_id: str
    secret_id: str
    refresh_token: str
    api_endpoint: str = "https://go.botmaker.com"
    api_version: str = "/api/v1.0"

    def __post_init__(self):
        """Create the HTTP session and authenticate it."""
        self.session = requests.Session()
        self.update_credentials()
        self.platform = "WHATSAPP"

    def update_credentials(self, force_renew: bool = False):
        """Install a valid access token on the HTTP session.

        Args:
            force_renew: When true, request a brand new token even if one is
                cached.
        """
        token = self.get_token(force_renew=force_renew)
        # Reuse the existing session instead of building a new one on every
        # renewal: a replaced session is never closed, so its connection pool
        # would be leaked each time the token is refreshed.
        session = getattr(self, "session", None)
        if session is None:
            session = self.session = requests.Session()
        session.headers.update({"access-token": token})

    def get_token(self, force_renew: bool = False) -> str:
        """Return an access token, renewing it when needed.

        Args:
            force_renew: Skip the cache and always request a new token.

        Returns:
            The access token to send in the ``access-token`` header.

        Raises:
            BotmakerAuthException: The credentials endpoint did not answer
                with HTTP 200.
        """
        # The key is scoped by client id (like the lock below) so that
        # different accounts never pick up each other's token.
        token_key = f"botmaker_access_token_{self.client_id}"
        if not force_renew:
            access_token = cache.get(token_key)
            if access_token:
                return access_token

        # NOTE(review): `cache.lock` is not part of Django's base cache API;
        # this assumes a backend that provides it (e.g. django-redis) -- confirm.
        with cache.lock(f"BOTMAKER_RENEW_{self.client_id}_LOCK", timeout=10, blocking_timeout=5):
            if not force_renew:
                # Another worker may have renewed the token while we waited
                # for the lock. Renewing again would send a refresh token that
                # has already been rotated, so use the fresh cached value.
                access_token = cache.get(token_key)
                if access_token:
                    return access_token

            headers = {
                "clientId": self.client_id,
                "secretId": self.secret_id,
                "refreshToken": self.refresh_token,
            }
            response = requests.post(url=f"{self.api_endpoint}{self.api_version}/auth/credentials", headers=headers)
            if response.status_code != status.HTTP_200_OK:
                raise BotmakerAuthException(response.status_code)
            credentials = response.json()
            access_token = credentials["accessToken"]
            self.refresh_token = credentials["refreshToken"]
            self._persist_credentials()
            cache.set(token_key, access_token)
        return access_token

    def _persist_credentials(self):
        """Save the current refresh token on the matching account rows."""
        # Considering that:
        # - the refresh token is mandatory to generate new access token
        # - the refresh token changes every time a new access token is generated
        # we must save the newly created refresh token to the database to avoid locking out
        # future instances of the Botmaker client
        from v1 import models  # pylint: disable=import-outside-toplevel

        agent = models.WhatsAppBusinessAccount.objects.filter(client_id=self.client_id, secret_id=self.secret_id)
        agent.update(refresh_token=self.refresh_token)

    def _raise_for_status(self, response: requests.Response) -> None:
        """Translate an unsuccessful API response into an exception.

        Raises:
            BotmakerAuthException: The response status is 401.
            requests.HTTPError: Any other 4xx/5xx status.
            WhatsAppException: The JSON body carries a ``problems`` entry.
        """
        if response.status_code == status.HTTP_401_UNAUTHORIZED:
            raise BotmakerAuthException("Access Token is not valid. Attempting to refresh it.")
        response.raise_for_status()
        body = response.json()
        if body.get("problems") is not None:
            raise WhatsAppException(body["problems"]["message"])

    @botmaker_auth
    def _get(self, endpoint: str, params: Optional[QueryParamType] = None) -> ResourceType:
        """GET ``endpoint`` (relative to the versioned API root) and return the JSON body."""
        url = f"{self.api_endpoint}{self.api_version}{endpoint}"
        response = self.session.get(
            url=url,
            params=params,
        )
        self._raise_for_status(response=response)
        return response.json()

    @botmaker_auth
    def _post(
        self,
        endpoint: str,
        body: Optional[ResourceType] = None,
        params: Optional[QueryParamType] = None,
    ) -> ResourceType:
        """POST ``body`` as JSON to ``endpoint`` and return the JSON body.

        A missing or empty ``body`` is sent as an empty JSON object.
        """
        url = f"{self.api_endpoint}{self.api_version}{endpoint}"
        response = self.session.post(
            url=url,
            json=body or {},
            params=params,
        )
        self._raise_for_status(response=response)
        return response.json()

    def send_template_message(self, channel_id: str, whatsapp_id: str, template_name: str, **kwargs) -> ResourceType:
        """Trigger the template/rule ``template_name`` for a contact.

        Args:
            channel_id: Sent as ``chatChannelNumber``.
            whatsapp_id: Sent as ``platformContactId``.
            template_name: Sent as ``ruleNameOrId``.
            **kwargs: Forwarded as the template ``params``.

        Returns:
            The JSON body returned by the API.
        """
        endpoint = "/intent/v2"
        body = {
            "chatPlatform": self.platform.lower(),  # not a bug: this endpoint requires lowercase platform
            "chatChannelNumber": channel_id,
            "platformContactId": whatsapp_id,
            "ruleNameOrId": template_name,
            "params": kwargs,
        }
        return self._post(
            endpoint=endpoint,
            body=body,
        )

    def send_message(
        self,
        channel_id: str,
        whatsapp_id: str,
        text: Optional[str] = None,
        file_url: Optional[str] = None,
    ) -> ResourceType:
        """Send a text and/or a file to a contact.

        Args:
            channel_id: Sent as ``chatChannelNumber``.
            whatsapp_id: Sent as ``platformContactId``.
            text: Message text, if any.
            file_url: URL of a file to attach, if any.

        Returns:
            The JSON body returned by the API.

        Raises:
            WhatsAppException: Neither ``text`` nor ``file_url`` was given, or
                ``file_url`` is not a URL.
        """
        if not text and not file_url:
            raise WhatsAppException("Either 'text' or 'file_url' must be provided to send a message")

        endpoint = "/message/v3"
        body = {
            "chatPlatform": self.platform.lower(),  # not a bug: this endpoint requires lowercase platform
            "chatChannelNumber": channel_id,
            "platformContactId": whatsapp_id,
        }
        if file_url:
            # The file is fetched so its Content-Type can pick the right body key.
            file_handler = FileHandler.create(file=file_url)
            body.update(file_handler.to_upload(url=file_url))
        if text:
            body["messageText"] = text
        return self._post(
            endpoint=endpoint,
            body=body,
        )

    def check_phone(self, channel_id: str, phone_number: str) -> bool:
        """Ask the API whether ``phone_number`` is a WhatsApp contact.

        Returns:
            The value the API maps ``phone_number`` to under ``result``, or
            ``None`` when the number is absent from the response.
            NOTE(review): annotated as ``bool`` but the value is passed through
            unchanged -- confirm the actual type returned by the API.
        """
        endpoint = "/customer/checkWhatsAppContact"
        body = {
            "chatChannelNumber": channel_id,
            "contacts": [phone_number],
        }
        response = self._post(
            endpoint=endpoint,
            body=body,
        )
        return response["result"].get(phone_number)
@dataclass
class FileHandler:
    """A remote file fetched so it can be sent as a WhatsApp attachment.

    Attributes:
        content: Raw body of the downloaded file.
        name: Last path segment of the file URL.
        mime_type: ``Content-Type`` reported by the server hosting the file.
    """

    content: Any
    name: str
    mime_type: str

    # Deliberately unannotated so the dataclass does not turn them into fields.
    _IMAGE_MIME_TYPES = frozenset({"image/jpeg", "image/png"})
    _AUDIO_MIME_TYPES = frozenset({"audio/aac", "audio/mp4", "audio/amr", "audio/mpeg", "audio/ogg"})

    def _media_type(self) -> str:
        """Return ``mime_type`` lowercased and without parameters."""
        # Content-Type headers may carry parameters ("audio/ogg; codecs=opus")
        # and are case-insensitive, so normalise before comparing.
        return (self.mime_type or "").split(";", 1)[0].strip().lower()

    def to_upload(self, url: str) -> Dict[str, Any]:
        """Build the message-body fragment that attaches the file at ``url``.

        Returns:
            A single-entry dict keyed ``imageURL``, ``audioURL`` or ``fileURL``
            depending on the file's media type.
        """
        # WhatsApp supported files <https://developers.facebook.com/docs/whatsapp/api/media#supported-files>
        # are mapped to Botmaker params
        media_type = self._media_type()
        if media_type in self._IMAGE_MIME_TYPES:
            return {"imageURL": url}
        if media_type in self._AUDIO_MIME_TYPES:
            return {"audioURL": url}
        return {"fileURL": url}

    @classmethod
    def create(cls, file: str) -> "FileHandler":
        """Build a handler from ``file``, which must be an HTTP(S) URL.

        Raises:
            WhatsAppException: ``file`` is not an ``http://`` or ``https://`` URL.
        """
        # Require the full scheme so strings such as "httpfoo" are rejected here
        # rather than failing later inside the HTTP request.
        if not file.startswith(("http://", "https://")):
            raise WhatsAppException("Only URLs are supported as WhatsApp files")
        return cls.from_url(url=file)

    @classmethod
    def from_url(cls, url: str) -> "FileHandler":
        """Download ``url`` and wrap its content, file name and content type.

        Raises:
            requests.HTTPError: The download answered with a 4xx/5xx status.
        """
        response = requests.get(url=url)
        response.raise_for_status()
        return cls(
            content=response.content,
            name=urlparse(url).path.rsplit("/", 1)[-1],
            # Servers may omit Content-Type; fall back to a generic type so the
            # file is still sent (as "fileURL") instead of raising KeyError.
            mime_type=response.headers.get("Content-Type", "application/octet-stream"),
        )
class WhatsAppException(Exception):
    """Error raised for WhatsApp messaging failures.

    Used when the API response reports ``problems``, when a message has
    neither text nor file, and when a file is not given as a URL.
    """
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment