Created
July 28, 2017 22:24
-
-
Save eridal/3dccdda92da9b0e6e4adddae256c85b9 to your computer and use it in GitHub Desktop.
Build a dot model from an AWS Data Pipeline JSON definition
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Recursively merges `object` into `into`, mutating and returning `into`.
 *
 * Rules applied per key of `object`:
 * - falsy values (undefined, null, 0, '', false) are skipped;
 * - arrays are appended to whatever `into` already holds under that key;
 * - objects are merged key by key into an existing object, or assigned
 *   as-is (by reference) when there is no object to merge into;
 * - any other value overwrites the existing one.
 *
 * @param {Object} into   target, modified in place
 * @param {Object} object source of the values to merge
 * @returns {Object} `into`
 */
function merge (into, object) {
  Object.keys(object).forEach(key => {
    const value = object[key]
    if (!value) {
      return
    }
    if (Array.isArray(value)) {
      into[key] = [].concat(into[key] || [], value)
    } else if (typeof value === 'object') {
      const target = into[key]
      // only recurse into an existing object: a primitive target cannot
      // receive keys, so the incoming object replaces it instead
      into[key] = target && typeof target === 'object' ? merge(target, value) : value
    } else {
      into[key] = value
    }
  })
  return into
}
/**
 * Composes several node builders into one.
 *
 * The returned function starts from an empty node and calls every builder
 * in order with `(obj, nodeSoFar)`, merging each result into the node.
 *
 * @param {...Function} fns builders, each returning a partial node
 * @returns {Function} `(obj) => node`
 */
function wrap (...fns) {
  return (obj) => fns.reduce(
    (node, fn) => merge(node, fn(obj, node)),
    {}
  )
}
/**
 * Reads a nested property using a dot separated path, e.g. `'a.b.c'`.
 *
 * @param {string} expr   dot separated property path
 * @param {*}      object value to start the lookup from
 * @returns {*} the value found, or `undefined` as soon as a step of the
 *   path is null/undefined (a falsy intermediate such as 0 or '' is no
 *   longer returned as if it were the result)
 */
function get (expr, object) {
  return expr
    .split('.')
    .reduce((ctx, key) => (ctx == null ? undefined : ctx[key]), object)
}
// Colours used in the generated graph: one fill colour per node family
// (config, database and format share the same one) and one for the arrows.
const colors = {
  data: '#DBDB8D',
  config: '#C5B0D5',
  activity: '#9EDAE5',
  database: '#C5B0D5',
  schedule: '#F7B6D2',
  resource: '#C7C7C7',
  format: '#C5B0D5',
  lines: '#656565', // #C7C7C7', #8F8F8F
}
/**
 * Describes an edge pointing at (or coming from) `obj`.
 *
 * @param {Object} obj     object at the other end of the edge (`id`, `name` are read)
 * @param {Object} [style] extra edge attributes, merged over the default line colour
 * @returns {{node: {id: *, name: *}, style: Object}}
 */
const arrow = (obj, style) => ({
  node: {
    id: obj.id,
    name: obj.name,
  },
  style: merge({ color: colors.lines }, style || {}),
})
// Builder factories meant to be combined with `wrap`: each one returns a
// function producing a partial node.
const mixins = {
  // fills the node with the given colour ('red' when none is given)
  color: (color) => () => ({
    styles: {
      style: 'filled',
      fillcolor: `${color || 'red'};0.1:white`,
    },
  }),

  // turns the objects found at the given property paths into `depends` arrows
  depends: (...fields) => mixins._into('depends', fields),

  // turns the objects found at the given property paths into `requires` arrows
  requires: (...fields) => mixins._into('requires', fields),

  // shared implementation: looks every path up on the object, drops the
  // missing ones and stores one arrow per remaining value under `property`
  _into: (property, fields) => (obj) => {
    const node = {}
    node[property] = fields
      .map(path => get(path, obj))
      .filter(Boolean)
      // explicit lambda so that map's index is not passed to arrow as its style
      .map(value => arrow(value))
    return node
  },
}
/**
 * Base builder: maps a resolved pipeline object to a graph node.
 *
 * `dependsOn`, `runsOn` (dashed) and `input` become `depends` arrows,
 * `output` becomes a `requires` arrow; absent properties add nothing.
 *
 * @param {Object} obj pipeline object whose references were already
 *   replaced with the objects they point to
 * @returns {{id: *, name: *, type: *, styles: Object, depends: Array, requires: Array}}
 */
function Node (obj) {
  const depends = [].concat(
    obj.dependsOn ? arrow(obj.dependsOn) : [],
    obj.runsOn ? arrow(obj.runsOn, { style: 'dashed' }) : [],
    obj.input ? arrow(obj.input) : []
  )
  const requires = [].concat(
    obj.output ? arrow(obj.output) : []
  )
  return {
    id: obj.id,
    name: obj.name,
    type: obj.type,
    styles: {
      shape: 'box',
    },
    depends,
    requires,
  }
}
// One builder per object family: the base Node, the family's fill colour
// and, where applicable, the referenced object that gets an extra arrow.
const Config = wrap(Node, mixins.color(colors.config), mixins.requires('schedule'))
const Schedule = wrap(Node, mixins.color(colors.schedule))
const Activity = wrap(Node, mixins.color(colors.activity), mixins.depends('schedule'))
const Data = wrap(Node, mixins.color(colors.data), mixins.depends('dataFormat'))
const Resource = wrap(Node, mixins.color(colors.resource), mixins.depends('schedule'))
const Format = wrap(Node, mixins.color(colors.format))
const Database = wrap(Node, mixins.color(colors.database))
// Maps the `type` of a pipeline object to the builder that renders it.
// https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-pipeline-objects.html
const types = {
  Config,
  Schedule,

  // data nodes
  Data,
  DynamoDBDataNode: Data,
  MySqlDataNode: Data,
  RedshiftDataNode: wrap(Data, mixins.requires('database')),
  S3DataNode: Data,

  // data formats
  Format,
  CSV: Format,
  Custom: Format,
  DynamoDBDataFormat: Format,
  DynamoDBExportDataFormat: Format,
  RegEx: Format,
  TSV: Format,

  // databases
  Database,
  JdbcDatabase: Database,
  RdsDatabase: Database,
  RedshiftDatabase: Database,

  // resources
  Resource,
  WorkerGroup: Resource,
  EmrCluster: Resource,
  Ec2Resource: Resource,

  // activities
  Activity,
  SqlActivity: wrap(Activity, mixins.requires('database')),
  RedshiftCopyActivity: Activity,
  ShellCommandActivity: Activity,
  HiveActivity: Activity,
}
/** Turns an Error factory into a function that throws the Error it builds. */
const error = (fn) => (obj) => { throw fn(obj) }

/** Error for a pipeline object whose `type` has no builder in `types`. */
const missingType = (obj) => new Error(`missing type for node ${obj.id}`)

/**
 * Loads a pipeline definition and converts its objects into graph nodes.
 *
 * @param {string} file path handed to `require`; the loaded value must
 *   expose an `objects` array
 * @returns {Array<Object>} one node per resolved pipeline object
 * @throws {Error} when `objects` is not an array, or when an object has a
 *   type without a builder
 */
const read = (file) => {
  const { objects } = require(file)
  if (!Array.isArray(objects)) {
    throw new Error(`no "objects" array found in ${file}`)
  }
  return resolve(objects)
    .map((obj, ix) => {
      // record the position of the object within the resolved list
      obj.ix = ix
      return obj
    })
    .map(obj => {
      // own-property check so inherited keys (e.g. 'toString') are not
      // mistaken for builders
      const known = Object.prototype.hasOwnProperty.call(types, obj.type)
      const build = known ? types[obj.type] : error(missingType)
      return build(obj)
    })
}
/**
 * Lists the own enumerable properties of an object as `{ key, value }` pairs.
 *
 * @param {Object} obj
 * @returns {Array<{key: string, value: *}>}
 */
const propsOf = (obj) => Object
  .keys(obj)
  .map(key => ({ key, value: obj[key] }))
/**
 * Prepares the raw pipeline objects for rendering. The objects themselves
 * are modified in place; the returned array is a new one.
 *
 * Steps:
 * 1. every distinct `workerGroup` name becomes a synthetic `WorkerGroup`
 *    object, and the objects using it get a `runsOn` reference to it;
 * 2. every `{ ref: id }` property is replaced with the object having that
 *    id (`undefined` when there is none);
 * 3. the object with id `Default`, when present, gets the type `Config`;
 * 4. when an object with id `DefaultSchedule` exists, resources/clusters
 *    without a schedule inherit it, and an activity whose `runsOn` uses
 *    it drops its own `schedule`.
 *
 * @param {Array<Object>} objects pipeline objects
 * @returns {Array<Object>} the objects followed by the synthetic worker groups
 */
const resolve = (objects) => {
  // create worker groups
  const workerGroups = []
  objects.forEach(obj => {
    if (!obj.workerGroup) {
      return
    }
    let group = workerGroups.find(wg => wg.name === obj.workerGroup)
    if (!group) {
      group = {
        id: `WorkGroup_${workerGroups.length}`,
        name: obj.workerGroup,
        type: 'WorkerGroup',
      }
      workerGroups.push(group)
    }
    obj.runsOn = { ref: group.id }
  })
  const all = objects.concat(workerGroups)

  // index by id; the first object wins when an id is duplicated
  const byId = new Map()
  all.forEach(obj => {
    if (!byId.has(obj.id)) {
      byId.set(obj.id, obj)
    }
  })

  // replace references with objects
  all.forEach(object => propsOf(object).forEach(({ key, value }) => {
    // value may be null or a primitive, hence the guard before reading `ref`
    if (value && value.ref) {
      object[key] = byId.get(value.ref)
    }
  }))

  // connect config
  const config = byId.get('Default')
  if (config) {
    config.type = 'Config'
  }

  // schedules: without a default schedule there is nothing to inherit, and
  // comparing against `undefined` would strip the schedule of every activity
  const schedule = byId.get('DefaultSchedule')
  if (schedule) {
    all
      .filter(obj => /(Resource|Cluster)/.test(obj.type))
      .forEach(obj => { obj.schedule = obj.schedule || schedule })
    all
      .filter(obj => /(Activity)/.test(obj.type))
      .forEach(obj => {
        if (obj.runsOn && obj.runsOn.schedule === schedule) {
          delete obj.schedule
        }
      })
  }

  return all
}
/**
 * Renders the nodes as the text of a dot `digraph`.
 *
 * Output order: default node attributes, one statement per node, the
 * `requires` arrows (node -> required) and the `depends` arrows
 * (dependency -> node). The nodes passed in are not modified.
 *
 * @param {Array<Object>} nodes objects with `id`, `name`, `type`, `styles`,
 *   `requires` and `depends` (arrays of `{ node, style }`)
 * @returns {string} dot source
 */
const render = (nodes) => {
  const renders = {
    // escapes the characters that act as markup inside an HTML-like label
    escape: (text) => String(text)
      .replace(/&/g, '&amp;')
      .replace(/</g, '&lt;')
      .replace(/>/g, '&gt;'),

    // plain identifiers are emitted verbatim; anything else, as well as the
    // dot keywords, must be a quoted string to be usable as an id
    id: (id) => {
      const text = String(id)
      const plain = /^[A-Za-z_][A-Za-z0-9_]*$/.test(text)
      const keyword = /^(node|edge|graph|digraph|subgraph|strict)$/i.test(text)
      return plain && !keyword ? text : `"${text.replace(/"/g, '\\"')}"`
    },

    style: (styles) => Object
      .keys(styles)
      .map(k => `${k}=${renders.labelOrTable(styles[k])}`)
      .join(', '),

    // values starting with '<<' are HTML-like labels and stay unquoted
    labelOrTable: (text) => (String(text).startsWith('<<') ? text : `"${text}"`),

    // '#{expression}' placeholders are reduced to the bare expression
    name: (name) => renders.escape(
      String(name == null ? '' : name).replace(/#\{([^}]+)\}/g, '$1')
    ),

    table: (lines) => `<<TABLE border="0">${
      lines.map(line => `<TR><TD>${line}</TD></TR>`).join('')
    }</TABLE>>`,

    label: (node) => renders.table([
      `<B>${renders.escape(node.type)}</B>`,
      renders.name(node.name),
    ]),

    props: (props) => Object
      .keys(props)
      .map(key => renders.prop({ name: key, styles: props[key] })),

    prop: (prop) => `${prop.name} [${renders.style(prop.styles)}];`,

    node: (node) => {
      // copy the styles so that rendering leaves the node untouched
      const styles = Object.assign({}, node.styles, {
        label: renders.label(node),
      })
      return `${renders.id(node.id)} [${renders.style(styles)}];`
    },

    arrow: (from, till, styles) => {
      const style = styles ? ` [${renders.style(styles)}]` : ''
      return `${renders.id(from.id)} -> ${renders.id(till.id)} ${style};`
    },

    // maps every item to zero or more lines and flattens the result
    each: (xs, fn) => xs.reduce((lines, node, i) => lines.concat(fn(node, i)), []),
  }

  // attributes applied to every node unless the node overrides them
  const props = {
    node: {
      shape: 'record',
    },
  }

  const lines = [].concat(
    renders.props(props),
    '// nodes',
    renders.each(nodes, renders.node),
    '// requires',
    renders.each(nodes, node => node.requires.map(rel => renders.arrow(node, rel.node, rel.style))),
    '// depends',
    renders.each(nodes, node => node.depends.map(rel => renders.arrow(rel.node, node, rel.style))),
    '// end'
  )

  return `digraph {\n ${lines.join('\n ')}\n}`
}
const path = require('path')

/**
 * Prints the dot model of a pipeline definition to stdout, preceded by a
 * comment line holding the file path.
 *
 * @param {string} file absolute path of the pipeline definition
 */
const execute = (file) => {
  console.log(`// ${file}`)
  console.log(render(read(file)))
}

// entry point: the pipeline definition is the first command line argument
if (!process.argv[2]) {
  // without this guard path.resolve fails on the missing argument
  console.error('usage: node dp2dot.js <pipeline-definition.json>')
  process.exitCode = 1
} else {
  execute(
    path.resolve(process.argv[2])
  )
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Converts a pipeline definition to dot with dp2dot.js and renders it with
# Graphviz; the image is written next to the input as "<input>.svg".
#
# usage: <this script> <pipeline-definition.json>

# stop on errors and let a failure of the converter fail the whole pipe
set -euo pipefail

if [ "$#" -lt 1 ]; then
  echo "usage: $0 <pipeline-definition.json>" >&2
  exit 1
fi

# NOTE(review): `js` is presumably the local name of the Node.js binary —
# confirm, and use `node` where `js` is not available
js dp2dot.js "$1" | dot -Tsvg > "$1.svg"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment