Created
September 14, 2021 18:03
-
-
Save joaodaher/cac4bbc3de64ca3e36dbbffca9d19e2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from typing import Dict, Any | |
from urllib.parse import urlparse | |
import requests | |
from django.core.cache import cache | |
from rest_framework import status | |
from tenacity import retry, stop_after_delay, retry_if_exception_type | |
DEFAULT_PAGE_SIZE: int = 50 | |
ResourceType = Dict[str, Any] | |
QueryParamType = Dict[str, Any] | |
class BotmakerAuthException(Exception): | |
pass | |
def reauthenticate(retry_state): | |
client = retry_state.args[0] | |
client.update_credentials(force_renew=True) | |
botmaker_auth = retry( | |
stop=stop_after_delay(60), | |
retry=retry_if_exception_type(BotmakerAuthException), | |
after=reauthenticate, | |
) | |
@dataclass | |
class BotmakerClient: | |
client_id: str | |
secret_id: str | |
refresh_token: str | |
api_endpoint: str = "https://go.botmaker.com" | |
api_version: str = "/api/v1.0" | |
def __post_init__(self): | |
self.session = requests.Session() | |
self.update_credentials() | |
self.platform = "WHATSAPP" | |
def update_credentials(self, force_renew: bool = False): | |
token = self.get_token(force_renew=force_renew) | |
self.session = requests.Session() | |
self.session.headers.update({"access-token": token}) | |
def get_token(self, force_renew: bool = False): | |
token_key = "botmaker_access_token" | |
access_token = cache.get(token_key) | |
if force_renew or not access_token: | |
with cache.lock(f"BOTMAKER_RENEW_{self.client_id}_LOCK", timeout=10, blocking_timeout=5): | |
headers = { | |
"clientId": self.client_id, | |
"secretId": self.secret_id, | |
"refreshToken": self.refresh_token, | |
} | |
response = requests.post(url=f"{self.api_endpoint}{self.api_version}/auth/credentials", headers=headers) | |
if response.status_code != status.HTTP_200_OK: | |
raise BotmakerAuthException(response.status_code) | |
credentials = response.json() | |
access_token = credentials["accessToken"] | |
self.refresh_token = credentials["refreshToken"] | |
self._persist_credentials() | |
cache.set(token_key, access_token) | |
return access_token | |
def _persist_credentials(self): | |
# Considering that: | |
# - the refresh token is mandatory to generate new access token | |
# - the refresh token changes every time a new access token is generated | |
# we must save the newly created refresh token to the database to avoid locking out | |
# future instances of the Botmaker client | |
from v1 import models # pylint: disable=import-outside-toplevel | |
agent = models.WhatsAppBusinessAccount.objects.filter(client_id=self.client_id, secret_id=self.secret_id) | |
agent.update(refresh_token=self.refresh_token) | |
def _raise_for_status(self, response: requests.Response) -> None: | |
if response.status_code == status.HTTP_401_UNAUTHORIZED: | |
raise BotmakerAuthException("Access Token is not valid. Attempting to refresh it.") | |
response.raise_for_status() | |
body = response.json() | |
if body.get("problems") is not None: | |
raise WhatsAppException(body["problems"]["message"]) | |
@botmaker_auth | |
def _get(self, endpoint: str, params: QueryParamType = None) -> ResourceType: | |
url = f"{self.api_endpoint}{self.api_version}{endpoint}" | |
response = self.session.get( | |
url=url, | |
params=params, | |
) | |
self._raise_for_status(response=response) | |
return response.json() | |
@botmaker_auth | |
def _post(self, endpoint: str, body: ResourceType = None, params: QueryParamType = None) -> ResourceType: | |
url = f"{self.api_endpoint}{self.api_version}{endpoint}" | |
response = self.session.post( | |
url=url, | |
json=body or {}, | |
params=params, | |
) | |
self._raise_for_status(response=response) | |
return response.json() | |
def send_template_message(self, channel_id: str, whatsapp_id: str, template_name: str, **kwargs) -> ResourceType: | |
endpoint = "/intent/v2" | |
body = { | |
"chatPlatform": self.platform.lower(), # not a bug: this endpoint requires lowercase platform | |
"chatChannelNumber": channel_id, | |
"platformContactId": whatsapp_id, | |
"ruleNameOrId": template_name, | |
"params": kwargs, | |
} | |
return self._post( | |
endpoint=endpoint, | |
body=body, | |
) | |
def send_message(self, channel_id: str, whatsapp_id: str, text: str = None, file_url: str = None) -> ResourceType: | |
endpoint = "/message/v3" | |
body = { | |
"chatPlatform": self.platform.lower(), # not a bug: this endpoint requires lowercase platform | |
"chatChannelNumber": channel_id, | |
"platformContactId": whatsapp_id, | |
} | |
if not text and not file_url: | |
raise WhatsAppException("Either 'text' or 'file_url' must be provided to send a message") | |
if file_url: | |
file_handler = FileHandler.create(file=file_url) | |
body.update(file_handler.to_upload(url=file_url)) | |
if text: | |
body["messageText"] = text | |
return self._post( | |
endpoint=endpoint, | |
body=body, | |
) | |
def check_phone(self, channel_id: str, phone_number: str) -> bool: | |
endpoint = "/customer/checkWhatsAppContact" | |
body = { | |
"chatChannelNumber": channel_id, | |
"contacts": [phone_number], | |
} | |
response = self._post( | |
endpoint=endpoint, | |
body=body, | |
) | |
return response["result"].get(phone_number) | |
@dataclass | |
class FileHandler: | |
content: Any | |
name: str | |
mime_type: str | |
def to_upload(self, url: str) -> ResourceType: | |
# WhatsApp supported files <https://developers.facebook.com/docs/whatsapp/api/media#supported-files> | |
# are mapped to Botmaker params | |
if self.mime_type in ("image/jpeg", "image/png"): | |
return {"imageURL": url} | |
if self.mime_type in ("audio/aac", "audio/mp4", "audio/amr", "audio/mpeg", "audio/ogg"): | |
return {"audioURL": url} | |
return {"fileURL": url} | |
@classmethod | |
def create(cls, file: str) -> "FileHandler": | |
if not file.startswith("http"): | |
raise WhatsAppException("Only URLs are supported as WhatsApp files") | |
return cls.from_url(url=file) | |
@classmethod | |
def from_url(cls, url: str) -> "FileHandler": | |
response = requests.get(url=url) | |
response.raise_for_status() | |
return cls( | |
content=response.content, | |
name=urlparse(url).path.rsplit("/", 1)[-1], | |
mime_type=response.headers["Content-Type"], | |
) | |
class WhatsAppException(Exception): | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment