Skip to content

Instantly share code, notes, and snippets.

@s8sg
Last active January 2, 2019 03:07
Show Gist options
  • Save s8sg/a16f35346216f794ab8f3c5720eb5d87 to your computer and use it in GitHub Desktop.
Save s8sg/a16f35346216f794ab8f3c5720eb5d87 to your computer and use it in GitHub Desktop.
Faasflow parallel
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := faasflow.CreateDag()
dag.AddModifier("ReadAllItems", ReadSoldItemFromKafkaStream)
itemDag := faasflow.CreateDag()
itemDag.AddModifier("FilterRequiredFields", FilterSellBuyPriceFields)
itemDag.AddModifier("SumSellPrice", ComputeTotalSellPrice)
itemDag.AddModifier("SumBuyPrice", ComputeTotalBuyPrice)
// To Serialize multiple input the vertex need an Aggregator
itemDag.AddVertex("ComputeProfit", faasflow.Aggregator(func(inputs map[string][]byte) ([]byte, error) {
Sell := inputs["SumSellPrice"]
Buy := inputs["SumBuyPrice"]
// Serialize input for callback
profit := atoi(Buy) - atoi(Sell)
// Map Profit my Items Id
jsonProfit, _ := json.Marshal(map[string]int{ context.GetChoice(): profit }
return jsonProfit, nil
}))
itemDag.AddEdge("FilterRequiredFields", "SumSellPrice")
itemDag.AddEdge("FilterRequiredFields", "SumBuyPrice")
itemDag.AddEdge("SumSellPrice", "ComputeProfit")
itemDag.AddEdge("SumSellPrice", "ComputeProfit")
dag.AddForEachDag("ComputeProfitForEachItem", itemDag, faasflow.ForEach(MapItemsById, MapProfitById))
dag.AddCallback("callback", "storage.io/bucket?id=3345612358265349126&file=" + context.Query.Get("duration"))
dag.AddEdge("ReadAllItems", "ComputeProfitForEachItem")
dag.AddEdge("ComputeProfitForEachItem", "callback")
flow.ExecuteDag(dag)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment