Created
October 11, 2017 19:27
-
-
Save randerzander/b7ef272123cbfcb4b5422abc620bbb92 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from org.apache.commons.io import IOUtils | |
from java.nio.charset import StandardCharsets | |
from org.apache.nifi.processor.io import StreamCallback | |
import os, sys, imp, traceback, time | |
from urlparse import urlparse | |
# Directory that holds the per-site parser modules. The value is joined by
# plain string concatenation with '<parser>.py', so it must end with a path
# separator.
parser_path = '/home/dev/projects/scripts/parsers/'
class PyStreamCallback(StreamCallback):
    """Stream callback that writes a fixed payload as the flowfile content.

    The payload is captured when the callback is constructed and written to
    the output stream when the callback is invoked. The input stream is
    never read.
    """

    def __init__(self, result):
        # Payload to emit: text or raw bytes, as handed over by the caller.
        self.result = result

    def process(self, instream, outstream):
        """Write the stored payload to ``outstream``; ``instream`` is ignored."""
        payload = self.result
        # Text content is decoded as UTF-8 when it is read from a flowfile,
        # so unicode payloads are encoded back to UTF-8 explicitly instead of
        # relying on an implicit character-to-byte conversion. Byte strings
        # and byte arrays are written unchanged.
        if isinstance(payload, type(u'')):
            payload = bytearray(payload.encode('utf-8'))
        outstream.write(payload)
def fail(flowfile, err):
    """Record ``err`` on the flowfile and route it to the failure relationship.

    The message is stored in the ``parse.error`` attribute before the
    flowfile is transferred to ``REL_FAILURE``. ``session`` and
    ``REL_FAILURE`` are read from the global scope.
    """
    tagged = session.putAttribute(flowfile, 'parse.error', err)
    session.transfer(tagged, REL_FAILURE)
def process(flowfile):
    """Parse one flowfile with the parser module selected by its source URL.

    The parser name is the second-to-last label of the hostname found in the
    ``source.url`` attribute (``example`` for ``www.example.com``); the module
    is loaded from ``parser_path + name + '.py'`` and reloaded when the file
    is newer than the cached copy.

    A parser module must define ``parse(data)`` returning a mapping with a
    ``content`` key. It may also define ``format`` (``'binary'`` to receive
    the content as a byte array instead of UTF-8 text) and ``attributes``
    (names of flowfile attributes to copy into ``data``).

    Every key of the result other than ``content`` is stored as a flowfile
    attribute. If ``content`` is a list, one child flowfile per entry is sent
    to ``REL_SUCCESS`` and the original flowfile is removed; otherwise the
    content replaces the flowfile's content and the flowfile itself is sent
    to ``REL_SUCCESS``. Errors while selecting, loading or running the parser
    route the flowfile to ``REL_FAILURE`` through ``fail``.

    ``session``, ``REL_SUCCESS`` and ``REL_FAILURE`` are read from the global
    scope (presumably bound by the hosting script processor).
    """
    # --- Select the parser --------------------------------------------------
    source_url = flowfile.getAttribute('source.url')
    try:
        parser = urlparse(source_url).hostname.split('.')[-2]
    except (AttributeError, IndexError, TypeError, ValueError):
        # Missing attribute, URL without a host, or a single-label host.
        parser = None
    if not parser:
        fail(flowfile, 'Selecting parser: no usable hostname in source.url %r' % (source_url,))
        return
    path = parser_path + parser + '.py'

    # --- Load the parser if it is new or has been updated -------------------
    # The freshness check lives inside the try so that a parser file that
    # disappeared, or an already-imported module of the same name that was
    # not loaded here (no ``loaded_at``), fails the flowfile instead of
    # aborting the script.
    # NOTE(review): bare ``except`` is kept on the assumption that it is
    # needed to also catch Java throwables under Jython - confirm.
    try:
        parse_module = sys.modules.get(parser)
        if parse_module is None or os.path.getmtime(path) > parse_module.loaded_at:
            parse_module = imp.load_source(parser, path)
            parse_module.loaded_at = int(time.time())
    except:
        fail(flowfile, 'Loading Module: ' + traceback.format_exc())
        return

    # --- Read flowfile content ----------------------------------------------
    data = {}
    instream = session.read(flowfile)
    try:
        if hasattr(parse_module, 'format') and parse_module.format.lower() == 'binary':
            data['content'] = IOUtils.toByteArray(instream)
        else:
            data['content'] = IOUtils.toString(instream, StandardCharsets.UTF_8)
    finally:
        # Always release the stream, even when reading raises.
        instream.close()

    # --- Parse and emit -----------------------------------------------------
    pending = []  # children created in this call and not yet transferred
    try:
        for attribute in getattr(parse_module, 'attributes', ()):
            data[attribute] = flowfile.getAttribute(attribute)
        result = parse_module.parse(data)

        # Copy returned fields into flowfile attributes.
        for attr in result.keys():
            if attr != 'content':
                flowfile = session.putAttribute(flowfile, attr, result[attr])

        content = result['content']
        if isinstance(content, list):
            # Build every child before transferring any of them, so that a
            # failure part-way through the list never leaves some children
            # in success while the parent goes to failure.
            for entry in content:
                child = session.create(flowfile)
                pending.append(child)
                pending[-1] = session.write(child, PyStreamCallback(entry))
            for child in pending:
                session.transfer(child, REL_SUCCESS)
            del pending[:]
            session.remove(flowfile)
        else:
            flowfile = session.write(flowfile, PyStreamCallback(content))
            session.transfer(flowfile, REL_SUCCESS)
    except:
        error = 'Parsing: ' + traceback.format_exc()
        # Drop children that were created but never transferred; otherwise
        # they would be left in the session with no destination.
        for child in pending:
            session.remove(child)
        fail(flowfile, error)
# Execution starts here: take one flowfile from the session and process it.
# Nothing is done when the session returns no flowfile.
flowfile = session.get()
if flowfile is not None:
    process(flowfile)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment