Created
June 13, 2016 17:44
-
-
Save larkz/f11ea00f19ff5f66c08b6b9287e1b051 to your computer and use it in GitHub Desktop.
User wallet and customer age segmentation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// User wallet and customer age segmentation: builds a per-customer feature set
// (GMV, transaction count, marketing-measurement aggregates) and writes it as CSV.
// Expects a Spark shell/notebook session where `sqlContext` and the `$"col"`
// column syntax (sqlContext.implicits._) are already in scope.

// Suffix appended to the output directory name so runs do not overwrite each other.
val num = "27"

// Marketing measurement events, restricted to an exclusive dateday window.
// NOTE(review): the bounds are compared against string literals; this presumably
// relies on `dateday` being formatted as yyyy-MM-dd -- confirm.
val measurementData = sqlContext.read.parquet("/workspace/midgar/prod/ws/measurements/base")
val mData = measurementData.filter($"dateday" > "2016-05-31" && $"dateday" < "2016-06-10")
val mDataSubset = mData.select("customer_id", "channel", "group", "impression", "view", "click", "path_event_type", "dateday")

// Sales orders. The date filter below is intentionally left disabled; as written
// (`<` upper bound OR `>` lower bound) it would match every non-null created_at,
// so it would need `&&` to actually restrict the range if re-enabled.
val salesOrderData = sqlContext.table("marketplace.sales_order_snapshot") //.filter($"created_at" < "2016-04-01" || $"created_at" > "2016-03-01")

// Drop orders whose title mentions "Recharge" or "Bill Payment" before computing GMV.
val salesOrderData2 = salesOrderData.filter(!$"title".contains("Recharge")).filter(!$"title".contains("Bill Payment"))

// Total grandtotal per customer, renamed to (customer_id, sum_gmv).
val salesOrderGMVAgg = salesOrderData2.groupBy($"customer_id").sum("grandtotal").toDF("customer_id", "sum_gmv")

// Keep customers with positive GMV and take a ~7% sample without replacement.
// NOTE(review): no seed is passed to sample(), so the selected customers are
// expected to differ from run to run -- pass a seed if reproducibility matters.
val activeSalesOrderGMVAggSample = salesOrderGMVAgg.filter($"sum_gmv" > 0).sample(false, 0.07)

// Join these ACTIVE customers with number of transactions.
// NOTE(review): the count is taken over the unfiltered salesOrderData, so it
// includes the Recharge / Bill Payment orders that were excluded from sum_gmv --
// confirm this asymmetry is intended.
val salesOrderNumTxnAgg = salesOrderData.groupBy($"customer_id").count().toDF("customer_id", "txn_count")
val featureSet = activeSalesOrderGMVAggSample.join(salesOrderNumTxnAgg, "customer_id")

// Spark aggregation step: sum impressions, views and clicks per
// (customer_id, group, channel, path_event_type).
val mDataAggregate = mDataSubset.groupBy("customer_id", "group", "channel", "path_event_type").agg(
  "impression" -> "sum",
  "view" -> "sum",
  "click" -> "sum"
)

// Attach the measurement aggregates to the sampled customers' sales features.
val mFeatureAttSet = featureSet.join(mDataAggregate, "customer_id")

// Write the result through the spark-csv data source. The header is disabled,
// so consumers must rely on the column order of mFeatureAttSet.
mFeatureAttSet.write.format("com.databricks.spark.csv").option("header", "false").save(s"mFeatureAttSet${num}.csv")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment