Skip to content

Instantly share code, notes, and snippets.

@fsultan
Last active May 28, 2024 22:24
Show Gist options
  • Save fsultan/95df6a7067918b6b68d21546c62f0aa5 to your computer and use it in GitHub Desktop.
Save fsultan/95df6a7067918b6b68d21546c62f0aa5 to your computer and use it in GitHub Desktop.
# import json
import logging
import os
import shutil
import boto3
from datetime import timedelta
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.x509.oid import NameOID
from botocore.exceptions import ClientError
from botocore.config import Config
# from secrets_manager_utils import SecretsManagerUtils
logger = logging.getLogger()
logger.setLevel(logging.INFO)
client_dict = {}
resource_dict = {}
current_aws_region = os.environ['AWS_REGION']
config = Config(
retries={
'max_attempts': 10,
'mode': 'standard'
}
)
def handler(event, context):
if event["is_standalone_deployment"] :
try:
SSL_CERT_GENERATOR_KMS_KEY_ARN = event['ssl_cert_generator_kms_key_arn']
CNAME = event['ssl_cert_cname']
key = generate_rsa(2048)
cert = create_cert(key)
PRIVATE_KEY_SECRET_NAME = "/application/gemfire/gemfire-cluster-self-signed-private-key"
CERTIFICATE_SECRET_NAME = "/application/gemfire/gemfire-cluster-self-signed-certificate"
TRUSTED_CERTIFICATE_SECRET_NAME = "/application/gemfire/gemfire-cluster-self-signed-trusted-certificate"
SecMgrClient = SecretsManagerUtils(kms_arn=SSL_CERT_GENERATOR_KMS_KEY_ARN)
upload_secret(SecMgrClient, key, PRIVATE_KEY_SECRET_NAME, type="private-key")
upload_secret(SecMgrClient, cert, CERTIFICATE_SECRET_NAME, type="certificate")
upload_secret(SecMgrClient, cert, TRUSTED_CERTIFICATE_SECRET_NAME, type="trusted-certificate")
except KeyError as err:
logger.error(f"Missing key in event payload: {err}")
raise err
except IOError as err:
logger.error(f"CANNOT ZIP CERTIFICATE FILES DUE TO IO ERROR: {err}")
raise err
except IOError as err:
logger.error(f"CANNOT CREATE CERTIFICATE DUE TO OS ERROR : {err}")
raise err
except Exception as err:
logger.error(f"CERTIFICATE SET UP FAILED DUE TO : {err}")
raise err
def upload_secret(client, secret, secret_name, type: str):
secret_string = ""
if type == "private-key":
secret_string = secret.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode()
else:
secret_string = secret.public_bytes(
encoding=serialization.Encoding.PEM
).decode()
try:
# If there is no secret with the given secret name: Create Secret
client.upload_secret
secret_name = secret_name, secret_string = secret_string
logger.info(f"CREATED {type} SECRET")
except ClientError as e:
try:
response = client.describe_secret(
secret_name=secret_name
)
if 'DeletedDate' in response:
# If the secret is already there in the secrets manager and in deleted state: restore and update secret
client.restore_secret(
secret_name=secret_name
)
client.put_secret_value(
secret_name=secret_name, secret_string=secret_string
)
logger.info (f"RESTORED AND UPDATED {type} SECRET")
else:
# if the secret is already there but not in the deleted state: update the secret
client.put_secret_value(
secret_name=secret_name, secret_string=secret_string
)
logger.info(f"UPDATED {type} SECRET")
except Exception as e:
logger.error(f"Could not upload certificate due to {e}")
return
except Exception as e:
logger.error(f"Could not upload certificate due to {e}")
def write_cert(cert, key):
try:
key_path = "/tmp/privateKey.pem"
cert_path = "/tmp/certificateChain.pem"
trusted_cert = "/tmp/trustedCertificates.pem"
with open(cert_path, "wt") as f_cert:
f_cert.write(cert.public_bytes(
encoding=serialization.Encoding.PEM
).decode("utf-8"))
with open(key_path, "wt") as f_private_key:
f_private_key.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode("utf-8"))
shutil.copy(cert_path, trusted_cert)
except Exception as e:
logger.error(f"COULD NOT WRITE FILE DUE TO : {e}")
raise e
def create_cert(key):
try:
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"New Jersey"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"Jersey City"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"JP Morgan Chase"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"EFP"),
x509.NameAttribute(NameOID.COMMON_NAME, CNAME),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"New Jersey"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"Jersey City"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"JP Morgan Chase"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"EFP"),
x509.NameAttribute(NameOID.COMMON_NAME, CNAME),
]))
builder = builder.not_valid_before(datetime.datetime.utcnow())
builder = builder.not_valid_after(datetime.datetime.utcnow() + timedelta(days=365))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(key.public_key())
builder = builder.add_extension(
x509.SubjectAlternativeName([x509.DNSName(CNAME)]),
critical=False,
)
cert = builder.sign(
private_key=key,
algorithm=hashes.SHA256(),
)
return cert
except Exception as e:
logger.error (f"CERTIFICATE CREATION FAILED DUE TO {e}")
raise e
def generate_rsa(bits) :
try:
key = rsa.generate_private_key(
public_exponent=65537,
key_size=bits,
backend=default_backend()
)
return key
except Exception as e:
logger.error ("RSA GENERATION ERROR")
raise e
def get_endpoint_url_dict(aws_region):
return {
"s3": f"https://s3.{aws_region}.amazonaws.com",
"ssm": f"https://ssm.{aws_region}.amazonaws.com",
"ec2": f"https://ec2.{aws_region}.amazonaws.com",
"emr": f"https://elasticmapreduce.{aws_region}.amazonaws.com",
"acm": f"https://acm.{aws_region}.amazonaws.com",
"sqs": f"https://sqs.{aws_region}.amazonaws.com",
"sns": f"https://sns.{aws_region}.amazonaws.com",
"elbv2": f"https://elasticloadbalancing.{aws_region}.amazonaws.com",
"lambda": f"https://lambda.{aws_region}.amazonaws.com",
"application-autoscaling": f"https://application-autoscaling.{aws_region}.amazonaws.com",
"secretsmanager": f"https://secretsmanager.{aws_region}.amazonaws.com"
}
def get_aws_client(aws_service):
if aws_service not in client_dict:
client_dict[aws_service] = boto3.client(
service_name=aws_service,
region_name=current_aws_region,
config=config,
endpoint_url=get_endpoint_url_dict(current_aws_region).get(aws_service)
)
return client_dict[aws_service]
class SecretsManagerUtils:
def __init__(self, kms_arn: str):
self.secretsmanager_client = get_aws_client('secretsmanager')
self.kms_key_id = kms_arn
def upload_secret(self, secret_name: str, secret_string: str):
return self.secretsmanager_client.create_secret(
Name=secret_name,
SecretString=secret_string,
KmsKeyId=self.kms_key_id
)
def restore_secret(self, secret_name: str):
return self.secretsmanager_client.restore_secret(
SecretId=secret_name
)
def describe_secret(self, secret_name: str):
return self.secretsmanager_client.describe_secret(
SecretId=secret_name
)
def put_secret_value(self, secret_name: str, secret_string: str):
return self.secretsmanager_client.put_secret_value(
SecretId=secret_name,
SecretString=secret_string
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment