Skip to content

Instantly share code, notes, and snippets.

@husio
Created November 14, 2016 17:26
Show Gist options
  • Save husio/26db6beaff5df317e258a305e8edb48f to your computer and use it in GitHub Desktop.
Save husio/26db6beaff5df317e258a305e8edb48f to your computer and use it in GitHub Desktop.
Service authentication in python.
from datetime import datetime, timedelta
import base64
import json
from oauth2client.service_account import ServiceAccountCredentials
import httplib2
class Authenticator:
def __init__(self, auth_service_url):
self.auth_service_url = auth_service_url
self._token = None
self._valid_till = None
def headers(self):
"""Return dictionary that contains header fields required for the
authentication process
"""
return {"Authentication": "Bearer " + self.jwt()}
def jwt(self):
if self._token is None \
or self._valid_till > datetime.now() - timedelta(minutes=10):
self._token, self._valid_till = authenticate(self.auth_service_url)
return self._token
def authenticate(auth_service_url):
token = acquire_access_token(auth_service_url)
try:
raw_header, raw_claims, _ = token.split('.')
except Exception:
raise CorruptedToken("invalid format")
try:
header = load_b64_json(raw_header)
except Exception:
raise CorruptedToken("cannot decode header")
if header.get("typ") != "JWT":
raise InvalidToken("not JWT token")
if header.get("alg") != "RS256":
raise InvalidToken("signature algorithm not supported")
try:
claims = load_b64_json(raw_claims)
except Exception:
raise CorruptedToken("cannot decode claims")
if 'exp' in claims:
exp = datetime.fromtimestamp(claims['exp'])
else:
exp = datetime.now() + timedelta(days=5)
return (token, exp)
def acquire_access_token(auth_service_url):
acc = ServiceAccountCredentials.get_application_default()
acc = acc.create_scoped([
"https://www.googleapis.com/auth/cloud-platform.read-only",
"https://www.googleapis.com/auth/userinfo.email",
])
http = acc.authorize(httplib2.Http())
_, body = http.request(auth_service_url + "/auth", "POST")
return json.loads(body.decode('utf8'))["accessToken"]
def load_b64_json(raw):
data = base64.b64decode(fix_base64_padding(raw))
return json.loads(data.decode('utf8'))
def fix_base64_padding(data):
missing_padding = len(data) % 4
if missing_padding != 0:
data += '=' * (4 - missing_padding)
return data
class AuthenticatorException(Exception):
pass
class CorruptedToken(AuthenticatorException):
pass
class InvalidToken(AuthenticatorException):
pass
if __name__ == "__main__":
# demo how it works
auth = Authenticator("https://auth-dot-opinary-dev.appspot.com")
print(auth.headers())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment