Created
June 17, 2021 16:03
-
-
Save fantix/759ebf1f6fcbc3e9adfd507ffc24074b to your computer and use it in GitHub Desktop.
Local CA with cryptography
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os | |
import socket | |
import ssl | |
import threading | |
import uuid | |
from cryptography import x509 | |
from cryptography.hazmat import backends | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.x509 import oid | |
def _new_cert(name, issuer=None): | |
backend = backends.default_backend() | |
private_key = rsa.generate_private_key( | |
public_exponent=65537, key_size=2048, backend=backend | |
) | |
public_key = private_key.public_key() | |
subject = x509.Name([x509.NameAttribute(oid.NameOID.COMMON_NAME, name)]) | |
builder = ( | |
x509.CertificateBuilder() | |
.subject_name(subject) | |
.public_key(public_key) | |
.serial_number(int(uuid.uuid4())) | |
) | |
if issuer: | |
issuer_cert, signing_key = issuer | |
builder = ( | |
builder.issuer_name(issuer_cert.subject) | |
.not_valid_before(issuer_cert.not_valid_before) | |
.not_valid_after(issuer_cert.not_valid_after) | |
) | |
else: | |
signing_key = private_key | |
builder = ( | |
builder.issuer_name(subject) | |
.add_extension( | |
x509.BasicConstraints(ca=True, path_length=None), | |
critical=True, | |
) | |
.not_valid_before( | |
datetime.datetime.today() - datetime.timedelta(days=1) | |
) | |
.not_valid_after( | |
datetime.datetime.today() + datetime.timedelta(weeks=1000) | |
) | |
) | |
certificate = builder.sign( | |
private_key=signing_key, | |
algorithm=hashes.SHA256(), | |
backend=backend, | |
) | |
return certificate, private_key | |
def _write_cert(path, cert_key_pair): | |
certificate, private_key = cert_key_pair | |
with open(path, "wb") as f: | |
f.write( | |
private_key.private_bytes( | |
encoding=serialization.Encoding.PEM, | |
format=serialization.PrivateFormat.TraditionalOpenSSL, | |
encryption_algorithm=serialization.NoEncryption(), | |
) | |
) | |
f.write( | |
certificate.public_bytes( | |
encoding=serialization.Encoding.PEM, | |
) | |
) | |
def new_ca(path, name): | |
cert_key_pair = _new_cert(name) | |
_write_cert(path, cert_key_pair) | |
return cert_key_pair | |
def load_ca(path): | |
with open(path, "rb") as f: | |
data = f.read() | |
private_key = serialization.load_pem_private_key(data, None) | |
certificate = x509.load_pem_x509_certificate(data) | |
return certificate, private_key | |
def new_cert(path, name, ca_cert_key_pair): | |
cert_key_pair = _new_cert(name, issuer=ca_cert_key_pair) | |
_write_cert(path, cert_key_pair) | |
def main(): | |
if not os.path.exists("server.pem") or not os.path.exists("ca.pem"): | |
try: | |
ca_cert_key_pair = load_ca("ca.pem") | |
except FileNotFoundError: | |
ca_cert_key_pair = new_ca("ca.pem", "Development CA") | |
new_cert("server.pem", "Development Server", ca_cert_key_pair) | |
s1, s2 = socket.socketpair() | |
def server(): | |
ctx = ssl.SSLContext() | |
ctx.load_cert_chain("server.pem") | |
ss1 = ctx.wrap_socket(s1, server_side=True) | |
print(ss1.recv(1024)) | |
ss1.send(b"World") | |
t = threading.Thread(target=server) | |
t.start() | |
client_ctx = ssl.SSLContext() | |
client_ctx.load_verify_locations("ca.pem") | |
client_ctx.verify_mode = ssl.CERT_REQUIRED | |
client_ctx.check_hostname = False | |
ss2 = client_ctx.wrap_socket(s2) | |
ss2.send(b"Hello") | |
print(ss2.recv(1024)) | |
t.join() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment