Last active
August 29, 2015 13:56
-
-
Save macrotis/9023073 to your computer and use it in GitHub Desktop.
Reads one or more MySQL general_log files (either plaintext or gzip'd) and outputs one CSV file with counts per second. Probably full of bugs, but it seems to work so far.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import csv | |
import gzip | |
import itertools | |
import os | |
import re | |
import sys | |
import traceback | |
from datetime import datetime as dt, timedelta as td | |
def parse_line(line):
    """Parse one MySQL general_log line.

    Returns a (typ, action, timestamp) tuple:
      typ       -- the record type column (e.g. 'Query', 'Connect'), or None
      action    -- the statement/argument text with any trailing '/*...*/'
                   comment removed, or None
      timestamp -- a datetime if the line carried one, else None (the
                   general log only timestamps the first record of each
                   second; continuation records start with whitespace)

    Returns (None, None, None) for blank or unparseable lines instead of
    raising, so one malformed line cannot abort a whole file.  (The
    original let .index(' ') and dt() raise uncaught ValueError for lines
    like "140212\n" or impossible dates; those are now caught.)
    """
    timestamp = None
    typ = None
    action = None
    # Collapse runs of tabs/spaces so field splitting is uniform.
    line = re.sub('[\t ]+', ' ', line)
    if len(line) < 1:
        return (None, None, None)
    try:
        if line[0] == ' ':
            # Continuation line: no timestamp, starts directly at the pid.
            rest = line.strip()
        elif not re.match('^[0-9]{6}', line):
            # Neither a continuation nor a "YYMMDD ..." record.
            return (None, None, None)
        else:
            # Timestamped line: "YYMMDD  H:MM:SS  pid  type  argument".
            ymd = line[:line.index(' ')]
            rest = line[line.index(' ') + 1:]
            if rest[:1] == ' ':
                # Single-digit hours are padded with an extra space.
                rest = rest[1:]
            hms = rest[:rest.index(' ')]
            hours, minutes, seconds = map(int, hms.split(':'))
            year = 2000 + int(ymd[:2])  # two-digit year; log format is post-2000
            month = int(ymd[2:4])
            day = int(ymd[4:])
            # dt() raises ValueError on impossible dates (month 13 etc.).
            timestamp = dt(year, month, day, hours, minutes, seconds)
            rest = rest[rest.index(' ') + 1:].strip()
        # Connection/thread id: parsed only to validate, then discarded.
        pid = int(rest[:rest.index(' ')])
        rest = rest[rest.index(' ') + 1:]
    except ValueError:
        # Raised by .index() when a separator is missing, by int() on a
        # non-numeric field, or by dt() on an invalid date -- all mean
        # "not a parseable record".
        return (None, None, None)
    if ' ' in rest:
        typ = rest[:rest.index(' ')].strip()
        rest = re.sub('^ +', '', rest[rest.index(' '):])
        if '/*' in rest:
            # Drop a trailing comment (e.g. query-source annotations).
            action = rest[:rest.index('/*')].strip()
        else:
            action = rest.strip()
    return (typ, action, timestamp)
def parse_file(fp):
    """Yield (typ, action, timestamp) tuples for each record in a
    general_log file object.

    The first three lines (the server banner mysqld writes at the top of
    the log) are skipped.  Records without their own timestamp inherit
    the most recent one seen, so every yielded record carries the
    best-known timestamp (None until the first timestamped line).
    Records that parse to no type/action are dropped.

    Raises Exception wrapping (line number, raw line, original error) if
    parse_line() itself blows up, so bad input is locatable.
    """
    last_timestamp = None
    line_number = 0
    # Skip the three header lines.
    for _ in range(3):
        fp.readline()
    while True:
        line = fp.readline()
        if line == '':
            # EOF.  Fix: the original did `raise StopIteration()`, which
            # PEP 479 turns into RuntimeError on Python 3.7+; a plain
            # return ends a generator correctly on both 2 and 3.
            return
        line_number += 1
        try:
            typ, action, timestamp = parse_line(line)
        except Exception as e:
            raise Exception(("Failure on line %s" % line_number, line, e))
        if timestamp is not None:
            last_timestamp = timestamp
        if typ is None or action is None:
            continue
        yield (typ, action, last_timestamp)
def dml_stats_per_timestamp(qtups):
    """Aggregate per-table DML counts, yielding one (timestamp, stats)
    pair per distinct second seen in the input.

    qtups is an iterator of (typ, action, timestamp) tuples as produced
    by parse_file().  Each yielded stats dict maps
    table name -> {'inserts'/'updates'/'deletes': count}.

    Records seen before the first timestamped record are buffered and
    attributed to the second preceding the first real timestamp.  If the
    input has NO timestamps at all, the buffered raw 3-tuples are yielded
    followed by a (None, None) sentinel -- NOTE(review): callers that
    unpack 2-tuples will choke on that path; preserved as-is from the
    original.  Empty input yields a single (None, None).
    """
    table_stats = {}
    qtup_time_resolution_buffer = []

    def find_table_name(string, tn_start, tn_end_chars=(' ', '\t')):
        # Extract the table name starting at offset tn_start; handles
        # backquoted identifiers.  Returns None when no name is found.
        # (tn_end_chars is now an immutable default -- only membership
        # is tested, so list/tuple callers behave identically.)
        rest_of_line = string[tn_start:].strip()
        if not rest_of_line:
            return None
        if rest_of_line[0] == '`':
            if len(rest_of_line) == 1:
                return None
            try:
                table_name_end = rest_of_line.index('`', 1)
            except ValueError:
                return None  # unterminated backquote
            return rest_of_line[1:table_name_end]
        table_name_end = None
        # `range`, not the Python-2-only `xrange`.
        for i in range(len(rest_of_line)):
            if rest_of_line[i] in tn_end_chars:
                table_name_end = i
                break
        if table_name_end is not None:
            return rest_of_line[:table_name_end]
        # No terminator char: the whole (whitespace-normalized) first
        # word is the table name.
        return re.sub('[\t ]+', ' ', rest_of_line).split(' ')[0]

    def update_the_table_stats(qtup, table_stats):
        # Bump the counter for the table a DML Query record touches;
        # everything else (Connect, Quit, non-DML queries) is ignored.
        if qtup[0] != 'Query':
            return
        if re.match('^INSERT INTO ', qtup[1], re.IGNORECASE):
            # len('INSERT INTO ') == 12; '(' may directly follow the name
            table_name = find_table_name(qtup[1], 12, ['(', ' '])
            key = 'inserts'
        elif re.match('^UPDATE ', qtup[1], re.IGNORECASE):
            # len('UPDATE ') == 7
            table_name = find_table_name(qtup[1], 7)
            key = 'updates'
        elif re.match('^DELETE FROM ', qtup[1], re.IGNORECASE):
            # len('DELETE FROM ') == 12
            table_name = find_table_name(qtup[1], 12)
            key = 'deletes'
        else:
            return
        if not table_name:
            # Fix: the original only guarded the INSERT branch, so a
            # failed name parse on UPDATE/DELETE recorded stats under a
            # None key.
            return
        bucket = table_stats.setdefault(table_name, {})
        bucket[key] = bucket.get(key, 0) + 1

    try:
        # `next(qtups)` instead of the Python-2-only `qtups.next()`.
        last_qtup = next(qtups)
        cur_timestamp = last_qtup[2]
        if cur_timestamp is None:
            qtup_time_resolution_buffer.append(list(last_qtup))
        # Read ahead until the first timestamped record appears.
        while cur_timestamp is None:
            try:
                last_qtup = next(qtups)
                cur_timestamp = last_qtup[2]
                if cur_timestamp is None:
                    qtup_time_resolution_buffer.append(list(last_qtup))
            except StopIteration:
                # No timestamp anywhere: dump the raw buffer, emit the
                # sentinel, and finish (see docstring caveat).  Fix:
                # `return`, not `raise StopIteration()` (PEP 479).
                for qt in qtup_time_resolution_buffer:
                    yield qt
                yield (None, None)
                return
        # Attribute buffered untimestamped records to the second before
        # the first real timestamp.
        cur_timestamp -= td(seconds=1)
    except StopIteration:
        # Completely empty input.
        yield (None, None)
        return
    for qtup in qtup_time_resolution_buffer:
        update_the_table_stats(qtup, table_stats)
    if qtup_time_resolution_buffer:
        yield (cur_timestamp, table_stats)
        table_stats = {}
    cur_timestamp += td(seconds=1)
    # Process with one record of lag so the final record can be folded
    # in after the loop.
    for qtup in qtups:
        update_the_table_stats(last_qtup, table_stats)
        if qtup[2] > cur_timestamp:
            # Crossed into a new second: flush the accumulated counts.
            yield (cur_timestamp, table_stats)
            cur_timestamp = qtup[2]
            table_stats = {}
        last_qtup = qtup
    update_the_table_stats(last_qtup, table_stats)
    yield (cur_timestamp, table_stats)
def main():
    """Merge per-second DML stats from every input log into one CSV.

    Usage: prog OUTPUT INPUT [INPUT ...]

    Each CSV row is a timestamp followed by one "<table> <statement>"
    count column per (table, statement) pair seen anywhere in the inputs.
    A failure reading one input aborts the remaining inputs but still
    writes out everything collected so far (original best-effort
    behavior, preserved).
    """
    if len(sys.argv) < 3:
        print("Usage: %s $OUTPUT $INPUTS" % sys.argv[0], file=sys.stderr)
        # Fix: the original printed usage but fell through, crashing on
        # sys.argv[1] (one arg) or writing an empty CSV (two args).
        sys.exit(1)
    statements = ('inserts', 'deletes', 'updates')
    stats = {}      # timestamp -> {table -> {statement -> count}}
    tables = set()  # every table name seen, for the CSV header
    ofp = open(sys.argv[1], 'w')
    try:
        cur_file = None
        try:
            for filename in sys.argv[2:]:
                cur_file = filename
                if filename.endswith('.gz'):
                    # NOTE(review): under Python 3 this yields bytes;
                    # mode 'rt' would be needed there (py2 gzip has no
                    # text mode, so 'r' is kept for compatibility).
                    ifp = gzip.open(filename, 'r')
                else:
                    ifp = open(filename, 'r')
                try:
                    for timestamp, records in \
                            dml_stats_per_timestamp(parse_file(ifp)):
                        for table in records:
                            tables.add(table)
                        if timestamp not in stats:
                            stats[timestamp] = records
                        else:
                            # Same second seen in an earlier file: merge.
                            for table in records:
                                if table not in stats[timestamp]:
                                    stats[timestamp][table] = records[table]
                                else:
                                    bucket = stats[timestamp][table]
                                    for statement in statements:
                                        if statement in records[table]:
                                            bucket[statement] = (
                                                bucket.get(statement, 0) +
                                                records[table][statement])
                        print("%s: %s" % (timestamp, records))
                finally:
                    ifp.close()
        except Exception:
            # Best-effort: report the failing file, then fall through and
            # write whatever was collected so far.
            print("Error reading file %s" % cur_file, file=sys.stderr)
            traceback.print_exc(None, sys.stderr)
        writer = csv.writer(ofp)
        table_list = sorted(tables)
        # Header row.  Fix: wrap map() in list() -- on Python 3 map()
        # returns an iterator and list + map raised TypeError.
        writer.writerow(["Timestamp"] + list(map(
            lambda p: ' '.join(p),
            itertools.product(table_list, statements))))
        for timestamp in sorted(stats.keys()):
            output_row = [timestamp]
            records = stats[timestamp]
            for table in table_list:
                for statement in statements:
                    if table in records and statement in records[table]:
                        output_row.append(records[table][statement])
                    else:
                        output_row.append(0)
            writer.writerow(output_row)
        ofp.flush()
    finally:
        ofp.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment