Created
February 10, 2022 16:31
-
-
Save andyeff/9b20f6e69c1948bbfa3b59a1444a74c2 to your computer and use it in GitHub Desktop.
Python SSL Cert Checker for AWS Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
SSL Cert inspection function
OpenSSL is available from the pyOpenSSL package
""" | |
import json | |
import logging | |
from datetime import datetime | |
import boto3 | |
from botocore.exceptions import ClientError | |
from OpenSSL import crypto | |
import base64 | |
# Set up some initial logging.
# INFO rather than NOTSET: with the root logger at NOTSET, botocore's DEBUG
# wire trace is emitted too, and that trace includes the GetSecretValue
# response body (i.e. the secret itself) whenever this configuration takes
# effect. This module only logs at ERROR, so its own output is unchanged.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def lambda_handler(event, context):
    """
    Fetch a base64-encoded certificate/key pair from AWS Secrets Manager,
    write both to /tmp as PEM text and report the certificate's expiry.

    event and context are the usual Lambda handler arguments; neither is
    read by this function.

    Returns a dict with:
      certExpiry   - the certificate's notAfter, formatted "%m/%d/%Y, %H:%M:%S"
      expiryStatus - the result of the certificate's has_expired() check
      subject      - str() of the certificate's subject object

    Errors from get_secret, a missing "cert"/"key" entry (KeyError) and
    base64/ASCII decoding failures propagate to the caller.
    """
    cert_secret = get_secret("my/aws/secret/cert")

    # Cert and key are stored in AWS Secrets base64 encoded; decode to PEM text
    plaincert = _decode_b64_text(cert_secret["cert"])
    plainkey = _decode_b64_text(cert_secret["key"])

    # Write the files to "disk" so they can be used as a client cert by Requests, for example
    with open("/tmp/cert.crt", "w") as cert_file:
        cert_file.write(plaincert)
    # The private key must not be readable by other users on the host
    _write_owner_only("/tmp/key.pem", plainkey)

    cert = crypto.load_certificate(crypto.FILETYPE_PEM, plaincert)

    # get_notAfter() yields bytes shaped like "YYYYMMDDhhmmssZ"
    not_after = datetime.strptime(cert.get_notAfter().decode("utf-8"), "%Y%m%d%H%M%SZ")

    # Sample return response
    return {
        "certExpiry": not_after.strftime("%m/%d/%Y, %H:%M:%S"),
        "expiryStatus": cert.has_expired(),
        "subject": str(cert.get_subject()),
    }


def _decode_b64_text(encoded):
    """Decode a base64-encoded ASCII string back into its plain ASCII text."""
    return base64.b64decode(encoded.encode("ascii")).decode("ascii")


def _write_owner_only(path, text):
    """Write text to path, leaving the file readable/writable by its owner only."""
    import os  # local import: os is not among this module's top-level imports

    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        # The mode given to os.open only applies when the file is created;
        # also tighten a file left behind by an earlier invocation.
        os.chmod(path, 0o600)
        handle.write(text)
# --- helper functions | |
def get_secret(requested_secret: str, region_name: str = "eu-west-2") -> dict:
    """
    get_secret:
    takes the identifier of a Secrets Manager secret (passed as SecretId)
    and returns the secret's SecretString parsed as JSON

    region_name selects the Secrets Manager region; it defaults to the
    previously hard-coded "eu-west-2".

    No validation of the secret's fields is performed here.

    Raises:
      ClientError - logged, then re-raised unchanged, when the lookup fails
      KeyError - when the response carries no "SecretString"
      json.JSONDecodeError - when the SecretString is not valid JSON
    """
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)
    try:
        response = client.get_secret_value(SecretId=requested_secret)
    except ClientError as error:
        logger.error("Error in obtaining secret %s", requested_secret)
        # Read the error code once and map the known codes to their message
        code = error.response["Error"]["Code"]
        known_failures = {
            "AccessDeniedException": "Access failure reading secrets",
            "InvalidRequestException": "Invalid Request - confirm KMS is valid",
            "ResourceNotFoundException": f"Could not find requested secret {requested_secret}",
        }
        fallback = f"Error while obtaining secret {requested_secret}"
        logger.error("%s", known_failures.get(code, fallback))
        logger.error(error)
        # Bare raise keeps the original traceback intact
        raise
    return json.loads(response["SecretString"])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment