|
import sys |
|
import os |
|
import subprocess |
|
import shutil |
|
import readline |
|
import time |
|
|
|
# --- Storage layout ---------------------------------------------------------
# Anchor everything on the absolute location of this script.  Runner.__init__
# changes the working directory into CA_STORAGE_DIRECTORY, so paths built from
# a relative __file__ would stop resolving after that chdir.
CA_STORAGE_DIRECTORY = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'storage')
CERTIFICATE_STORAGE_DIRECTORY = os.path.join(CA_STORAGE_DIRECTORY, 'certs')

# --- Certificate authority files --------------------------------------------
CA_CERT = os.path.join(CA_STORAGE_DIRECTORY, 'cacert.pem')
# NOTE(review): the generation command passes no -keyout, so this file name is
# presumably dictated by openssl-ca.conf -- confirm against that config.
CA_KEY = os.path.join(CA_STORAGE_DIRECTORY, 'cakey.key')
CA_INDEX = os.path.join(CA_STORAGE_DIRECTORY, 'index.txt')
CA_SERIAL = os.path.join(CA_STORAGE_DIRECTORY, 'serial.txt')
# The CA config lives next to the script, one level above the storage directory.
CA_OPENSSL_CONFIG = os.path.join(CA_STORAGE_DIRECTORY, '../openssl-ca.conf')

# --- Server certificate files -----------------------------------------------
SERVER_CERT = os.path.join(CA_STORAGE_DIRECTORY, 'servercert.pem')
# NOTE(review): as with CA_KEY, the key file name is presumably set by the
# rendered openssl-server.conf -- confirm against the template.
SERVER_KEY = os.path.join(CA_STORAGE_DIRECTORY, 'serverkey.key')
SERVER_SIGN_REQUEST = os.path.join(CA_STORAGE_DIRECTORY, 'servercert.csr')
# Rendered from SERVER_OPENSSL_TEMPLATE for every certificate request.
SERVER_OPENSSL_CONFIG = os.path.join(CA_STORAGE_DIRECTORY, 'openssl-server.conf')
SERVER_OPENSSL_TEMPLATE = os.path.join(CA_STORAGE_DIRECTORY, '../openssl-server-template.conf')

# --- openssl command lines ---------------------------------------------------
# These are split on single spaces before execution and use bare file names,
# so they must be run with CA_STORAGE_DIRECTORY as the working directory.
GENERATE_CA_COMMAND = (
    'openssl req -x509 -config openssl-ca.conf -newkey rsa:4096 -sha256 -nodes '
    '-out cacert.pem -outform PEM -batch'
)

GENERATE_CSR_COMMAND = (
    'openssl req -config openssl-server.conf -newkey rsa:2048 -sha256 -nodes '
    '-out servercert.csr -outform PEM -batch'
)

SIGN_NEW_CERTIFICATE = (
    'openssl ca -config openssl-ca.conf -policy signing_policy -extensions signing_req -out servercert.pem '
    '-infiles servercert.csr'
)
|
|
|
|
|
class Runner:
    """Interactive driver that issues a locally signed server certificate.

    On first use it creates a certificate authority inside
    CA_STORAGE_DIRECTORY; afterwards it asks for domain names, builds a
    signing request for them and signs it with that authority.  All work is
    delegated to the ``openssl`` command line tool.
    """

    def __init__(self):
        """Prepare the storage directories and change into the CA directory.

        Exits the process with status 1 when the CA configuration or the
        server configuration template is missing.
        """
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(CA_STORAGE_DIRECTORY, exist_ok=True)
        os.makedirs(CERTIFICATE_STORAGE_DIRECTORY, exist_ok=True)

        for required_file in (CA_OPENSSL_CONFIG, SERVER_OPENSSL_TEMPLATE):
            if not os.path.exists(required_file):
                print('The file %s does not exist, cannot continue.' % required_file)
                sys.exit(1)

        # The openssl command lines reference bare file names, so they have to
        # run from the storage directory.  The module-level path constants are
        # used after this call and only resolve correctly when absolute.
        os.chdir(CA_STORAGE_DIRECTORY)

    @staticmethod
    def already_has_ca():
        """Return True when the CA certificate, key, index and serial files all exist."""
        return all(os.path.exists(path) for path in (CA_CERT, CA_KEY, CA_INDEX, CA_SERIAL))

    def run(self):
        """Run the whole workflow: ensure a CA exists, then request and sign a certificate."""
        if not self.already_has_ca():
            self.generate_certificate_authority()
        else:
            print('Found a certificate authority named %s' % self.get_certificate_authority_name())

        names = self.ask_for_qualified_names()
        names = self.broaden_inputs(names)

        self.request_certificate_for_names(names)
        self.sign_new_certificate()

        print('Finished!')

    def _run_or_exit(self, command, failure_message, _input=''):
        """Run a space-separated command line, echo its output, exit(1) on failure.

        command: the command line as a single string; it is split on whitespace.
        failure_message: text printed when the command returns a non-zero status.
        _input: text fed to the command's standard input.
        """
        stdout, stderr, return_code = self.execute_command_whole_output(command.split(), _input)
        print(stdout)
        print(stderr)
        if return_code != 0:
            print(failure_message)
            sys.exit(1)

    def generate_certificate_authority(self):
        """Create the certificate authority and its bookkeeping files.

        Copies the CA configuration into the storage directory, runs
        GENERATE_CA_COMMAND, then creates the index file, its ``.attr``
        companion and a serial file containing ``01``.  Exits with status 1
        when the openssl command fails.
        """
        print('Generating certificate authority...')

        shutil.copyfile(CA_OPENSSL_CONFIG, os.path.join(CA_STORAGE_DIRECTORY, 'openssl-ca.conf'))

        self._run_or_exit(GENERATE_CA_COMMAND, 'Error while creating CA')

        # Append mode creates the files when missing without truncating
        # anything that is already there.
        for bookkeeping_file in (CA_INDEX, CA_INDEX + '.attr'):
            with open(bookkeeping_file, 'a'):
                pass
        with open(CA_SERIAL, 'w') as serial_file:
            serial_file.write('01')

        print('Generated certificate authority. Be sure to insert ' + CA_CERT + ' to your keychain/OS/whatever.')

    @staticmethod
    def execute_command_whole_output(cmd: list, _input='') -> 'tuple[str, str, int]':
        """Run *cmd* to completion and return ``(stdout, stderr, return_code)``.

        cmd: the argument vector handed to ``subprocess.run``.
        _input: text written to the child's standard input.

        Output is decoded as UTF-8 with undecodable bytes replaced, so
        non-ASCII output from the child cannot raise UnicodeDecodeError.
        """
        process = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            input=_input,
            encoding='utf-8',
            errors='replace',
        )
        return process.stdout, process.stderr, int(process.returncode)

    @staticmethod
    def ask_for_qualified_names():
        """Prompt for domain names until EOF and return them as a list.

        Surrounding whitespace is stripped, blank entries are skipped, names
        without a dot get ``.localhost`` appended, and repeated names are
        kept only once.  Exits with status 1 when no name was entered.
        """
        print('For which domains would you like to generate SSL certificates?')
        print('(subdomains are generated automatically with a wildcard)')

        inputs = []
        while True:
            try:
                this_input = input('Insert a domain name (or EOF to end) : ').strip()
            except EOFError:
                # Finish the prompt line that EOF left open.
                print()
                break

            if not this_input:
                continue
            if '.' not in this_input:
                this_input += '.localhost'
            # A repeated name would only produce duplicate alternate-name entries.
            if this_input not in inputs:
                inputs.append(this_input)

        if not inputs:
            print('No names specified, exiting...')
            sys.exit(1)
        return inputs

    @staticmethod
    def broaden_inputs(inputs):
        """Map each name to numbered alternate-name entries.

        Every name yields two consecutive ``DNS.<n>`` entries (the name
        itself and its ``*.`` wildcard); ``IP.1`` and ``IP.2`` for the IPv4
        and IPv6 loopback addresses are always appended.

        Returns a dict whose insertion order is DNS entries first, then IPs.
        """
        final_inputs = {}
        for position, name in enumerate(inputs):
            final_inputs['DNS.%d' % (2 * position + 1)] = name
            final_inputs['DNS.%d' % (2 * position + 2)] = '*.' + name
        for index, ip in enumerate(('127.0.0.1', '::1'), start=1):
            final_inputs['IP.%d' % index] = ip
        return final_inputs

    def request_certificate_for_names(self, names):
        """Render the server OpenSSL config for *names* and create a signing request.

        names: mapping of alternate-name keys to values as produced by
            broaden_inputs; it must contain ``DNS.1``.

        Each ``{{ key }}`` placeholder of SERVER_OPENSSL_TEMPLATE is replaced
        and the result is written to SERVER_OPENSSL_CONFIG before
        GENERATE_CSR_COMMAND runs.  Exits with status 1 when ``DNS.1`` is
        missing or the openssl command fails.
        """
        print('Generating request for certificate...')

        primary_name = names.get('DNS.1')
        if primary_name is None:
            # Concatenating None below would fail with an unhelpful TypeError.
            print('No DNS name supplied, cannot create signing request')
            sys.exit(1)

        mappings = {
            'country_name': 'US',
            'province_name': 'Colorado',
            'locality_name': 'South Park',
            'organization_name': 'Localhost Server Inc.',
            'common_name': 'For example ' + primary_name,
            'alternate_names': '\n'.join('%s = %s' % (k, v) for k, v in names.items()),
        }

        with open(SERVER_OPENSSL_TEMPLATE, 'r') as template:
            final_config = template.read()
        for key, value in mappings.items():
            final_config = final_config.replace('{{ %s }}' % key, value)

        with open(SERVER_OPENSSL_CONFIG, 'w') as config:
            config.write(final_config)

        self._run_or_exit(GENERATE_CSR_COMMAND, 'Error while creating signing request')

        print('Request generated...')

    def sign_new_certificate(self):
        """Sign the pending request and append the server key to the issued PEM.

        Runs SIGN_NEW_CERTIFICATE, feeding ``y`` twice on standard input to
        answer its confirmation prompts.  Exits with status 1 when signing
        fails.
        """
        self._run_or_exit(SIGN_NEW_CERTIFICATE, 'Error while signing request', 'y\ny\n')

        # NOTE(review): the ".old" serial file presumably holds the serial
        # openssl just used, which names the issued certificate inside
        # CERTIFICATE_STORAGE_DIRECTORY -- confirm against openssl-ca.conf.
        with open(CA_SERIAL + '.old', 'r') as serial_file:
            serial = serial_file.read().strip()

        pem_path = os.path.join(CERTIFICATE_STORAGE_DIRECTORY, '%s.pem' % serial)
        # Append so the existing certificate content in the file is preserved.
        with open(pem_path, 'a') as pem, open(SERVER_KEY, 'r') as key:
            pem.write(key.read())
        print('Wrote the private key into %s' % pem_path)

    @staticmethod
    def get_certificate_authority_name():
        """Return the ``commonName_default`` value from the CA config, or None.

        The first line containing ``commonName_default`` and an ``=`` wins;
        everything after the first ``=`` is returned with surrounding
        whitespace stripped.
        """
        with open(CA_OPENSSL_CONFIG, 'r') as config:
            for line in config:
                if 'commonName_default' not in line:
                    continue
                _, separator, value = line.partition('=')
                if separator:
                    return value.strip()
        return None