Spark Hadoop utils
// Assumed imports for a Spark 1.x build with the com.databricks:spark-avro package.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

// `log` is assumed to be a logger supplied by the enclosing class;
// parseKV, parseAsKV, and kvPattern_quote are external helpers not shown in this gist.
val rdd = loadAvroData(sc, logPath, suffix).map(x => parseKV(x._2, kvPattern_quote).toMap)
convertToDF(sc, rdd)
def convertToDF(sc: SparkContext, rdd: RDD[Map[String, String]]): DataFrame = {
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val fields = Array("name", "emp", "dept", "nick")
  // Pull the expected keys out of each map, defaulting to "" when a key is absent;
  // toDF infers a StringType column per tuple element, so no explicit schema is needed.
  val result = rdd.map(x =>
    (x.getOrElse(fields(0), ""), x.getOrElse(fields(1), ""),
     x.getOrElse(fields(2), ""), x.getOrElse(fields(3), "")))
  result.toDF(fields: _*)
}
def loadAvroData(sc: SparkContext, logPath: String, suffix: String): RDD[(String, String)] = {
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  // Read the Avro records, decode the binary "body" column to text, then parse each
  // line into a key/value pair; drop pairs whose key is null or contains a space.
  sqlContext.read.format("com.databricks.spark.avro").load(logPath).select("body")
    .map(x => new String(x.getAs[Array[Byte]]("body")))
    .map(x => parseAsKV(x))
    .filter(x => x._1 != null && !x._1.contains(" "))
}
def saveToAvro(sc: SparkContext, jsonRdd: RDD[String], avroPath: String): Unit = {
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  // Alternative codec: sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  // Store each JSON string as the binary "body" field of an Avro record.
  val schema = StructType(Array(StructField("body", BinaryType, nullable = false)))
  val df = sqlContext.createDataFrame(jsonRdd.map(x => Row.fromSeq(Seq(x.getBytes))), schema)
  df.printSchema()
  df.write.avro(avroPath)
}
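
// A minimal sketch of saveToAvro in use; `sc` is an active SparkContext and the
// output path is an assumption, not part of the utilities above. Each JSON string
// becomes one Avro record whose "body" field holds the string's raw bytes.
val sampleJson = sc.parallelize(Seq("""{"name":"sam","dept":"eng"}""", """{"name":"bo","dept":"ops"}"""))
saveToAvro(sc, sampleJson, "/tmp/avro-out")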
def readAvro(): Unit = {
  val path = "C://dev//sam//avro1//*.avro"
  def b2s(a: Array[Byte]): String = new String(a)
  val sc: SparkContext = new SparkContext(
    new SparkConf().setMaster("local[1]").setAppName("app"))
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  val df = sqlContext.read.avro(path)
  df.printSchema()
  // Decode the binary "body" column back to strings and print a small sample.
  val lines = df.select("body").map(x => b2s(x.getAs[Array[Byte]]("body")))
  lines.take(2).foreach(println)
  sc.stop()
}
def clean(path: String): Unit = {
  val hadoopConf = new Configuration()
  val uri = FileSystem.getDefaultUri(hadoopConf)
  log.info("Default URI: " + uri)
  val hdfs = FileSystem.get(uri, hadoopConf)
  try {
    // Log existence before and after the recursive delete.
    log.info(s"$path exists: ${hdfs.exists(new Path(path))}")
    log.info("deleted: " + hdfs.delete(new Path(path), true))
    log.info(s"$path exists: ${hdfs.exists(new Path(path))}")
  } catch {
    case e: Throwable => log.error("failed to delete " + path, e)
  } finally {
    hdfs.close()
  }
}
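
// Sketch: the DataFrameWriter's default save mode errors out if the target path
// already exists, so one pattern is to clean the output directory before rewriting
// it. The path here is the assumed one from the saveToAvro sketch above.
clean("/tmp/avro-out")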
def getPartitions(filepath: String): Long = {
  val hadoopConf = new Configuration()
  val uri = FileSystem.getDefaultUri(hadoopConf)
  val hdfs = FileSystem.get(uri, hadoopConf)
  val path = new Path(filepath.replaceAll("\\*", ""))
  // Check existence before the directory test, and close the handle on early return.
  if (!hdfs.exists(path) || !hdfs.isDirectory(path)) {
    hdfs.close()
    return -1
  }
  val blockSize = hdfs.getDefaultBlockSize(path)
  val fileSize = hdfs.getContentSummary(path).getLength
  log.info("DefaultBlockSize=" + blockSize)
  log.info("File Size=" + fileSize)
  hdfs.close()
  // Heuristic: roughly three partitions per HDFS block, with a floor of one.
  val count = fileSize / blockSize
  if (count < 1) 1 else count * 3
}
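
// Sketch: feed the block-based estimate into repartition. The input path is an
// assumption, and falling back to 200 partitions when the path is missing is a
// choice made here, not by getPartitions itself.
val estimated = getPartitions("/data/logs/")
val numParts = if (estimated > 0) estimated.toInt else 200
val repartitioned = sampleJson.repartition(numParts)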