Skip to content

Instantly share code, notes, and snippets.

@MrDrMcCoy
Last active September 7, 2019 18:34
Show Gist options
  • Save MrDrMcCoy/a1bce4dddcb49af631c3db74e244dc02 to your computer and use it in GitHub Desktop.
Save MrDrMcCoy/a1bce4dddcb49af631c3db74e244dc02 to your computer and use it in GitHub Desktop.
Check for compromised credentials from CSV export.
#!/usr/bin/env python
"""
Author: Jeremy McCoy (jeremy@awesomegeek.com)
License: WTFPL
About:
This script helps you to check if any of your usernames and passwords have
been found in major security leaks.
It will take your decrypted passwords from KeePass or similar and send SHA1
hashes to haveibeenpwned.com to check if they have been compromised.
Assumptions:
- CSV password dump from KeePass2/LastPass or similar (read from
passwords.csv by default; use --passfile to point at another file).
It must have a header line that includes the following columns:
Title or Name,Username,Password,URL
- Python 2.7+
- Python requests library is installed
- OpenSSL version > 1.0 installed
"""
import argparse
import csv
import hashlib
import json
import logging
import re
import requests
import sys
import time
# Logging: every record is rendered as "LEVEL [source line] message", and the
# rest of the script logs through the named 'amipwned' logger.
log = logging.getLogger('amipwned')
logging.basicConfig(**{'format': '%(levelname)s [%(lineno)d] %(message)s'})
# Command-line interface.  The parsed options end up in the plain dict `conf`,
# keyed by option name (passfile, report, interval, strength, loglevel).
parser = argparse.ArgumentParser(
    description="""
This script checks usernames and passwords against the haveibeenpwned
database for leaks and delivers a report of any compromised accounts.
""")
parser.add_argument(
    '--passfile', default='passwords.csv',
    help="""
CSV export (with header line) of your passwords from
LastPass, KeePass, Enpass, etc. Default = ./passwords.csv
""")
parser.add_argument(
    '--report', default='report.json',
    help="""
File to write report to in JSON format. Default = ./report.json
""")
parser.add_argument(
    '--interval', default=1.51, type=float,
    help="""
Time between requests to avoid rate limiting in seconds. Default = 1.51
""")
parser.add_argument(
    '--strength', action='store_true',
    help="""
Only checks for credential strength and exits.
""")
parser.add_argument(
    '--loglevel', default='info',
    help="""
Set logging level. Default = info
""")
conf = vars(parser.parse_args())

# logging.getLevelName() returns the numeric level for a name it knows and a
# string for one it does not.  Passing that string on to setLevel() raises
# ValueError, so turn an unknown name into a normal usage error instead.
_requested_level = logging.getLevelName(conf['loglevel'].upper())
if not isinstance(_requested_level, int):
    parser.error('Unknown log level: ' + conf['loglevel'])
log.setLevel(_requested_level)
# --- haveibeenpwned request settings -------------------------------------
api_headers = {'User-Agent': 'amipwned.py'}
api_base = 'https://haveibeenpwned.com/api/v2/'

# --- state filled in while the script runs --------------------------------
# Lower-cased usernames/emails collected from the CSV, in first-seen order.
usernames = list()
# SHA1 hex digest -> dict with 'usernames', 'titles', 'password' and 'urls'.
passwords = dict()
# Username -> decoded JSON body of a successful breachedaccount lookup.
breached_usernames = dict()
# Email -> decoded JSON body of a successful pasteaccount lookup.
pasted_emails = dict()
# SHA1 hex digest -> the matching `passwords` entry for pwned passwords.
pwned_passwords = dict()
# Entries of `passwords` that failed at least one strength check.
weak_credentials = list()

# Usernames that are never sent to the API (generic names and the empty
# string).
skip_search = [
    'admin', 'administrator', 'anonymous', 'info',
    'root', 'username', 'webmaster', '',
]

# Running counters shown in the final summary.
total_csv = 0
total_emails = 0
def mrjson(stuff):
    """Return *stuff* rendered as pretty-printed JSON text.

    Keys are sorted and nesting is indented by three spaces.  Any value the
    json module cannot serialize is replaced by a fixed error string instead
    of raising.
    """
    def _placeholder(_unserializable):
        return 'ERROR: Item not JSON serializable'

    dump_options = dict(default=_placeholder, sort_keys=True, indent=3)
    return json.dumps(stuff, **dump_options)
def column_matcher(header, row):
    """Work out which cells of *row* hold the credential fields.

    The header is tried first: each header cell is compared
    case-insensitively (ignoring surrounding whitespace) with the labels
    ``username``, ``password``, ``url``, ``email`` and ``name``/``title``.
    If the header supplies username, password, title and URL, those header
    positions are returned.

    Otherwise the row itself is scanned for the same labels, on the
    assumption that it is laid out as label/value pairs; the index returned
    for a field is then the cell *after* its label.

    Args:
        header: list of cells from the first CSV line.
        row: list of cells from the line being processed.

    Returns:
        A 5-tuple ``(username, password, title, url, email)`` of column
        indices.  A field that was not found is ``None``.  Index ``0`` is a
        valid result, so callers must test with ``is not None``.
    """
    # Recognised label -> slot in the returned tuple.
    slots = {
        'username': 0,
        'password': 1,
        'name': 2,
        'title': 2,
        'url': 3,
        'email': 4,
    }
    title_slot = 2

    # Pass 1: a conventional header line.  Later duplicates win, as before.
    found = [None] * 5
    for col_index, column in enumerate(header):
        slot = slots.get(column.strip().lower())
        if slot is not None:
            found[slot] = col_index
    # Email is optional; the other four must all be present.
    if None not in found[:4]:
        return tuple(found)

    # Pass 2: each row might be key,value pairs (Enpass exports look like
    # that), so the value sits one cell to the right of its label.
    found = [None] * 5
    for col_index, column in enumerate(row):
        slot = slots.get(column.strip().lower())
        if slot is None:
            continue
        if slot == title_slot:
            # NOTE(review): unlike the other fields the title keeps the
            # label's own index (no +1).  Preserved from the original code;
            # confirm against a real key/value export.
            found[slot] = col_index
        elif col_index + 1 < len(row):
            found[slot] = col_index + 1
        # else: the label is the last cell, so there is no value to point at;
        # leave the field as None rather than hand back an out-of-range
        # index.
    return tuple(found)
def _open_passfile(path):
    """Open the password CSV in the mode this interpreter's csv module needs."""
    if sys.version_info[0] >= 3:
        # Python 3's csv module reads text and wants newline translation
        # disabled.  'utf-8-sig' also drops a leading byte-order mark so the
        # first header cell still matches.
        # NOTE(review): assumes the export is UTF-8 encoded -- confirm.
        return open(path, 'r', newline='', encoding='utf-8-sig')
    # Python 2's csv module reads from a binary file object.
    return open(path, 'rb')


def _cell(row, index):
    """Return row[index], or None if the column is unknown or past the row's end."""
    if index is None or index >= len(row):
        return None
    return row[index]


# Read the CSV named by --passfile and fill `usernames` (every distinct
# lower-cased username/email outside skip_search) and `passwords` (one entry
# per distinct password, keyed by its SHA1 hex digest).
try:
    log.info('Reading password file...')
    with _open_passfile(conf['passfile']) as passfile:
        passdb = csv.reader(passfile, delimiter=',', quotechar='"')
        header = None
        for row in passdb:
            # The first line is always kept as the header.
            if header is None:
                header = row
                log.debug('CSV header: ' + str(header))
                continue
            # Blank lines come back as empty rows; they are not entries.
            if not row:
                continue
            total_csv += 1

            # Detect the columns for this row (header first, then key/value).
            (username_column, password_column, title_column,
             url_column, email_column) = column_matcher(header, row)

            # Column index 0 is valid, so presence is tested through _cell()
            # returning None, never through the truthiness of the index.
            sha = None
            entry = None
            password = _cell(row, password_column)
            if password is not None:
                # hashlib needs bytes; Python 2 csv cells already are bytes.
                if isinstance(password, bytes):
                    raw_password = password
                else:
                    raw_password = password.encode('utf-8')
                sha = hashlib.sha1(raw_password).hexdigest()
                if sha not in passwords:
                    passwords[sha] = {
                        'usernames': [],
                        'titles': [],
                        'password': password,
                        'urls': []
                    }
                entry = passwords[sha]

            # Username and email are handled identically: both are queued for
            # the account lookups and attached to the password they go with.
            for account_column in (username_column, email_column):
                account = _cell(row, account_column)
                if account is None:
                    continue
                account = account.lower()
                if account not in usernames and account not in skip_search:
                    usernames.append(account)
                if entry is not None and account not in entry['usernames']:
                    entry['usernames'].append(account)

            title = _cell(row, title_column)
            if title is not None:
                title = title.lower()
                if entry is not None and title not in entry['titles']:
                    entry['titles'].append(title)

            url = _cell(row, url_column)
            if url is not None:
                if entry is not None and url not in entry['urls']:
                    entry['urls'].append(url)
except Exception as e:
    log.exception('Issue parsing CSV: ' + conf['passfile'] + '\n' + str(e))
    sys.exit(1)
def credential_strength(credential):
    """Run the strength checks on one password entry and record the result.

    Args:
        credential: dict with the keys 'password', 'usernames', 'titles' and
            'urls'.  It is modified in place: a 'reasons' list naming every
            failed check is stored on it.

    Side effects:
        If at least one check fails, the entry is appended to the
        module-level ``weak_credentials`` list and logged as a warning.
    """
    log.debug('Checking credential:\n' + mrjson(credential))
    password = credential['password']
    reasons = []

    if len(password) <= 8:
        reasons.append('too short')

    # Each pattern must match somewhere in the password, otherwise the paired
    # reason is recorded.
    required_patterns = (
        (r'\d', 'no digits'),
        (r'[a-z]', 'no lowercase'),
        (r'[A-Z]', 'no uppercase'),
        (r'[^\w]', 'no special characters'),
    )
    for pattern, reason in required_patterns:
        if not re.search(pattern, password):
            reasons.append(reason)

    # The same password seen with more than one username or title, or with
    # more than two URLs, counts as reuse.
    if (len(credential['usernames']) > 1
            or len(credential['titles']) > 1
            or len(credential['urls']) > 2):
        reasons.append('password reuse')

    credential['reasons'] = reasons
    if reasons:
        weak_credentials.append(credential)
        # Logger.warn() is a deprecated alias; warning() is the supported name.
        log.warning('Weak credential:\n' + mrjson(credential))
# Strength-check every distinct password gathered from the CSV.
log.info('Checking for weak passwords...')
for password_entry in passwords.values():
    credential_strength(password_entry)
def _hibp_get(endpoint, value):
    """GET api_base + endpoint + value; exit the script on HTTP 429.

    Returns the requests response object for every other status code.
    """
    response = requests.get(api_base + endpoint + value, headers=api_headers)
    # status_code is an int, so it must be compared with ints.
    if response.status_code == 429:
        log.error('Rate-limiting error received. Please try again later.')
        sys.exit(1)
    return response


# Online lookups; skipped entirely when --strength was given.
# NOTE(review): api_base targets the v2 API -- confirm it is still served.
# HTTP 200 is treated as "found" and 404 as "not found"; any other status is
# reported instead of being counted as clean.
if not conf['strength']:
    log.info('Checking for breached usernames...')
    for username in usernames:
        if ' ' in username:
            log.debug('Skipping username: ' + username)
            continue
        log.debug('Checking username: ' + username)
        r = _hibp_get('breachedaccount/', username)
        if r.status_code == 200:
            breached_usernames[username] = json.loads(r.text)
            log.warning(str(r.status_code) + ' Account breached: ' + username)
        elif r.status_code == 404:
            log.debug(str(r.status_code) + ' Account OK: ' + username)
        else:
            log.warning(str(r.status_code) + ' Unexpected response for account: ' + username)
        # Pause after every request to stay under the rate limit.
        time.sleep(conf['interval'])

    log.info('Checking for pasted emails...')
    for username in usernames:
        if '@' not in username:
            log.debug('Skipping non-email: ' + username)
            continue
        total_emails += 1
        log.debug('Checking email: ' + username)
        r = _hibp_get('pasteaccount/', username)
        if r.status_code == 200:
            pasted_emails[username] = json.loads(r.text)
            log.warning(str(r.status_code) + ' Email in paste: ' + username)
        elif r.status_code == 404:
            log.debug(str(r.status_code) + ' Email OK: ' + username)
        else:
            log.warning(str(r.status_code) + ' Unexpected response for email: ' + username)
        time.sleep(conf['interval'])

    log.info('Checking for pwned passwords...')
    for sha in passwords:
        log.debug('Checking password hash: ' + sha)
        r = _hibp_get('pwnedpassword/', sha)
        # NOTE(review): the two result messages below print the plaintext
        # password, as the original did -- confirm that is acceptable for
        # wherever the log output ends up.
        if r.status_code == 200:
            pwned_passwords[sha] = passwords[sha]
            log.warning(str(r.status_code) + ' Password pwned: ' + passwords[sha]['password'])
        elif r.status_code == 404:
            log.debug(str(r.status_code) + ' Password OK: ' + passwords[sha]['password'])
        else:
            log.warning(str(r.status_code) + ' Unexpected response for password hash: ' + sha)
        time.sleep(conf['interval'])
# Headline counts, logged at info level.
log.info('Summary:\n' + mrjson({
    'Total Usernames': len(usernames),
    'Breached Usernames': len(breached_usernames),
    'Total Emails': total_emails,
    'Pasted Emails': len(pasted_emails),
    'Total Passwords': len(passwords),
    'Pwned Passwords': len(pwned_passwords),
    'Total CSV entries': total_csv,
    'Weak credentials': len(weak_credentials)
}))

# Full findings.  The password entries in here still carry their plaintext
# 'password' field, so the report file is as sensitive as the CSV itself.
# NOTE(review): the 'Pwned Passwords:' key has a trailing colon, unlike its
# siblings; it is kept unchanged so anything reading existing reports still
# finds it.
report = mrjson({
    'Breached Usernames': breached_usernames,
    'Pasted Emails': pasted_emails,
    'Pwned Passwords:': pwned_passwords,
    'Weak credentials': weak_credentials
})
log.debug('Findings:\n' + report)

try:
    with open(conf['report'], 'w') as outfile:
        outfile.write(report)
except Exception as e:
    # "except Exception as e" is valid on Python 2.6+ and Python 3, and
    # matches the handler used when parsing the CSV.
    log.exception('Could not write report to file: ' + conf['report'] + '\n' + str(e))
    sys.exit(1)
else:
    log.info('Wrote detailed report to file: ' + conf['report'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment