Created
June 25, 2016 19:15
-
-
Save lrstanley/effdb7c618fb7387a3672937cba27134 to your computer and use it in GitHub Desktop.
Old Exiclean version, in Python. Here for reference.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Exiclean -- Exim mail queue cleaner/spam removal script | |
----------------------------------------------------------------------------- | |
LICENSE: The MIT License (MIT) | |
Copyright (c) 2016 Liam Stanley <me@liamstanley.io> | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
import os | |
import re | |
import sys | |
import glob | |
import time | |
import getopt | |
from hashlib import md5 | |
from threading import Thread | |
try: | |
import queue | |
except: | |
import Queue as queue | |
VERSION = 0.1 | |
_args = [ | |
{ | |
'name': 'help', 'short': 'h', 'long': 'help', | |
'description': 'Shows this help dialog' | |
}, | |
{ | |
'name': 'version', 'short': 'v', 'long': 'version', | |
'description': 'Shows the current exiclean version' | |
}, | |
{ | |
'name': 'nocolors', 'short': 'n', 'long': 'no-colors', | |
'description': 'Strips color from all output' | |
}, | |
{ | |
'name': 'spooldir', 'short': 's:', 'long': 'spool-dir=', | |
'description': 'Changes the default spool directory', | |
'default': '/var/spool/exim/input/' | |
}, | |
{ | |
'name': 'readcount', 'short': 'r:', 'long': 'rthreads=', | |
'default': 2, 'description': 'Number of threads used for reading queue (min: 1, max: 5)' | |
}, | |
{ | |
'name': 'deletecount', 'short': 'd:', 'long': 'dthreads=', | |
'default': 1, 'description': 'Number of threads used for removing items from queue (min: 1, max: 5)' | |
}, | |
{ | |
'name': 'top', 'long': 'top-common', | |
'description': 'Show top common items within the queue' | |
}, | |
{ | |
'name': 'quiet', 'short': 'q', 'long': 'quiet', | |
'description': 'Print out only useful information' | |
}, | |
{ | |
'name': 'sort_user', 'short': 'u:', 'long': 'user=', | |
'description': 'Filter results based on user' | |
}, | |
{ | |
'name': 'queuemax', 'long': 'queue-max=', | |
'description': 'Only pull last X number of items from the queue' | |
} | |
] | |
# Map of "<name>" markup tags (as used in Exiclean.out() text) to ANSI SGR
# escape sequences. "cr" resets all attributes.
colors = {
    "black": "\033[30m", "blue": "\033[34m", "green": "\033[32m",
    "cyan": "\033[36m", "red": "\033[31m", "purple": "\033[35m",
    "brown": "\033[33m", "lightgray": "\033[37m", "darkgray": "\033[1;30m",
    # lightgreen is bold + green (32); it previously duplicated darkgray's code.
    "lightblue": "\033[1;34m", "lightgreen": "\033[1;32m", "lightcyan": "\033[1;36m",
    "lightred": "\033[1;31m", "lightpurple": "\033[1;35m", "yellow": "\033[1;33m",
    "white": "\033[1;37m", "cr": "\033[0m"
}
class Exiclean(object):
    """
    Interactive Exim mail queue cleaner.

    Scans the "-H" header files under the spool directory with a pool of
    reader threads, groups the messages by to/from/subject/user, and lets
    the operator pick a group whose header and data files are then removed
    by a pool of delete threads.

    Command-line options (see the module-level ``_args`` table) are mapped
    onto instance attributes by gen_args().
    """

    # Options whose values must be whole numbers; every other option that
    # takes a value is kept as a string.
    _NUMERIC_ARGS = ('readcount', 'deletecount', 'queuemax')

    # Matches one ANSI escape sequence as produced by the colors table. The
    # capture group makes re.split() keep the sequences in its result.
    _ANSI_RE = re.compile(r'(\x1b[^m]*m)')

    def __init__(self):
        # message id -> dict with 'id', 'user', 'fn' and the wanted headers
        self.db = {}

        self.readq = queue.Queue()
        self.read_init = False
        # Incremented by the reader threads without a lock; only used for
        # the progress display.
        self.read_count = 0

        self.deleteq = queue.Queue()
        self.delete_init = False
        self.delete_count = 0

        # headers we want if they exist in the exim header files
        self.headers = ['to', 'from', 'subject']

        # headers we want to use to sort/group by
        self.organizable = ['to', 'from', 'subject', 'user']

        # set some sane screen width/height -- used for self.out()
        self.width = 80
        self.height = 25

        # start parsing sys.argv[]
        self.gen_args()

        # update screen dimensions (self.width, self.height)
        self.update_dimensions()

    def show_help(self):
        """
        Prints out script help dialog -- attempts to simulate argparse or
        optparse. Needs to be manually done to be able to be 2-3 compatible.
        """
        helpdoc = """
        usage: {exe} [-h] [arguments]
        exiclean -- exim mail queue cleaner/spam removal script
        optional arguments:
        {arguments}
        """
        # Drop the source indentation of the template above.
        helpdoc = re.sub(r'^ +', '', helpdoc.lstrip("\n"), flags=re.M)

        rows = []
        for item in _args:
            short = item.get('short', '')
            long_ = item.get('long', '')

            names = []
            if short:
                names.append("-%s" % short.rstrip(':'))
            if long_:
                names.append("--%s" % long_.rstrip('='))
            cmds = ", ".join(names)

            # A trailing ':' / '=' is getopt's marker for "takes a value".
            if short.endswith(':') or long_.endswith('='):
                cmds += " <args>"

            rows.append([cmds, item.get('description') or "No description"])

        helpdoc = helpdoc.format(exe=sys.argv[0], arguments=self.table_fmt(rows))
        self.out(helpdoc, tag=False)

    def table_fmt(self, data):
        """
        Returns a unix "column" style string. Pass in a list of lists, e.g:
            self.table_fmt([['something', 'else'], ['a', 'b']])

        Every column is left-justified to its widest cell and each row ends
        with a newline. An empty list yields an empty string.
        """
        widths = [max(map(len, col)) for col in zip(*data)]
        lines = [
            " ".join(val.ljust(width) for val, width in zip(row, widths)).strip()
            for row in data
        ]
        return "".join(line + "\n" for line in lines)

    def nproc(self):
        """
        Returns number of processors seen by the system (always >= 1).

        Tries os.sysconf() first and falls back to counting "processor"
        entries in /proc/cpuinfo. A result of 1 is returned when neither
        source is usable, so callers may safely divide by the result.
        """
        cores = 0
        try:
            cores = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        except (AttributeError, ValueError, OSError):
            cores = 0

        if cores < 1:
            try:
                with open('/proc/cpuinfo', 'r') as f:
                    cores = len(re.findall(r'^processor\s*:', f.read(), flags=re.M))
            except (IOError, OSError):
                cores = 0

        return max(cores, 1)

    def update_dimensions(self):
        """
        Updates self.width & self.height based on terminal dimensions.

        The current values are kept when `stty size` produces no usable
        output (e.g. when not attached to a terminal).
        """
        try:
            pipe = os.popen('stty size', 'r')
            try:
                rows, cols = pipe.read().split()
            finally:
                pipe.close()
            rows, cols = int(rows), int(cols)
        except (ValueError, IOError, OSError):
            return

        # Ignore a zero-sized report; it would make trimming eat all output.
        if rows > 0 and cols > 0:
            self.height, self.width = rows, cols

    def _trim_visible(self, text, limit):
        """
        Cuts text down to `limit` visible characters, appends "..." and a
        newline. ANSI escape sequences are not counted as visible and are
        all retained (those after the cut are emitted after the ellipsis)
        so that colors are still reset.
        """
        remaining = limit
        kept = []
        trailing_codes = []
        # With a capturing pattern, re.split() puts the escape sequences at
        # the odd indexes and the plain text at the even ones.
        for idx, chunk in enumerate(self._ANSI_RE.split(text)):
            if idx % 2 == 1:
                if remaining > 0:
                    kept.append(chunk)
                else:
                    trailing_codes.append(chunk)
            elif remaining > 0:
                piece = chunk[:remaining]
                kept.append(piece)
                remaining -= len(piece)
        return "".join(kept).rstrip('. ') + "..." + "".join(trailing_codes) + "\n"

    def out(self, text="", ret=True, tag=True, parse_colors=True, trim=False, info=False):
        """
        Stdout wrapper

        optional args:
            ret -- (True) End the text with a newline. When False, the text
                is prefixed with a carriage return and no newline is added,
                which allows it to be overwritten by the next call
            tag -- (True) Show [EXICLEAN] prefix tag line (skipped for text
                of one character or less)
            parse_colors -- (True) Replace <color> tags with ANSI color codes
            trim -- Ellipsis output based on terminal width
            info -- if set and self.quiet is set, output gets dropped
        """
        if info and self.quiet:
            return

        if tag and len(text) > 1:
            text = "[EXICLEAN]: " + text

        if not ret:
            text = "\r" + text
        else:
            text += "\n"

        # Only the known color tags are touched, so unrelated "<word>" text
        # (such as the "<args>" placeholder in the help dialog) survives
        # when colors are disabled.
        strip_tags = self.nocolors or not parse_colors
        for color in colors:
            text = text.replace("<%s>" % color, "" if strip_tags else colors[color])

        if trim and len(self._ANSI_RE.sub('', text)) > self.width:
            text = self._trim_visible(text, self.width - 3)

        sys.stdout.write(text)
        sys.stdout.flush()

    def exit(self, text):
        """ Prints an error message (if any text is given) and exits with status 1 """
        if text:
            self.out("<red>Error: %s<cr>" % str(text))
        sys.exit(1)

    def gen_args(self):
        """
        Parse sys.argv[] and map to Exiclean() class attributes. E.g:
            -q -> self.quiet

        Every option in _args is first set to its default (or None). Flags
        are stored as True, numeric options (readcount, deletecount,
        queuemax) as int, and all other values as the string given.
        Exits with an error on unknown options or invalid values.

        As we are trying to support broad versions of Python (e.g. 2.4+ -> 3.5+),
        this needs to be done manually to be consistent between Python versions
        """
        for _arg in _args:
            setattr(self, _arg['name'], _arg.get('default'))

        short_opts = ''.join(x['short'] for x in _args if x.get('short'))
        long_opts = [x['long'] for x in _args if x.get('long')]

        try:
            opts, _unused = getopt.getopt(sys.argv[1:], short_opts, long_opts)
        except getopt.GetoptError as err:
            self.exit(err)

        # option name as typed (without dashes) -> its _args entry
        lookup = {}
        for _arg in _args:
            if _arg.get('short'):
                lookup[_arg['short'].rstrip(':')] = _arg
            if _arg.get('long'):
                lookup[_arg['long'].rstrip('=')] = _arg

        for name, value in opts:
            name = name.lstrip('-')
            arg = lookup.get(name)
            if not arg:
                self.exit("Unknown arg %s" % name)

            takes_value = arg.get('short', '').endswith(':') or arg.get('long', '').endswith('=')
            if not takes_value:
                value = True
            elif arg['name'] in self._NUMERIC_ARGS:
                if not value.isdigit():
                    self.exit("Option '%s' expects a whole number, got '%s'" % (name, value))
                value = int(value)
            elif value == '':
                self.exit("Option '%s' requires a value" % name)
            # Any other value (e.g. a user name made of digits) stays a string.

            setattr(self, arg['name'], value)

    def init_read(self):
        """ Initialize read queue threads (self.readcount clamped to 1..10) """
        if self.read_init:
            return

        self.readcount = min(max(self.readcount, 1), 10)

        cores = self.nproc()
        self.out("Detecting core count: <cyan>%d<cr>" % cores, info=True)

        if (float(self.readcount) / float(cores)) >= 2.5:
            self.out("<yellow>WARNING: Selecting higher threads than cores may impose additional load!<cr>", info=True)

        self.out("Starting <cyan>%d<cr> read thread(s) (min: 1, max: 10)" % self.readcount, info=True)

        def _read():
            while True:
                fn = self.readq.get()
                try:
                    # None is a stop marker for the worker.
                    if fn is None:
                        break
                    self.read_header_files(fn)
                finally:
                    # Always acknowledge the item, otherwise the status loop
                    # in update_db() would wait forever.
                    self.readq.task_done()

        for _i in range(self.readcount):
            t = Thread(target=_read)
            t.daemon = True
            t.start()

        self.read_init = True

    def init_delete(self):
        """ Initialize delete queue threads (self.deletecount clamped to 1..5) """
        if self.delete_init:
            return

        self.deletecount = min(max(self.deletecount, 1), 5)

        if self.deletecount >= 4:
            self.out("<yellow>WARNING: Selecting 4+ delete threads may cause very high IOWAIT!<cr>", info=True)

        self.out("Starting <cyan>%d<cr> delete thread(s) (min: 1, max: 5)" % self.deletecount, info=True)

        def _delete():
            while True:
                email_id = self.deleteq.get()
                try:
                    self.delete_header_files(email_id)
                finally:
                    # Always acknowledge the item, otherwise the status loop
                    # in delete_sync() would wait forever.
                    self.deleteq.task_done()

        for _i in range(self.deletecount):
            t = Thread(target=_delete)
            t.daemon = True
            t.start()

        self.delete_init = True

    def read_header_files(self, fn):
        """
        Read one header file from the exim spool dir (self.spooldir) and
        store the parsed message in self.db.

        Files that cannot be read or decoded are skipped, as are messages
        without an "-ident" line or (when self.sort_user is set) whose
        ident does not match it. The message id is taken from the
        "<id>-H" line of the file, falling back to the file name.
        """
        try:
            with open(fn, 'r') as f:
                text = f.read()
        except (IOError, OSError, UnicodeError):
            # Best effort: the file may have vanished or be unreadable.
            return

        def get_header(search):
            found = re.search(search, text, flags=re.M)
            if found:
                return found.group(1).strip()
            return None

        mid = get_header(r'^([a-zA-Z0-9-]{16})-H$')
        if not mid:
            # Never store the message under a None key; derive the id from
            # the file name instead.
            base = os.path.basename(fn)
            mid = base[:-2] if base.endswith('-H') else base

        ident = get_header(r'^-ident (.*?)$')
        if not ident:
            return
        if self.sort_user and ident.lower() != self.sort_user.lower():
            return

        msg = {
            'id': mid,
            'user': ident,
            'fn': fn
        }

        for raw_name, raw_content in re.findall(r'^[0-9A-Z\*]{3,4} +([^:]+): +(.*?)$', text, flags=re.M):
            header = raw_name.strip().lower()
            if header not in self.headers:
                continue

            content = raw_content.strip()

            # Reduce "Name <addr>" to "addr" before shortening, so a long
            # display name cannot cut the address off.
            if header in ['to', 'from']:
                content = re.sub(r'.*<([^>]+)>.*', r'\g<1>', content)

            if len(content) > 65:
                content = content[:65] + "..."

            msg[header] = content

        # Make sure every grouping key exists, even when the header is absent.
        for item in self.organizable:
            msg.setdefault(item, None)

        self.db[mid] = msg
        self.read_count += 1

    def delete_header_files(self, id):
        """
        Delete header (-H) and matching -D files from exim spool dir
        (self.spooldir) for the message stored in self.db under `id`, and
        drop it from self.db. Unknown ids are ignored; files that cannot
        be removed are skipped silently.
        """
        email = self.db.pop(id, None)
        if not email:
            return

        header_fn = email['fn']
        extra_fn = re.sub(r'\-H$', '-D', header_fn)

        try:
            os.remove(header_fn)
        except OSError:
            pass

        # Counts processed messages (progress display), removed or not.
        self.delete_count += 1

        if header_fn != extra_fn:
            try:
                os.remove(extra_fn)
            except OSError:
                pass

    def delete_sync(self, id_list):
        """ Dumps id_list into deleteq and shows progress until all are processed """
        self.delete_count = 0

        for email_id in id_list:
            self.deleteq.put(email_id)

        def status():
            self.out("Deleting items from exim queue: <cyan>%d<cr>" % self.delete_count, ret=False, info=True)

        while self.deleteq.unfinished_tasks:
            status()
            time.sleep(0.3)

        status()
        self.out()
        self.out("Finished deleting items from queue.")

    def hash(self, string):
        """ Returns a md5 hash string -- encode utf-8 for py3 """
        return md5(string.encode('utf-8')).hexdigest()

    def input(self, string):
        """ Get raw user input -- py2 uses raw_input() -- py3 uses input() """
        try:
            ipt = raw_input(string)
        except NameError:
            ipt = input(string)

        return ipt

    def question_prompt(self, title, options):
        """
        Initiate question prompt based on a list of lists

        The child list must consist of a len(2) list, with each arg matching:
            0: key to be returned from self.question_prompt()
            1: text to display in question

        Returns the key of the selected option, or None when the user
        cancels (enters "x", nothing at all, or closes stdin). Any other
        invalid answer asks again.
        """
        self.update_dimensions()

        self.out(title, tag=False)

        for number, option in enumerate(options, 1):
            self.out(" [<cyan>{n}<cr>] {text}".format(n=str(number), text=option[1]), tag=False, trim=True)

        self.out(" [X] Cancel", tag=False)

        total = len(options)
        while True:
            try:
                answer = self.input("Select option [X]: ").strip()
            except EOFError:
                return None

            # "[X]" is the advertised default, so an empty answer cancels too.
            if answer == '' or answer.lower() == 'x':
                return None

            if answer.isdigit() and 1 <= int(answer) <= total:
                return options[int(answer) - 1][0]

            self.out("<red>Invalid selection: %s<cr>" % answer, tag=False)

    def update_db(self):
        """
        Update self.db with all emails in mail queue.

        Queues every "*-H" file found one directory below self.spooldir
        for the reader threads (stopping after self.queuemax files when
        set) and waits until they have all been processed. Exits with an
        error when the spool directory is not accessible.
        """
        self.db = {}
        self.read_count = 0
        self.spooldir = self.spooldir.rstrip('/')

        if not os.path.isdir(self.spooldir):
            self.exit("Spool directory does not exist or unable to access: %s" % self.spooldir)

        count = 0
        # iglob avoids building the complete file list for large queues.
        for f in glob.iglob(self.spooldir + '/*/*-H'):
            count += 1
            self.readq.put(f)
            self.out("Updating local queue db. Files scanned: <cyan>%d<cr>" % count, ret=False, info=True)

            if self.queuemax and count >= self.queuemax:
                break

        self.out()
        self.out("Reading items in queue. This may take a moment... (%d/%d)" % (self.read_count, count))

        def status():
            self.out("Reading items in queue. This may take a moment... (<cyan>%d<cr>/<cyan>%d<cr>)" % (self.read_count, count), ret=False, info=True)

        while self.readq.unfinished_tasks:
            status()
            time.sleep(0.3)

        status()
        self.out()
        self.out("Finished processing items from queue.")

    def most_common(self):
        """
        Return a list of the top 20 most common self.organizable headers in
        self.db, though top results may contain duplicates

        Each result is a dict with 'hash', 'count', 'ids', 'type' and
        'value', sorted by 'count' in descending order.
        """
        self.out("Gathering information from queue db...", info=True)

        groups = {}
        for email_id in self.db:
            email = self.db[email_id]

            for key in self.organizable:
                # A missing header only rules out this key; the remaining
                # keys of the same email must still be counted.
                if not email[key]:
                    continue

                kv = self.hash(key + email[key])
                group = groups.get(kv)
                if group is None:
                    groups[kv] = {'hash': kv, 'count': 1, 'ids': [email['id']], 'type': key, 'value': email[key]}
                else:
                    group['count'] += 1
                    group['ids'].append(email['id'])

        return sorted(groups.values(), key=lambda k: k['count'], reverse=True)[:20]

    def main(self):
        """ Main Exiclean() initialization function """
        if self.help:
            self.show_help()
            sys.exit(0)

        # An explicitly requested version is shown even in quiet mode.
        self.out("Running version: <cyan>%s<cr>" % str(VERSION), info=not self.version)
        if self.version:
            sys.exit(0)

        if self.queuemax:
            if self.sort_user:
                self.exit("--queue-max cannot be used with any sort filters")

            if self.queuemax < 100:
                self.queuemax = 100

            self.out("<cyan>Limiting queue scan to %d emails<cr>" % self.queuemax, info=True)

        self.init_read()
        self.init_delete()
        self.update_db()

        if self.top:
            scan = self.most_common()
            self.out()
            out = [["COUNT", "TYPE", "ITEM"], ["-----", "----", "----"]]
            for item in scan[:10]:
                out.append([str(item['count']), str(item['type']), str(item['value'])])

            self.out(self.table_fmt(out), tag=False)
            sys.exit(0)

        # Everything is already filtered to one user; grouping by it is moot.
        if self.sort_user:
            self.organizable.remove('user')

        while True:
            scan = self.most_common()

            if len(scan) == 0:
                self.out("<cyan>No email in the queue to delete! AWESOME!<cr>")
                sys.exit(0)

            q = [[x['hash'], "%6d x [<green>%7s<cr>] %s" % (x['count'], str(x['type']), str(x['value']))] for x in scan[:8]]
            id_to_delete = self.question_prompt("What would you like to delete?", q)

            if not id_to_delete:
                print("\nExiting...")
                sys.exit(0)

            item = [x for x in scan if x['hash'] == id_to_delete][0]
            scan = None

            self.delete_sync(item['ids'])
if __name__ == '__main__':
    # Script entry point. Construction already parses sys.argv, so an
    # interrupt there is reported separately and exits with status 1; an
    # interrupt while running main() is treated as a normal exit (status 0).
    try:
        ec = Exiclean()
    except KeyboardInterrupt:
        print("\nCancelling initialization...\nExiting..")
        sys.exit(1)
    else:
        try:
            ec.main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment