Skip to content

Instantly share code, notes, and snippets.

@dieterplex
Created October 31, 2018 16:17
Show Gist options
  • Save dieterplex/07718d3c78f119b2d2382dc8c0dcff3e to your computer and use it in GitHub Desktop.
# coding: utf-8
import json
import os
import sys
import re
import fnmatch
import concurrent.futures
# https://stackoverflow.com/a/5141829/554150
def find_logs(target_path):
    """Walk *target_path* and return full paths of rotated log files.

    Files matching ``*.log.*`` are kept; paths matching ``*.stderr`` or
    ``*.stdout`` (the outputs this tool itself produces) are skipped.
    Pattern-combining technique from https://stackoverflow.com/a/5141829/554150
    """
    include_pats = ['*.log.*']                # files to pick up
    exclude_pats = ['*.stderr', '*.stdout']   # our own output files
    # Turn the glob lists into single alternation regexes; an empty exclude
    # list degenerates to r'$.' which can never match anything.
    include_re = r'|'.join(fnmatch.translate(p) for p in include_pats)
    exclude_re = r'|'.join(fnmatch.translate(p) for p in exclude_pats) or r'$.'

    matched = []
    for root, _dirs, names in os.walk(target_path):
        for name in names:
            full_path = os.path.join(root, name)
            # NOTE: patterns are tested against the *joined* path, exactly
            # as the original did — the leading '*' makes this equivalent
            # to matching the basename for these patterns.
            if re.match(exclude_re, full_path):
                continue
            if re.match(include_re, full_path):
                matched.append(full_path)
    return matched
def extract_log(log_file):
    """Split a Docker JSON-lines log file into ``.stdout``/``.stderr`` files.

    Each input line is expected to be a JSON object of the form
    ``{"log": "...", "stream": "stdout"|"stderr"}``; the ``log`` payload is
    appended to ``<log_file>.stdout`` or ``<log_file>.stderr`` accordingly.
    Lines that are not valid JSON are printed and skipped.
    """
    log_file_out = log_file + ".stdout"
    log_file_err = log_file + ".stderr"
    with open(log_file) as f, open(log_file_out, "w+") as out, open(log_file_err, "w+") as err:
        for line in f:
            try:
                record = json.loads(line)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass. Previously a
                # bare `except:` fell through and then dereferenced a possibly
                # unbound/stale variable — skip the malformed line instead.
                print(line)
                continue
            log = record["log"]
            if record["stream"] == "stdout":
                out.write(log)
            else:
                err.write(log)
def main(argv):
    """Find rotated log files under each directory in *argv* and split each
    into ``.stdout``/``.stderr`` files, fanning the work out across processes.

    argv: list of top-level directories to search (typically ``sys.argv[1:]``).
    """
    files = []
    for topdir in argv:
        files.extend(find_logs(topdir))
    # extract_log is JSON-parsing heavy, so a process pool parallelizes it
    # across cores; executor.map preserves input order, and zip pairs each
    # completed result back with its file name for progress reporting.
    # https://docs.python.org/3/library/concurrent.futures.html
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for f, _ in zip(files, executor.map(extract_log, files)):
            print('process log file %s is done.' % (f, ))


if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment