Skip to content

Instantly share code, notes, and snippets.

@mreid-moz
Created January 15, 2018 18:15
Show Gist options
  • Save mreid-moz/7374496e9d415ca85b42a95b011ac852 to your computer and use it in GitHub Desktop.
Test repartitioning behaviour when writing parquet data.
import java.util.UUID.randomUUID
import scala.sys.process._
import com.mozilla.telemetry.utils.getOrCreateSparkSession
import java.util.zip.CRC32
// Obtain (or reuse) a SparkSession via the telemetry helper; app name "test".
val spark = getOrCreateSparkSession("test")
// Quiet the console: only WARN and above from Spark internals.
spark.sparkContext.setLogLevel("WARN")
// Bring in toDF/$-syntax and other Dataset conversions for this session.
import spark.implicits._
/** Deterministically maps a client id onto a bucket in [0, modulus).
 *
 * Uses CRC32 of the client id's bytes modulo `modulus`, the same scheme the
 * telemetry pipeline uses for sampling.
 *
 * @param clientId the client identifier; a null id is treated as the empty
 *                 string (CRC32 of empty input is 0), so nulls land in bucket 0
 * @param modulus  number of buckets; assumed positive
 * @return bucket index as a Long in [0, modulus)
 */
def getSampleId(clientId: String, modulus: Int): Long = {
  val crc = new CRC32
  // Null-safe: previously this NPE'd on null client ids (see old TODO).
  crc.update(Option(clientId).getOrElse("").getBytes)
  crc.getValue % modulus
}
// Sampling configuration: 100 sample buckets, targeting 4 files per bucket.
val sampleIdModulus = 100
val filesPerPartition = 4
// UDF: client_id -> sample bucket in [0, 100).
val sampleId = (clientId: String) => getSampleId(clientId, sampleIdModulus)
spark.sqlContext.udf.register("sampleid", sampleId)
// UDF: client_id -> finer-grained bucket in [0, 400); since 400 is a
// multiple of 100, this refines the sample_id buckets.
val partitionId = (clientId: String) => getSampleId(clientId, filesPerPartition * sampleIdModulus)
spark.sqlContext.udf.register("partid", partitionId)
// Build 200k rows over 10k distinct client_ids (each id repeats 20 times),
// with a random document_id per row.
val df = (1 to 200000).map { e => ((e%10000).toString, randomUUID().toString) }.toList.toDF("client_id", "document_id")
// Derive sample_id (crc % 100) and part_id (crc % 400) via the registered
// UDFs; for a given client_id, part_id % 100 == sample_id.
val df2 = df.selectExpr("*", "sampleid(client_id) as sample_id", "partid(client_id) as part_id")
// Shuffle rows by part_id so each sample_id directory should receive ~4
// parquet files, then drop the helper column before the partitioned write.
df2.repartition(df2.col("part_id")).drop("part_id").write.partitionBy("sample_id").mode("overwrite").parquet("/tmp/test-output")
// Count the parquet files actually written (expected 100 * 4 = 400).
"find /tmp/test-output -type f -name *.parquet" #| "wc -l" !
// 396 files. Four of the sample_id prefixes have only three files. CRC collisions maybe?
// NOTE(review): repartition(col) hashes part_id into spark.sql.shuffle.partitions
// shuffle tasks, so two part_ids belonging to the same sample_id can land in the
// same task and merge into one file — a likelier cause than CRC collisions; confirm.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment