Last active
September 8, 2022 14:13
-
-
Save debuss/4d332dbdec2697ee76f0bc7689801857 to your computer and use it in GitHub Desktop.
Pega Hotfix Validator script in Python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pega Hotfix Validator script.
# @author Alexandre Debusschère <alexandre.debusschere@hey.com>
# @see https://docs.pega.com/keeping-current-pega/86/manually-verifying-hotfix-files-using-third-party-tools
import base64
import json
import subprocess
from operator import itemgetter
from os import path, chdir
from shutil import rmtree
from sys import exit, argv
from zipfile import ZipFile
class HotfixValidator:
    """Validate a Pega hotfix ZIP archive using its embedded ``SIGFILE.JSON``.

    The archive is extracted into a sibling directory named after the ZIP
    file, the two certificates stored in ``SIGFILE.JSON`` are checked with the
    ``openssl`` command-line tool, and every listed file signature is verified
    against the public key of the first certificate.  Any failure prints a red
    message and terminates the process with ``EXIT_FAILURE``.

    Everything read from ``SIGFILE.JSON`` is untrusted until verified, so no
    value from it is ever passed through a shell: base64 payloads are decoded
    in-process and ``openssl`` is invoked with an argument list.
    """

    EXIT_SUCCESS = 0
    EXIT_FAILURE = 1

    # ANSI escape sequences used to colour terminal output.
    _GREEN = "\033[0;32m"
    _CYAN = "\033[0;36m"
    _RED = "\033[0;31m"
    _RESET = "\033[0m"

    # Subject the signing certificate is required to carry.
    _EXPECTED_SUBJECT = (
        'C = US, ST = Massachusetts, L = Cambridge, '
        'O = Pegasystems Inc., CN = Pegasystems Inc.'
    )

    # Directory that was current before _set_working_directory() moved into
    # the extraction folder; stays None until that happens.
    _previous_directory = None

    def __init__(self, argv):
        """Store the command line.

        Args:
            argv: Argument list; ``argv[1]`` is the path of the hotfix ZIP.
        """
        self.argc = len(argv)
        self.argv = argv

    @staticmethod
    def _out(messages):
        """Print one message, or each message of an iterable, on its own line."""
        if isinstance(messages, str):
            messages = [messages]
        for message in messages:
            print(message)

    def _colored(self, messages, color):
        """Print ``messages`` wrapped in the ANSI ``color`` sequence."""
        if isinstance(messages, str):
            messages = [messages]
        self._out(f'{color}{message}{self._RESET}' for message in messages)

    def _success(self, messages):
        """Print ``messages`` in green."""
        self._colored(messages, self._GREEN)

    def _info(self, messages):
        """Print ``messages`` in cyan."""
        self._colored(messages, self._CYAN)

    def _error(self, messages):
        """Print ``messages`` in red, then exit with ``EXIT_FAILURE``.

        Raises:
            SystemExit: always.
        """
        self._colored(messages, self._RED)
        exit(self.EXIT_FAILURE)

    def _help(self):
        """Print the usage text."""
        self._out([
            'Pega Hotfix Validator script', '',
            'Usage: python3 hf-validator.py [file.zip]', '',
            'Required argument:',
            ' file.zip',
            ' The hotfix zip file downloaded from Pega MSP'
        ])

    def _openssl(self, arguments, output_file=None):
        """Run ``openssl`` with ``arguments`` without going through a shell.

        Args:
            arguments: Command-line arguments following ``openssl``.
            output_file: When given, standard output is written to this file
                (the equivalent of a shell ``> file`` redirection).

        Returns:
            A ``(returncode, output)`` tuple.  ``output`` is the stripped,
            combined stdout/stderr text, or ``''`` when ``output_file`` is set.
        """
        command = ['openssl', *arguments]
        try:
            if output_file is None:
                completed = subprocess.run(
                    command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                )
                return completed.returncode, completed.stdout.strip()
            with open(output_file, 'wb') as handle:
                completed = subprocess.run(command, stdout=handle)
            return completed.returncode, ''
        except OSError as err:
            # Typically raised when the openssl executable is not installed.
            self._error(f'Unable to run openssl: {err}')

    def _write_base64(self, encoded, file_name):
        """Decode the base64 text ``encoded`` and write the bytes to ``file_name``."""
        try:
            payload = base64.b64decode(encoded)
        except (ValueError, TypeError) as err:
            self._error(f'Unable to decode {file_name}: {err}')
        with open(file_name, 'wb') as handle:
            handle.write(payload)

    def _extract_hotfix(self):
        """Extract the hotfix ZIP and record the extraction directory.

        Sets ``self.working_directory`` to the absolute path of the directory
        named after the archive (archive path without ``.zip``), so that later
        ``chdir``/cleanup steps do not depend on the current directory.
        """
        archive = self.argv[1]
        if not path.isfile(archive) or not archive.lower().endswith('.zip'):
            self._error('Provided hotfix is not a file or not ZIP file...')
        stem = archive[:-len('.zip')]
        if not path.basename(stem):
            # A file called exactly ".zip" would make the extraction directory
            # the archive's own parent, which the cleanup step would delete.
            self._error('Provided hotfix has no file name before the .zip extension...')
        self.working_directory = path.abspath(stem)
        with ZipFile(archive) as bundle:
            bundle.extractall(self.working_directory)
        self._out('ZIP file extracted to ' + self.working_directory)

    def _set_working_directory(self):
        """Change into the extraction directory, remembering where we came from."""
        self._previous_directory = path.abspath('.')
        chdir(self.working_directory)
        self._out('Changed working directory to ' + self.working_directory)

    def _extractJsonSigfile(self):
        """Load ``SIGFILE.JSON`` from the current directory into ``self._sig``."""
        try:
            with open('SIGFILE.JSON', encoding='utf-8') as sigfile:
                self._sig = json.load(sigfile)
        except (OSError, ValueError) as err:
            # ValueError covers both invalid JSON and undecodable bytes.
            self._error(f'Unable to read SIGFILE.JSON: {err}')
        self._out('Extracted and decoded SIGFILE.JSON')

    def _decode_certificates_to_files(self):
        """Write the first two base64 certificates of the sigfile as DER files."""
        try:
            pegasystems, intermediate = self._sig['certificates'][:2]
        except (KeyError, TypeError, ValueError):
            self._error('SIGFILE.JSON does not contain the two expected certificates')
        self._write_base64(pegasystems, 'pegasystems.der')
        self._out('Decoded certificate pegasystems.der')
        self._write_base64(intermediate, 'intermediate.der')
        self._out('Decoded certificates intermediate.der')

    def _translate_certificates_into_crt_format(self):
        """Convert both DER certificates to PEM ``.crt`` files with openssl."""
        for name in ('pegasystems', 'intermediate'):
            status, _ = self._openssl(
                ['x509', '-in', f'{name}.der', '-inform', 'der'],
                output_file=f'{name}.crt',
            )
            if status != 0:
                self._error(f'Unable to translate {name}.der to {name}.crt')
            self._out(f'Translated {name}.der to {name}.crt')

    def _verify_subject_certificate(self):
        """Check that the signing certificate text contains the expected subject."""
        _, output = self._openssl(['x509', '-in', 'pegasystems.crt', '-text', '-noout'])
        # OpenSSL releases differ in whether they print spaces around "=" in
        # distinguished names, so compare with that spacing normalised.
        expected = self._EXPECTED_SUBJECT.replace(' = ', '=')
        if expected not in output.replace(' = ', '='):
            self._error('Unable to match the certificate subject with the required one')
        self._out('Verified subject certificate')

    def _verify_certificate_chain(self):
        """Verify the signing certificate against the intermediate, with CRL checks."""
        _, output = self._openssl([
            'verify', '-crl_download', '-crl_check',
            '-untrusted', 'intermediate.crt', 'pegasystems.crt',
        ])
        if output != 'pegasystems.crt: OK':
            self._error('Unable to verify certificate chain: ' + output)
        self._out('Verified certificate chain')

    def _extract_public_key_from_pegasystems_certificate(self):
        """Write the signing certificate's public key to ``pubkey.pub``."""
        status, _ = self._openssl(
            ['x509', '-pubkey', '-noout', '-in', 'pegasystems.der', '-inform', 'der'],
            output_file='pubkey.pub',
        )
        if status != 0:
            self._error('Unable to extract public key from PegaSystems certificate')
        self._out('Extracted public key from PegaSystems certificate')

    def _check_signatures(self):
        """Verify the SHA-256 signature of every file listed in the sigfile."""
        try:
            entries = list(self._sig['signatures'])
        except (KeyError, TypeError):
            self._error('SIGFILE.JSON does not contain a list of signatures')
        if not entries:
            # Without this, an archive listing no signatures would be
            # reported as validated although nothing was checked.
            self._error('SIGFILE.JSON does not list any signature to verify')
        for element in entries:
            try:
                name, signature = itemgetter('path', 'signature')(element)
            except (KeyError, TypeError):
                self._error('SIGFILE.JSON contains a malformed signature entry')
            if not isinstance(name, str):
                self._error('SIGFILE.JSON contains a malformed signature entry')
            self._info('Checking file: ' + name)
            self._write_base64(signature, 'signature.sig')
            self._out('Decoded signature')
            # Keep a path starting with "-" from being parsed as an option.
            target = path.join('.', name) if name.startswith('-') else name
            _, result = self._openssl([
                'dgst', '-verify', 'pubkey.pub', '-keyform', 'PEM',
                '-sha256', '-signature', 'signature.sig', target,
            ])
            if result != 'Verified OK':
                self._error('Unable to verify signature of file: ' + name)
            self._out('Verified signature')

    def _delete_extract_folder(self):
        """Return to the original directory and remove the extraction folder."""
        if self._previous_directory is not None:
            chdir(self._previous_directory)
        rmtree(self.working_directory)
        self._out('Cleaned folder ' + self.working_directory)

    def run(self):
        """Run the full validation and exit with the resulting status code.

        Raises:
            SystemExit: always; ``EXIT_SUCCESS`` when every check passed,
                ``EXIT_FAILURE`` otherwise.
        """
        if self.argc < 2:
            self._help()
            self._error('Missing hotfix ZIP file...')
        self._extract_hotfix()
        self._set_working_directory()
        self._extractJsonSigfile()
        self._decode_certificates_to_files()
        self._translate_certificates_into_crt_format()
        self._verify_subject_certificate()
        self._verify_certificate_chain()
        self._extract_public_key_from_pegasystems_certificate()
        self._check_signatures()
        self._delete_extract_folder()
        self._success(f'Hotfix {self.argv[1]} validated successfully !')
        exit(self.EXIT_SUCCESS)
# Run the validation only when executed as a script, so that importing this
# module (e.g. to reuse HotfixValidator) has no side effects.
if __name__ == '__main__':
    validator = HotfixValidator(argv)
    validator.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment