@sscdotopen · Created June 25, 2019 16:03
Serializing nested data in Spark
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

// A local SparkSession so the snippet runs standalone; adjust master/app name as needed.
val session = SparkSession.builder()
  .appName("nested-data-example")
  .master("local[*]")
  .getOrCreate()

// String-to-string attributes attached to each history entry.
val attributesType = MapType(StringType, StringType, valueContainsNull = false)

// A single history entry: a time interval, a type label, and its attributes.
val historyEntryType = new StructType()
  .add("intervalStart", LongType)
  .add("intervalEnd", LongType)
  .add("type", StringType)
  .add("attributes", attributesType)

// Top-level schema: an id plus a nested array of history entries.
val schema = new StructType()
  .add("id", LongType)
  .add("history", ArrayType(historyEntryType))

val data = Array(
  Row(1L, Array(
    Row(1L, 5L, "person", Map("name" -> "John")),
    Row(5L, 7L, "person", Map("name" -> "Jim"))
  )),
  Row(2L, Array(
    Row(1L, 5L, "person", Map("type" -> "person", "name" -> "Smith")),
    Row(5L, 7L, "person", Map("name" -> "Miller"))
  ))
)

val rdd = session.sparkContext.parallelize(data, numSlices = 2)
val df = session.createDataFrame(rdd, schema)

println(s"Dataframe has ${df.count()} rows.")

val collected = df.collect()
collected.foreach { println }

// Write the nested data out as Parquet.
df.write.mode(SaveMode.Overwrite).parquet("/tmp/test.pqt")
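
As a quick check that the nested structure survives the round trip, the Parquet file can be read back and its schema inspected. This is a minimal sketch, assuming the same /tmp/test.pqt path and the `session` created above.

// Sketch: reload the Parquet output and confirm the nested schema round-trips.
val reloaded = session.read.parquet("/tmp/test.pqt")

// Expect id: long and history: array<struct<intervalStart, intervalEnd, type, attributes>>.
reloaded.printSchema()
reloaded.show(truncate = false)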