Last active
March 12, 2017 20:17
-
-
Save randerzander/bff28a1383df4d5e75cdf42ebb8be17d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from org.apache.commons.io import IOUtils | |
from java.nio.charset import StandardCharsets | |
from org.apache.nifi.processor.io import StreamCallback | |
import os, sys, imp, traceback, time | |
# Directory containing the parser modules; a flowfile's 'parser' attribute
# names a '<parser>.py' file in this directory. Keep the trailing slash: the
# loader builds the file path by string concatenation.
parser_path = '/scripts/parsers/'
class PyStreamCallback(StreamCallback):
    """StreamCallback that replaces a flowfile's content with a precomputed result.

    The incoming content is ignored and ``result`` is written to the output
    stream. Text results are encoded explicitly as UTF-8, the same charset
    ``process`` uses to decode non-binary input. Non-ASCII text therefore
    round-trips instead of depending on implicit text -> byte[] coercion.
    """

    def __init__(self, result):
        # result: text (unicode/str), or an already-binary value such as the
        # Java byte array from IOUtils.toByteArray -- TODO confirm other types.
        self.result = result

    def process(self, instream, outstream):
        """Write ``self.result`` to ``outstream``; ``instream`` is not read."""
        result = self.result
        if isinstance(result, type(u'')):
            # Encode text explicitly so non-ASCII characters survive as UTF-8.
            result = result.encode('utf-8')
        if isinstance(result, type(b'')):
            # NiFi's Jython examples pass a bytearray, which Jython maps to
            # the Java byte[] that OutputStream.write expects.
            result = bytearray(result)
        outstream.write(result)
def fail(flowfile, err):
    """Record *err* on *flowfile* and route it to the failure relationship.

    The message is stored in the 'parse.error' attribute, so downstream
    processors can see why parsing did not succeed.
    """
    tagged_flowfile = session.putAttribute(flowfile, 'parse.error', err)
    session.transfer(tagged_flowfile, REL_FAILURE)
def _load_parser(parser):
    """Return the module for *parser*, (re)loading it when its source file changed.

    Raises OSError if the parser file does not exist, and propagates any
    exception raised while executing the parser source.
    """
    path = os.path.join(parser_path, parser + '.py')
    mtime = os.path.getmtime(path)
    # Cache under a prefixed name so a parser called e.g. 'json' or 'csv'
    # cannot replace (or collide with) the standard-library module of that name.
    module_name = 'nifi_parser__' + parser
    module = sys.modules.get(module_name)
    # Compare with the file's own mtime recorded at load time, not wall-clock
    # time, so clock skew or a file replaced with an older mtime still reloads.
    if module is None or getattr(module, '_source_mtime', None) != mtime:
        module = imp.load_source(module_name, path)
        module._source_mtime = mtime
    return module


def _read_content(flowfile, binary):
    """Read the whole content of *flowfile* as a byte array (binary) or UTF-8 text."""
    instream = session.read(flowfile)
    try:
        if binary:
            return IOUtils.toByteArray(instream)
        return IOUtils.toString(instream, StandardCharsets.UTF_8)
    finally:
        # Release the stream even if reading or decoding fails.
        instream.close()


def process(flowfile):
    """Parse *flowfile* with the parser module named by its 'parser' attribute.

    The parser module must define ``parse(data)``. It may also define:
    ``format`` ('binary' gives raw bytes, anything else gives UTF-8 text in
    ``data['content']``) and ``attributes`` (names of flowfile attributes copied
    into ``data``). On success the flowfile content is replaced with the parse
    result and routed to REL_SUCCESS. On any error it is routed to REL_FAILURE
    via ``fail`` with a 'parse.error' attribute describing the problem.
    """
    parser = flowfile.getAttribute('parser')
    # The attribute comes from upstream data: reject a missing name or one
    # containing a path separator, so only files directly in parser_path load.
    if not parser or os.path.basename(parser) != parser:
        fail(flowfile, 'Loading Module: invalid parser attribute: %r' % (parser,))
        return

    # The handlers below are deliberately broad: parser code and the NiFi APIs
    # can raise arbitrary errors, and every error must route to REL_FAILURE.
    try:
        parse_module = _load_parser(parser)
    except:
        fail(flowfile, 'Loading Module: ' + traceback.format_exc())
        return

    try:
        binary = getattr(parse_module, 'format', '').lower() == 'binary'
        data = {'content': _read_content(flowfile, binary)}
    except:
        fail(flowfile, 'Reading Content: ' + traceback.format_exc())
        return

    try:
        for attribute in getattr(parse_module, 'attributes', ()):
            data[attribute] = flowfile.getAttribute(attribute)
        result = parse_module.parse(data)
        flowfile = session.write(flowfile, PyStreamCallback(result))
    except:
        fail(flowfile, 'Parsing: ' + traceback.format_exc())
        return
    session.transfer(flowfile, REL_SUCCESS)
# Execution starts here. `session`, `REL_SUCCESS` and `REL_FAILURE` are not
# defined in this script -- presumably injected by the NiFi script host.
# session.get() yields None when there is nothing to process.
flowfile = session.get()
if flowfile is not None:
    process(flowfile)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json, datetime | |
# Flowfile attributes this parser needs. The loading script copies each named
# attribute into the dict passed to parse(). This module defines no `format`,
# so the loader supplies the content as UTF-8 text.
attributes = ['filename', 's2s.host']
def parse(data):
    """Convert process-listing text (presumably `ps aux` output) to JSON lines.

    Args:
        data: dict with 'content' (a header line followed by one line per
            process), 'filename' (its first 13 characters must be a
            millisecond epoch timestamp) and 's2s.host'.

    Returns:
        One JSON object per non-blank process line, joined with '\\n'. Each
        object holds 'ts' and 'host' plus the eleven columns in ``fields``:
        'cpu'/'mem' as floats, 'vsz'/'rss' as ints, the rest as strings.

    Raises:
        ValueError: if a process line has fewer than ten columns, or a
            numeric column cannot be converted.
    """
    # Millisecond epoch -> whole seconds -> local-time 'YYYY-MM-DD HH:MM:SS'.
    # Floor division gives the same whole-second value on Python 2 and 3.
    ts = str(datetime.datetime.fromtimestamp(int(data['filename'][0:13]) // 1000))
    # Fixed column names; the header row in the content is skipped.
    fields = ['user', 'pid', 'cpu', 'mem', 'vsz', 'rss', 'tty', 'stat',
              'startup', 'time', 'command']
    fixed = len(fields) - 1  # columns before the free-form command
    records = []
    for line in data['content'].split('\n')[1:]:
        tokens = line.split()
        if not tokens:
            continue
        if len(tokens) < fixed:
            raise ValueError('expected at least %d columns, got %d: %r'
                             % (fixed, len(tokens), line))
        # Everything after the fixed columns is the command line, which may
        # itself contain spaces.
        tokens = tokens[:fixed] + [' '.join(tokens[fixed:])]
        record = {'ts': ts, 'host': data['s2s.host']}
        for field, token in zip(fields, tokens):
            if field in ('cpu', 'mem'):
                record[field] = float(token)
            elif field in ('vsz', 'rss'):
                record[field] = int(token)
            else:
                record[field] = token
        records.append(json.dumps(record))
    return '\n'.join(records)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment