Last active
May 26, 2016 16:36
-
-
Save ryanpersaud/734b68e3624d06433deaa114acc33865 to your computer and use it in GitHub Desktop.
A NiFi template for the ExecuteScript processor. It uses Jython to convert string timestamp fields in JSON content to long (epoch-millisecond) values. Based on @mattyb149's template (https://gist.github.com/mattyb149/89205fcbc6d0e15ba024).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> | |
<template> | |
<description>Converts string timestamp fields in JSON content to long epoch-millisecond values.</description>
<name>ConvertStringTimestampsToLong</name> | |
<snippet> | |
<processors> | |
<id>c6cf8df9-bfcf-45e9-ab06-76de85e78f78</id> | |
<parentGroupId>a27e19b1-ef3e-41a2-b0ce-266370b14b47</parentGroupId> | |
<position> | |
<x>1233.620849609375</x> | |
<y>5.457275390625</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<defaultConcurrentTasks> | |
<entry> | |
<key>TIMER_DRIVEN</key> | |
<value>1</value> | |
</entry> | |
<entry> | |
<key>EVENT_DRIVEN</key> | |
<value>0</value> | |
</entry> | |
<entry> | |
<key>CRON_DRIVEN</key> | |
<value>1</value> | |
</entry> | |
</defaultConcurrentTasks> | |
<defaultSchedulingPeriod> | |
<entry> | |
<key>TIMER_DRIVEN</key> | |
<value>0 sec</value> | |
</entry> | |
<entry> | |
<key>CRON_DRIVEN</key> | |
<value>* * * * * ?</value> | |
</entry> | |
</defaultSchedulingPeriod> | |
<descriptors> | |
<entry> | |
<key>Script Engine</key> | |
<value> | |
<allowableValues> | |
<displayName>ECMAScript</displayName> | |
<value>ECMAScript</value> | |
</allowableValues> | |
<allowableValues> | |
<displayName>Groovy</displayName> | |
<value>Groovy</value> | |
</allowableValues> | |
<allowableValues> | |
<displayName>lua</displayName> | |
<value>lua</value> | |
</allowableValues> | |
<allowableValues> | |
<displayName>python</displayName> | |
<value>python</value> | |
</allowableValues> | |
<allowableValues> | |
<displayName>ruby</displayName> | |
<value>ruby</value> | |
</allowableValues> | |
<defaultValue>ECMAScript</defaultValue> | |
<description>The engine to execute scripts</description> | |
<displayName>Script Engine</displayName> | |
<dynamic>false</dynamic> | |
<name>Script Engine</name> | |
<required>true</required> | |
<sensitive>false</sensitive> | |
<supportsEl>false</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
<value> | |
<description>Path to script file to execute. Only one of Script File or Script Body may be used | |
</description> | |
<displayName>Script File</displayName> | |
<dynamic>false</dynamic> | |
<name>Script File</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>true</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value> | |
<description>Body of script to execute. Only one of Script File or Script Body may be used</description> | |
<displayName>Script Body</displayName> | |
<dynamic>false</dynamic> | |
<name>Script Body</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>false</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
<value> | |
<description>Comma-separated list of paths to files and/or directories which contain modules required by | |
the script. | |
</description> | |
<displayName>Module Directory</displayName> | |
<dynamic>false</dynamic> | |
<name>Module Directory</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>false</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>endTime</key> | |
<value> | |
<description></description> | |
<displayName>endTime</displayName> | |
<dynamic>true</dynamic> | |
<name>endTime</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>true</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>mrt</key> | |
<value> | |
<description></description> | |
<displayName>mrt</displayName> | |
<dynamic>true</dynamic> | |
<name>mrt</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>true</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>rt</key> | |
<value> | |
<description></description> | |
<displayName>rt</displayName> | |
<dynamic>true</dynamic> | |
<name>rt</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>true</supportsEl> | |
</value> | |
</entry> | |
<entry> | |
<key>startTime</key> | |
<value> | |
<description></description> | |
<displayName>startTime</displayName> | |
<dynamic>true</dynamic> | |
<name>startTime</name> | |
<required>false</required> | |
<sensitive>false</sensitive> | |
<supportsEl>true</supportsEl> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Script Engine</key> | |
<value>python</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value> | |
import json | |
import java.io | |
import time | |
import datetime | |
from org.apache.commons.io import IOUtils | |
from java.nio.charset import StandardCharsets | |
from org.apache.nifi.processor.io import StreamCallback | |
class PyStreamCallback(StreamCallback):
    """StreamCallback that rewrites configured JSON timestamp strings as epoch milliseconds.

    Each dynamic property on the processor is one field mapping. The property
    name is a top-level key of the incoming JSON object. The property value is
    the datetime.strptime format used to parse that key's string value (this
    template configures %Y-%m-%d %H:%M:%S.%f).
    """

    def __init__(self, properties):
        # Map of PropertyDescriptor to configured value, as returned by
        # context.getProperties(). Static properties are included too and are
        # filtered out in process().
        self.properties = properties

    def process(self, inputStream, outputStream):
        """Parse the content as UTF-8 JSON, convert the mapped fields, and write it back.

        Fields that are absent, null, or not strings are left untouched.
        Raises ValueError when a string does not match its configured format,
        and whatever json.loads raises for malformed JSON. The exception
        propagates to the code that called session.write().
        """
        obj = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        for descriptor in self.properties:
            # Script Engine, Script Body, etc. are not field mappings.
            if not descriptor.isDynamic():
                continue
            field = descriptor.getName()
            if field not in obj:
                continue
            value = obj[field]
            # Nulls and already-numeric values (e.g. content converted once
            # before) would otherwise make strptime raise TypeError.
            if not isinstance(value, basestring):
                continue
            parsed = datetime.datetime.strptime(value, self.properties[descriptor])
            # timetuple() has whole-second resolution, so the %f fraction must
            # be added back separately or every result would end in 000.
            # NOTE(review): mktime() reads the naive datetime as NiFi-host local
            # time; confirm the source timestamps are not UTC.
            seconds = int(time.mktime(parsed.timetuple()))
            obj[field] = seconds * 1000 + parsed.microsecond // 1000
        outputStream.write(bytearray(json.dumps(obj, indent=4).encode('utf-8')))
# Script entry point. ExecuteScript binds session, context, log, REL_SUCCESS
# and REL_FAILURE before evaluating this body.
import os
import sys

flowFile = session.get()
properties = context.getProperties()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback(properties))
        # splitext drops only the last extension, so "a.b.json" becomes
        # "a.b_translated.json" instead of being truncated to "a_translated.json".
        stem = os.path.splitext(flowFile.getAttribute('filename'))[0]
        flowFile = session.putAttribute(flowFile, 'filename', stem + '_translated.json')
        session.transfer(flowFile, REL_SUCCESS)
    except:
        # Deliberately bare. Under Jython the failure may surface as a Python
        # exception (bad JSON, strptime ValueError) or as a Java exception
        # raised by the session, and `except Exception` may not catch the
        # latter. Without this the error escapes the script, and ExecuteScript
        # presumably rolls back and retries the same FlowFile instead of
        # routing it to failure.
        # NOTE(review): failure is auto-terminated in this template, so such
        # FlowFiles are logged and then dropped unless failure is connected.
        log.error('Failed to convert timestamps in ' + str(flowFile) + ': ' + str(sys.exc_info()[1]))
        session.transfer(flowFile, REL_FAILURE)
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
</entry> | |
<entry> | |
<key>endTime</key> | |
<value>%Y-%m-%d %H:%M:%S.%f</value> | |
</entry> | |
<entry> | |
<key>mrt</key> | |
<value>%Y-%m-%d %H:%M:%S.%f</value> | |
</entry> | |
<entry> | |
<key>rt</key> | |
<value>%Y-%m-%d %H:%M:%S.%f</value> | |
</entry> | |
<entry> | |
<key>startTime</key> | |
<value>%Y-%m-%d %H:%M:%S.%f</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>ExecuteScriptConvertTimestampsToLongs</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<description>FlowFiles that failed to be processed</description> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<description>FlowFiles that were successfully processed</description> | |
<name>success</name> | |
</relationships> | |
<state>RUNNING</state> | |
<style/> | |
<supportsEventDriven>false</supportsEventDriven> | |
<supportsParallelProcessing>false</supportsParallelProcessing> | |
<type>org.apache.nifi.processors.script.ExecuteScript</type> | |
</processors> | |
</snippet> | |
<timestamp>05/26/2016 12:18:00 EDT</timestamp> | |
</template> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example configuration: each dynamic property's name is a JSON field to convert, and its value is the Python `strptime` format used to parse that field (for example, `%Y-%m-%d %H:%M:%S.%f`).