@ASvyatkovskiy
Last active November 19, 2016 01:14

Example of statistical analysis with Spark and Histogrammar

Join the cosine and Jaccard output RDDs on their common key pair and convert the result to a DataFrame:

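// join yields (key pair, (cosine, jaccard)); .values drops the key so that toDF gets exactly two columns
// (outside spark-shell, toDF needs the SparkSession implicits, e.g. import spark.implicits._)
val data = cosineRDD.join(jaccardRDD).values.toDF("cosine","jaccard")
data.write.parquet("/user/alexeys/correlations_3state")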
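`join` on two pair RDDs is an inner join: it returns `RDD[(K, (V, W))]` with one row per matching pair of values, and keys present on only one side are dropped. The key therefore stays attached to each row until `.values` (or a `map`) removes it. A minimal plain-Scala sketch of those semantics on local collections follows; `JoinSketch` is a hypothetical name and this is not Spark's implementation.

```scala
// Local sketch of pair-RDD inner-join semantics (not Spark itself):
// every key present on both sides yields one (key, (left, right)) row per
// matching pair of values; keys found on only one side are dropped.
object JoinSketch {
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    // index the right side by key so each left row can look up its matches
    val rightByKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, a) <- left
      b <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  // Drop the key and keep only the value pair, i.e. the two columns
  // (cosine, jaccard) that the DataFrame is meant to hold.
  def valuePairs[K, A, B](joined: Seq[(K, (A, B))]): Seq[(A, B)] = joined.map(_._2)
}
```

As a consequence, any key pair scored by only one of the two measures is absent from `data`, so the count reported by `describe()` is the number of pairs present in both outputs.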

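Launch a spark-shell session with Histogrammar preloaded (the `_2.10` suffix of the package is its Scala binary version and has to match the Scala version of the Spark build):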

spark-shell --master yarn --queue production --num-executors 20 --executor-cores 3 --executor-memory 10g --packages "org.diana-hep:histogrammar-bokeh_2.10:1.0.3" --jars target/scala-2.11/BillAnalysis-assembly-2.0.jar 

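Read the joined data back from Parquet (in Spark 2.x: `val data = spark.read.parquet("/user/alexeys/correlations_3state")`) and get basic descriptive statistics: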

scala> data.describe().show()
+-------+--------------------+------------------+
|summary|              cosine|           jaccard|
+-------+--------------------+------------------+
|  count|          2632811191|        2632811191|
|   mean|   2.648025009784054|11.899197957421478|
| stddev|  3.2252594746900303|3.5343401388251032|
|    min|2.389704494502045...|1.4545454545454546|
|    max|   98.63368807585896| 81.14406779661016|
+-------+--------------------+------------------+
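`describe()` reports count, mean, standard deviation, min and max for each numeric column; the standard deviation is the sample one (Spark's `stddev` aggregate is an alias of `stddev_samp`). All five can be accumulated in a single pass and merged across partitions. The sketch below illustrates that arithmetic with Welford's update and the pairwise merge of Chan et al.; `Summary` is a hypothetical name and this is not Spark's own code.

```scala
// Mergeable one-pass summary: count, mean, sum of squared deviations (m2), min, max.
// Illustrative only; not Spark's implementation of describe().
final case class Summary(count: Long, mean: Double, m2: Double, min: Double, max: Double) {
  // Welford update: add a single value
  def add(x: Double): Summary = {
    val n = count + 1
    val delta = x - mean
    val newMean = mean + delta / n
    Summary(n, newMean, m2 + delta * (x - newMean), math.min(min, x), math.max(max, x))
  }

  // Chan et al. merge: combine two partial summaries (e.g. from two partitions)
  def merge(o: Summary): Summary =
    if (o.count == 0) this
    else if (count == 0) o
    else {
      val n = count + o.count
      val delta = o.mean - mean
      Summary(
        n,
        mean + delta * o.count / n,
        m2 + o.m2 + delta * delta * count * o.count / n,
        math.min(min, o.min),
        math.max(max, o.max))
    }

  // Sample standard deviation (divides by n - 1), as stddev_samp does
  def stddev: Double = if (count < 2) Double.NaN else math.sqrt(m2 / (count - 1))
}

object Summary {
  val empty: Summary = Summary(0L, 0.0, 0.0, Double.PositiveInfinity, Double.NegativeInfinity)
  def of(xs: Seq[Double]): Summary = xs.foldLeft(empty)(_ add _)
}
```

Merging two partial summaries gives the same count, mean and standard deviation as summarizing all values at once (up to rounding), which is the property any distributed computation over these 2.6 billion rows needs.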

Get the distributions of both measures in 10 bins over [0, 100):

scala> val cosine_rdd = data.select("cosine").rdd.map(x=>x.getDouble(0))
cosine_rdd: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[4] at map at <console>:27

scala> val jaccard_rdd = data.select("jaccard").rdd.map(x=>x.getDouble(0))
jaccard_rdd: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[7] at map at <console>:27

scala> import org.dianahep.histogrammar._
import org.dianahep.histogrammar._

scala> import org.dianahep.histogrammar.ascii._
import org.dianahep.histogrammar.ascii._

scala> val histo = Histogram(10,0,100,{x: Double => x})
histo: org.dianahep.histogrammar.Selecting[Double,org.dianahep.histogrammar.Binning[Double,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting]] = <Selecting cut=Bin>

scala> val jaccard_histo = jaccard_rdd.aggregate(histo)(new Increment, new Combine)
jaccard_histo: org.dianahep.histogrammar.Selecting[Double,org.dianahep.histogrammar.Binning[Double,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting]] = <Selecting cut=Bin>

scala> jaccard_histo.println
                       +----------------------------------------------------------+
underflow     0        |                                                          |
[  0 ,  10 )  7.943E+8 |***********************                                   |
[  10,  20 )  1.792E+9 |*****************************************************     |
[  20,  30 )  4.668E+7 |*                                                         |
[  30,  40 )  2.269E+5 |                                                          |
[  40,  50 )  5661     |                                                          |
[  50,  60 )  572      |                                                          |
[  60,  70 )  125      |                                                          |
[  70,  80 )  57       |                                                          |
[  80,  90 )  4        |                                                          |
[  90,  100)  0        |                                                          |
overflow      0        |                                                          |
nanflow       0        |                                                          |
                       +----------------------------------------------------------+

scala> val cosine_histo = cosine_rdd.aggregate(histo)(new Increment, new Combine)
cosine_histo: org.dianahep.histogrammar.Selecting[Double,org.dianahep.histogrammar.Binning[Double,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting,org.dianahep.histogrammar.Counting]] = <Selecting cut=Bin>

scala> cosine_histo.println
                       +----------------------------------------------------------+
underflow     0        |                                                          |
[  0 ,  10 )  2.552E+9 |*****************************************************     |
[  10,  20 )  6.682E+7 |*                                                         |
[  20,  30 )  1.045E+7 |                                                          |
[  30,  40 )  2.654E+6 |                                                          |
[  40,  50 )  7.953E+5 |                                                          |
[  50,  60 )  2.213E+5 |                                                          |
[  60,  70 )  6.631E+4 |                                                          |
[  70,  80 )  1.759E+4 |                                                          |
[  80,  90 )  6021     |                                                          |
[  90,  100)  444      |                                                          |
overflow      0        |                                                          |
nanflow       0        |                                                          |
                       +----------------------------------------------------------+
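Both histograms come from `rdd.aggregate(histo)(new Increment, new Combine)`. `RDD.aggregate` folds each partition starting from the zero value with the first function, then merges the partial results with the second, starting again from the zero value. The plain-Scala sketch below mimics that flow for a `Histogram(10, 0, 100, ...)`, including the underflow, overflow and nanflow counters seen in the printouts. `HistogramSketch` and its members are hypothetical names, local sequences stand in for RDD partitions, and this is not Histogrammar's implementation.

```scala
// Local sketch of filling a fixed-bin histogram with an aggregate-style
// fold/merge. Not Histogrammar's code: fill plays the role of Increment,
// combine plays the role of Combine.
object HistogramSketch {
  final case class Hist(low: Double, high: Double, counts: Vector[Long],
                        underflow: Long, overflow: Long, nanflow: Long)

  def empty(num: Int, low: Double, high: Double): Hist =
    Hist(low, high, Vector.fill(num)(0L), 0L, 0L, 0L)

  // seqOp: add one value. Bins are half-open [edge_i, edge_i+1),
  // so x == high goes to overflow, matching the "[ 90, 100)" label.
  def fill(h: Hist, x: Double): Hist =
    if (x.isNaN) h.copy(nanflow = h.nanflow + 1)
    else if (x < h.low) h.copy(underflow = h.underflow + 1)
    else if (x >= h.high) h.copy(overflow = h.overflow + 1)
    else {
      val n = h.counts.size
      // clamp guards against floating-point round-up just below `high`
      val i = math.min(((x - h.low) / (h.high - h.low) * n).toInt, n - 1)
      h.copy(counts = h.counts.updated(i, h.counts(i) + 1))
    }

  // combOp: bin-by-bin sum of two histograms with identical binning
  def combine(a: Hist, b: Hist): Hist = {
    require(a.low == b.low && a.high == b.high && a.counts.size == b.counts.size,
      "cannot combine histograms with different binning")
    a.copy(
      counts = a.counts.zip(b.counts).map { case (x, y) => x + y },
      underflow = a.underflow + b.underflow,
      overflow = a.overflow + b.overflow,
      nanflow = a.nanflow + b.nanflow)
  }

  // Same contract as RDD.aggregate: fold each partition from zero, then merge from zero
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)
}
```

Because the merge is a bin-by-bin sum, the result does not depend on how values are spread across partitions. With the ranges reported by `describe()` (all values between 0 and about 98.6), every value falls inside [0, 100), which is consistent with the zero underflow, overflow and nanflow rows above.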

Finally, calculate the Pearson correlation coefficient between the Jaccard and cosine values:

scala> import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.Statistics

scala> val correlation: Double = Statistics.corr(jaccard_rdd, cosine_rdd, "pearson") 
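`Statistics.corr(x, y, method)` treats the two RDDs as paired samples and combines them element by element, so they need the same number of partitions and the same number of elements in each partition. That normally holds here, since both RDDs come from the same `data` through per-row maps; `data.stat.corr("cosine", "jaccard")` computes the Pearson coefficient directly on the DataFrame. The sketch below only spells out the formula on local sequences; `PearsonSketch` is a hypothetical name, and this two-pass version is not how MLlib computes it.

```scala
// Two-pass Pearson correlation on local sequences:
//   r = sum((x_i - mx) * (y_i - my)) / sqrt(sum((x_i - mx)^2) * sum((y_i - my)^2))
// The i-th x and the i-th y must describe the same record (same key pair).
object PearsonSketch {
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.nonEmpty && xs.size == ys.size, "inputs must be non-empty and of equal length")
    val mx = xs.sum / xs.size
    val my = ys.sum / ys.size
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val vx = xs.map(x => (x - mx) * (x - mx)).sum
    val vy = ys.map(y => (y - my) * (y - my)).sum
    cov / math.sqrt(vx * vy)
  }
}
```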