|
#!/usr/bin/env python3 |
|
|
|
import sys, os, logging, base64 |
|
from logging import debug, info, warn, error |
|
import ldap3 |
|
#import requests, urllib |
|
import cryptography.x509 as x509 |
|
#import cryptography.x509.ocsp as ocsp |
|
import cryptography.hazmat.primitives |
|
#import cryptography.hazmat.primitives.hashes |
|
import subprocess |
|
|
|
def userIsInLdapGroup(cert_mapping_value, config):
    """Check whether an LDAP entry matching the given value is in the configured group.

    Builds the filter ``(&(<LDAP_ATTR>=<value>)(memberof[:<rule>:]=<LDAP_GROUP>))``,
    binds to ``config.LDAP_SERVER`` over SSL and searches below
    ``config.LDAP_USER_BASE``.

    Args:
        cert_mapping_value: Value matched against ``config.LDAP_ATTR``. It is
            escaped before being placed into the search filter.
        config: Configuration object providing ``LDAP_GROUP_SEARCH_RECURSIVE``,
            ``LDAP_ATTR``, ``LDAP_GROUP``, ``LDAP_SERVER``, ``TLS_CONFIG``,
            ``LDAP_BINDDN``, ``LDAP_BINDPW`` and ``LDAP_USER_BASE``.

    Returns:
        True if the search returned at least one entry. False if it returned
        none, or if connecting, binding or searching failed.
    """
    #function-scope import: the helper lives in a submodule of the ldap3
    #package this file already imports
    from ldap3.utils.conv import escape_filter_chars

    #optional matching-rule oid (active directory "in chain" rule) so that the
    #memberof test also follows nested groups
    if config.LDAP_GROUP_SEARCH_RECURSIVE:
        recursive = ":1.2.840.113556.1.4.1941:"
    else:
        recursive = ""

    #the value is supplied by the caller from certificate-derived data, so
    #filter metacharacters such as "*", "(" and ")" must not reach the filter raw
    safe_value = escape_filter_chars(cert_mapping_value)
    search_filter = f"(&({config.LDAP_ATTR}={safe_value})(memberof{recursive}={config.LDAP_GROUP}))"
    debug(f"prepared ldap search_filter: {search_filter}")

    try:
        ldap_server = ldap3.Server(config.LDAP_SERVER, use_ssl=True, tls=config.TLS_CONFIG)
        ldap_conn = ldap3.Connection(ldap_server, config.LDAP_BINDDN, config.LDAP_BINDPW, auto_bind=True)
    except Exception as e:
        error(f"could not establish ldap connection to {config.LDAP_SERVER} as {config.LDAP_BINDDN}: {e}")
        return False
    debug(f"established ldap connection to {config.LDAP_SERVER} as {config.LDAP_BINDDN}")

    try:
        ldap_conn.search(config.LDAP_USER_BASE, search_filter)
        found = bool(ldap_conn.entries)
    except ldap3.core.exceptions.LDAPException as e:
        #fail closed: a broken search must never count as a membership
        error(f"ldap search failed for {search_filter}: {e}")
        return False
    finally:
        #always release the bound connection
        ldap_conn.unbind()

    if found:
        debug(f"membership check ok for {search_filter}")
        return True

    logging.warning(f"got no search results for {search_filter}")
    return False
|
|
|
def certificateOcspIsValid(cert, cacerts, config):
    """Query the certificate's OCSP responder through the ``openssl3`` CLI.

    Steps:
      1. find the issuer of *cert* in *cacerts* (subject equals ``cert.issuer``),
      2. read the OCSP responder url from the Authority Information Access
         extension of *cert*,
      3. write the issuer certificate to a private temporary PEM file,
      4. run ``openssl3 ocsp`` against that url for the certificate file named
         by the ``peer_cert`` environment variable.

    Args:
        cert: The certificate to check (``cryptography`` x509 certificate).
        cacerts: Iterable of CA certificates searched for the issuer.
        config: Configuration object providing ``CERT_BUNDLE``, the CA file
            handed to openssl via ``-CAfile``.

    Returns:
        True only if openssl exits with 0, prints exactly
        ``Response verify OK`` on stderr and reports ``<peer_cert path>: good``
        at the start of stdout. False in every other case, including a missing
        issuer, a missing AIA/OCSP entry or an empty OCSP url.

    Raises:
        KeyError: if the ``peer_cert`` environment variable is not set.
    """
    import tempfile

    #retrieve the issuer cert
    issuer_cert = next((c for c in cacerts if c.subject == cert.issuer), None)
    if issuer_cert is None:
        logging.warning(f"issuer cert is not in bundle: {cert.subject}")
        return False

    #extract the aia extensions from the cert
    try:
        aia_extensions = cert.extensions.get_extension_for_oid(
            x509.oid.ExtensionOID.AUTHORITY_INFORMATION_ACCESS
        ).value
    except x509.ExtensionNotFound:
        logging.warning(f"aia extensions not found in cert: {cert.subject}")
        return False

    #extract the ocsp entry from the aia extensions; compare against the public
    #oid constant instead of the private _name attribute
    ocsp_oid = x509.oid.AuthorityInformationAccessOID.OCSP
    ocsp_extension = next((e for e in aia_extensions if e.access_method == ocsp_oid), None)
    if ocsp_extension is None:
        logging.warning(f"ocsp extension not found in cert: {cert.subject}")
        return False

    #extract the ocsp url from the ocsp entry
    ocsp_url = ocsp_extension.access_location.value
    if not ocsp_url:
        logging.warning(f"could not extract ocsp url from certificate: {cert.subject}")
        return False

    peer_cert_path = os.environ["peer_cert"]

    #write the issuer certificate to a private, unpredictable temp file; it is
    #removed again as soon as openssl has finished
    with tempfile.NamedTemporaryFile(prefix="ocsp-issuer-", suffix=".pem") as issuer_file:
        issuer_file.write(issuer_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM))
        #make sure the bytes are on disk before the child process reads them
        issuer_file.flush()

        #check ocsp with openssl3
        openssl = subprocess.run(["openssl3", "ocsp", "-CAfile", config.CERT_BUNDLE, "-issuer", issuer_file.name,
                                  "-cert", peer_cert_path, "-url", ocsp_url],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    stderr_text = openssl.stderr.decode()
    stdout_text = openssl.stdout.decode()
    debug(f"openssl ocsp response output:\nsderr: {stderr_text}\nstdout: {stdout_text}")

    #parse the openssl stdout and stderr
    if openssl.returncode == 0 and \
            stderr_text == "Response verify OK\n" and \
            stdout_text.startswith(f"{peer_cert_path}: good\n"):
        debug(f"openssl ocsp check successful for {cert.subject}")
        return True

    logging.warning("openssl ocsp response verify failed")
    return False
|
|
|
def main():
    """Verify one certificate of a peer's chain and return an exit status.

    Command line: ``<config module> <cert depth> <cert dn>``. The PEM file of
    the certificate under test is read from the path stored in the
    ``peer_cert`` environment variable.

    Flow:
      * a selfsigned certificate (issuer equals subject) is accepted as is,
      * a certificate with depth > 0 must be contained in ``config.CERT_BUNDLE``,
      * a leaf certificate (depth 0) must pass the OCSP check, unless
        ``config.IGNORE_BROKEN_OCSP`` is set, and the LDAP group check for the
        value of the environment variable named by ``config.CERT_ATTR``.

    Returns:
        0 if the certificate is accepted, 1 otherwise.
    """
    import importlib
    import logging.config

    config_file = sys.argv[1]
    #drop only a literal ".py" suffix; str.rstrip(".py") would strip every
    #trailing ".", "p" and "y" character and mangle names such as "policy"
    if config_file.endswith(".py"):
        config_file = config_file[:-len(".py")]
    config = importlib.import_module(config_file)
    logging.config.dictConfig(config.LOG_CONFIG)

    cert_depth, cert_dn = sys.argv[2:]
    debug(f"args: {sys.argv}")

    #read the input cert from tmp
    with open(os.environ["peer_cert"], "rb") as cert_file:
        peer_cert = x509.load_pem_x509_certificate(cert_file.read())

    #cert is selfsigned
    if peer_cert.issuer == peer_cert.subject:
        debug(f"certificate is selfsigned: '{cert_dn}'")

    else:
        #read the issuer certificates into a list
        start_line = b'-----BEGIN CERTIFICATE-----'
        with open(config.CERT_BUNDLE, "rb") as bundle_file:
            pem_chunks = bundle_file.read().split(start_line)[1:]
        cacerts = [x509.load_pem_x509_certificate(start_line + chunk) for chunk in pem_chunks]

        #the certificate is not selfsigned and not a leaf certificate, so it is a subca
        if int(cert_depth) > 0:
            info(f"checking that subca cert is in bundle '{cert_dn}'")
            if peer_cert not in cacerts:
                logging.warning("subca cert is not in bundle")
                return 1
            debug("cert is in bundle")

        #the certificate is a leaf
        else:
            info(f"checking ocsp status for certificate '{cert_dn}'")
            if certificateOcspIsValid(peer_cert, cacerts, config):
                debug("ocsp check successful")
            else:
                info("ocsp check failed")
                if not config.IGNORE_BROKEN_OCSP:
                    return 1
                #configured to tolerate a failing ocsp check
                logging.warning("ignoring ocsp check")

            info(f"checking ldap membership for '{cert_dn}'")
            if not userIsInLdapGroup(os.environ[config.CERT_ATTR], config):
                info("group check failed")
                return 1
            debug("group check successful")

    info(f"verify ok: {cert_dn}")
    return 0
|
|
|
if __name__ == "__main__":
    #use sys.exit instead of the exit() helper: that one is installed by the
    #site module for interactive use and is missing when site is not loaded
    sys.exit(main())