import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
// sc.getConf returns a copy, so setting it after the context is up has no effect;
// use sqlContext.setConf for SQL options instead
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")
val file = "/user/name/event.parquet"
// the per-read mergeSchema option overrides the global setting above
val parquetFile = sqlContext.read.option("mergeSchema", "true").parquet(file)
parquetFile.registerTempTable("parquetFile")
//val ids = sqlContext.sql("SELECT * FROM parquetFile where ID='9ff10d20fdff' AND SERVER_IP='0.0.0.0'" )
val ids = sqlContext.sql("SELECT * FROM parquetFile")
// note: with a non-local master these printlns land in the executor logs, not the driver console
ids.foreach { x => println(x.getAs[String]("ID") + " " + x.getAs[String]("SERVER_IP")) }
println(ids.count)
ids.printSchema()
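// The commented-out query above can also be written with the DataFrame API, which
// lets Parquet push the predicate down to the scan. A minimal sketch; the ID and
// SERVER_IP values are just the hypothetical ones from that SQL query:
val filtered = parquetFile
  .filter(parquetFile("ID") === "9ff10d20fdff" && parquetFile("SERVER_IP") === "0.0.0.0")
  .select("ID", "SERVER_IP")
filtered.show()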
sc.stop
# launch: forward the first shell argument to the driver as the "dataset" system property
spark-shell -i avroToParquet.scala --conf spark.driver.extraJavaOptions="-Ddataset=$1"

avroToParquet.scala:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import scala.util.matching.Regex
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
def matchAndExtractHost(line: String): String = {
  // expected prefix shape: token, token, month word, day, HH:MM:SS, then the host field
  val pattern = """^\S+ \S+ \w+ \d+ \d+:\d+:\d+ (\S+) """
  if (line == null)
    return null
  pattern.r.findFirstMatchIn(line) match {
    case Some(m) => m.group(1)
    case None    => null
  }
}
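// A quick hedged sanity check: a made-up line in the shape the regex expects
val sampleLine = """abc 1.0 Jan 12 10:30:00 web01 user="bob", action="login" """
println(matchAndExtractHost(sampleLine)) // prints web01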
def parseKV(line: String, pattern: String): List[(String, String)] = {
  // accumulate every key="value" pair found in the line
  var list = List[(String, String)]()
  pattern.r.findAllMatchIn(line)
    .map { x => (x.group(1), x.group(2)) }
    .foreach { case (k, v) => list = (k, v) :: list }
  // add the host extracted from the line prefix
  list = ("host", matchAndExtractHost(line)) :: list
  list
}
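// Hedged example run against the made-up sampleLine above; the pattern is the
// same quoted key="value" form defined as kvPattern_quote further down
val sampleKVs = parseKV(sampleLine, """(\w+)="([^"]*)"[,\s]""").toMap
println(sampleKVs) // Map(host -> web01, action -> login, user -> bob)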
// the dataset name is forwarded from the shell via -Ddataset (see the launch command above)
var dataset = System.getProperty("dataset")
println("================> dataset requested : " + dataset)
if (dataset == null) dataset = "default"
// skip the _metadata summary files when writing Parquet
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
// note: the codec setting only applies when writing Avro; spark-avro exposes a level
// only for deflate (spark.sql.avro.deflate.level), snappy has no compression level
sqlContext.sql("set spark.sql.caseSensitive=true")
var avro_file = "/data/user/avro/compressed-avro.avro"
val df2 = sqlContext.read.avro(avro_file)
df2.printSchema()
//df2.select("body").toJSON.take(2).foreach { x => println(x) }
//df2.select("body").take(2).foreach { x => println(convertToString(x.getAs("body")).mkString) }
// decode the Avro "body" bytes into text lines (assuming UTF-8)
var lines = df2.select("body").map { x => new String(x.getAs[Array[Byte]]("body"), "UTF-8") }
//lines.take(3).foreach { x => println(x)}
//lines = sc.parallelize(lines.take(10))
// key="value" followed by a comma or whitespace; the original class [,\s+] made
// '+' a literal alternative rather than a repeat, so it is tightened to [,\s]
val kvPattern_quote = """(\w+)="([^"]*)"[,\s]"""
val kv = lines.map(x => parseKV(x, kvPattern_quote).toMap)
// render each map as one JSON string per line so read.json can infer the schema
val jkv = kv.map(x => compact(render(x)))
var eventlog = sqlContext.read.json(jkv)
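// Hedged illustration of the line shape read.json receives, with a made-up map:
println(compact(render(Map("host" -> "web01", "user" -> "bob"))))
// prints {"host":"web01","user":"bob"}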
//eventlog = eventlog.withColumnRenamed("start", "start2");
eventlog.printSchema()
// mergeSchema is a read-side option; passing it on write is a no-op but harmless
eventlog.write.mode(SaveMode.Append).option("mergeSchema", "false").parquet("/data/user/parquet/" + dataset + ".parquet")
sc.stop
// exit 0 so a wrapping shell script reads this run as a success
System.exit(0)
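// A hedged way to verify the output: point the reader script at the new file, e.g.
// sqlContext.read.parquet("/data/user/parquet/default.parquet").printSchema()
// (assuming the default dataset name)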