Skip to content

Instantly share code, notes, and snippets.

@h3po
Last active September 22, 2022 15:00
Show Gist options
  • Save h3po/14be83300b63db95240864c8c3229636 to your computer and use it in GitHub Desktop.
Save h3po/14be83300b63db95240864c8c3229636 to your computer and use it in GitHub Desktop.
openvpn verify script that checks ldap group membership and certificate ocsp status
dnf install openssl3 python3-ldap3 python3-cryptography
X509_0_CN="Foo Bar" peer_cert=./foo_bar.pem ./verify.py conf.py 0 cn=Foo\ Bar

example stdout

[DEBUG] args: ['./verify.py', 'conf.py', '0', 'cn=Foo Bar']
[INFO ] checking ocsp status for certificate 'cn=Foo Bar'
[DEBUG] ocsp check successful
[INFO ] checking ldap membership for 'cn=Foo Bar'
[DEBUG] prepared ldap search_filter: (&(commonName=Foo Bar)(memberof=cn=VPN,cn=groups,cn=accounts,dc=my,dc=domain))
[DEBUG] established ldap connection to ipa.my.domain as uid=svc_openvpn_ldap,cn=users,cn=accounts,dc=my,dc=domain
[DEBUG] membership check ok for (&(commonName=Foo Bar)(memberof=cn=VPN,cn=groups,cn=accounts,dc=my,dc=domain))
[DEBUG] group check successful
[INFO ] verify ok

usage in openvpn server conf

tls-export-cert /tmp
tls-verify "/etc/openvpn/server/scripts/openvpn-verify-cert.py conf.py"
LDAP_SERVER = "ipa.my.domain"
LDAP_USER_BASE = "cn=users,cn=accounts,dc=my,dc=domain"
LDAP_BINDDN = "uid=svc_openvpn_ldap,cn=users,cn=accounts,dc=my,dc=domain"
LDAP_BINDPW = "password"
LDAP_ATTR = "commonName"
CERT_ATTR = "X509_0_CN"
LDAP_GROUP = "cn=VPN,cn=groups,cn=accounts,dc=my,dc=domain"
LDAP_GROUP_SEARCH_RECURSIVE = True
import ldap3, ssl
TLS_CONFIG = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)
import sys
LOG_CONFIG = {
"version": 1,
"root": {
"handlers" : ["console", "file"],
"level": "DEBUG"
},
"handlers": {
"console": {
"formatter": "formatter",
"class": "logging.StreamHandler",
"level": "DEBUG",
"stream": sys.stdout
},
"file": {
"formatter": "formatter",
"class": "logging.FileHandler",
"level": "INFO",
"filename": "verify.log"
}
},
"formatters": {
"formatter": {
"format": "%(asctime)s [%(levelname)-5.5s] %(message)s"
}
},
}
CERT_BUNDLE = "./ca_bundle.pem"
IGNORE_BROKEN_OCSP = False
#!/usr/bin/env python3
import sys, os, logging, base64
from logging import debug, info, warn, error
import ldap3
#import requests, urllib
import cryptography.x509 as x509
#import cryptography.x509.ocsp as ocsp
import cryptography.hazmat.primitives
#import cryptography.hazmat.primitives.hashes
import subprocess
def userIsInLdapGroup(cert_mapping_value, config):
if config.LDAP_GROUP_SEARCH_RECURSIVE:
recursive = ":1.2.840.113556.1.4.1941:"
else:
recursive = ""
search_filter = f"(&({config.LDAP_ATTR}={cert_mapping_value})(memberof{recursive}={config.LDAP_GROUP}))"
debug(f"prepared ldap search_filter: {search_filter}")
try:
ldap_server = ldap3.Server(config.LDAP_SERVER, use_ssl=True, tls=config.TLS_CONFIG)
ldap_conn = ldap3.Connection(ldap_server, config.LDAP_BINDDN, config.LDAP_BINDPW, auto_bind=True)
debug(f"established ldap connection to {config.LDAP_SERVER} as {config.LDAP_BINDDN}")
except Exception as e:
error(f"could not establish ldap connection to {config.LDAP_SERVER} as {config.LDAP_BINDDN}: {e}")
return False
ldap_conn.search(config.LDAP_USER_BASE, search_filter)
if ldap_conn.entries:
#debug(f"got search results: {ldap_conn.entries}")
debug(f"membership check ok for {search_filter}")
return True
warn(f"got no search results for {search_filter}")
return False
def certificateOcspIsValid(cert, cacerts, config):
#retrieve the issuer cert
try:
issuer_cert = next(filter(lambda c: c.subject == cert.issuer, cacerts))
except StopIteration:
warn(f"issuer cert is not in bundle: {cert.subject}")
return False
#extract the aia extensions from the cert
try:
aia_extensions = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.AUTHORITY_INFORMATION_ACCESS).value
except cryptography.x509.ExtensionNotFound:
warn(f"aia extensions not found in cert: {cert.subject}")
return False
#extract the ocsp extension from the aia extensions
try:
ocsp_extension = next(filter(lambda e: e.access_method._name == "OCSP", aia_extensions))
except StopIteration:
warn(f"ocsp extension not found in cert: {cert.subject}")
return False
#extract the ocsp url from the ocsp extension
ocsp_url = ocsp_extension.access_location.value
if not ocsp_url:
warn(f"could not extract ocsp url from certificate: {cert.subject}")
return False
# #create the ocsp request data
# reqbuilder = ocsp.OCSPRequestBuilder()
# reqbuilder = reqbuilder.add_certificate(cert, issuer_cert, cryptography.hazmat.primitives.hashes.SHA1())
# req = base64.b64encode(reqbuilder.build().public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)).decode()
# #send the ocsp request
# try:
# http_response = requests.get(
# f"{ocsp_url}/{urllib.parse.quote(req)}",
# headers={"Content-Type": "application/ocsp-request"}
# )
# except requests.exceptions.BaseHTTPError:
# warn(f"could not reach ocsp url: {ocsp_url}")
# return False
# if not http_response.status_code == 200:
# warn(f"did not receive response from ocsp for: {cert.subject} {ocsp_url}")
# return False
# #parse the ocsp response
# try:
# ocsp_response = ocsp.load_der_ocsp_response(http_response.content)
# except Exception:
# error(f"could not parse ocsp response for: {cert.subject} {ocsp_url}")
# return False
# if not ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
# warn(f"ocsp response not successful: {ocsp_response.response_status}")
# return False
# return ocsp_response.certificate_status == ocsp.OCSPCertStatus.GOOD
#write the issuer certificate to a file
issuer_file = f"/tmp/{issuer_cert.subject.rfc4514_string()}.pem"
open(issuer_file, "wb").write(issuer_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM))
#check ocsp with openssl3
openssl = subprocess.run(["openssl3", "ocsp", "-CAfile", config.CERT_BUNDLE, "-issuer", issuer_file,
"-cert", os.environ["peer_cert"], "-url", ocsp_url],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
debug(f"openssl ocsp response output:\nsderr: {openssl.stderr.decode()}\nstdout: {openssl.stdout.decode()}")
#parse the openssl stdout and stderr
if openssl.returncode == 0 and \
openssl.stderr.decode() == "Response verify OK\n" and \
openssl.stdout.decode().startswith(f"{os.environ['peer_cert']}: good\n"):
debug(f"openssl ocsp check successful for {cert.subject}")
return True
warn("openssl ocsp response verify failed")
return False
def main():
config_file = sys.argv[1]
import importlib, logging.config
config = importlib.import_module(config_file.rstrip(".py"))
logging.config.dictConfig(config.LOG_CONFIG)
cert_depth, cert_dn = *sys.argv[2:],
debug(f"args: {sys.argv}")
#debug(f"environment: {os.environ}")
#read the input cert from tmp
peer_cert = open(os.environ["peer_cert"], "rb").read()
peer_cert = x509.load_pem_x509_certificate(peer_cert)
#cert is selfsigned
if peer_cert.issuer == peer_cert.subject:
debug(f"certificate is selfsigned: '{cert_dn}'")
else:
#read the issuer certificates into a list
start_line = b'-----BEGIN CERTIFICATE-----'
cacerts = []
for cacert in open(config.CERT_BUNDLE, "rb").read().split(start_line)[1:]:
cacerts.append(x509.load_pem_x509_certificate(start_line + cacert))
#the certificate is not selfsigned and not a leaf certificate, so it is a subca
if int(cert_depth) > 0:
info(f"checking that subca cert is in bundle '{cert_dn}'")
if not peer_cert in cacerts:
warn("subca cert is not in bundle")
return 1
debug("cert is in bundle")
#the certificate is a leaf
else:
info(f"checking ocsp status for certificate '{cert_dn}'")
if not certificateOcspIsValid(peer_cert, cacerts, config):
info(f"ocsp check failed")
if config.IGNORE_BROKEN_OCSP:
warn(f"ignoring ocsp check")
else:
return 1
debug("ocsp check successful")
info(f"checking ldap membership for '{cert_dn}'")
if not userIsInLdapGroup(os.environ[config.CERT_ATTR], config):
info(f"group check failed")
return 1
debug("group check successful")
info(f"verify ok: {cert_dn}")
return 0
if __name__ == "__main__":
exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment