// Databricks notebook source
import org.graphframes.GraphFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
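// GraphFrames' connectedComponents checkpoints intermediate results, so a
// Spark checkpoint directory must be set before it runs.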
sc.setCheckpointDir("/dbfs/cp")
// COMMAND ----------
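// Build a graph from the change log: each CSV row links the hash of a record
// before a change (_beforeDataHash) to its hash after the change (_rowHash).
// Vertices are hashes tagged with the changeSequence at which they appeared,
// so connected components will group all versions of the same logical record.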
val data =
  spark.read.option("header", "true").csv("dbfs:/FileStore/sampleHashData.csv")

val v1 = data
  .withColumnRenamed("_beforeDataHash", "id")
  .withColumn("changeSequence", col("changeSequence").cast("decimal(38,0)"))
  .select("id", "changeSequence")

val v2 = data
  .withColumnRenamed("_rowHash", "id")
  .withColumn("changeSequence", col("changeSequence").cast("decimal(38,0)"))
  .select("id", "changeSequence")

val v = v1.union(v2).repartition()

val e = data
  .withColumnRenamed("_beforeDataHash", "src")
  .withColumnRenamed("_rowHash", "dst")
  .select("src", "dst", "operation")

val g = GraphFrame(v.dropDuplicates, e.dropDuplicates)
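
// Optional sanity check on the graph before running connected components.
println(s"vertices=${g.vertices.count}, edges=${g.edges.count}")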
// COMMAND ----------
data.printSchema
// COMMAND ----------
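// Alternative input: a small in-memory test graph. Uncomment to experiment
// without the CSV file above (it redefines v, e, and g).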
// val v = sqlContext.createDataFrame(List(
// ("a", 10),
// ("b", 20),
// ("c", 30),
// ("x", 10),
// ("h", 80),
// ("y", 20),
// ("z", 30),
// ("d", 40),
// ("e", 50),
// ("f", 60),
// ("g", 70),
// )).toDF("id", "changeSequence")
// val e = sqlContext.createDataFrame(List(
// ("a", "b", "UPDATE"),
// ("b", "c", "UPDATE"),
// ("g", "h", "UPDATE"),
// ("x", "y", "UPDATE"),
// ("y", "z", "UPDATE"),
// ("c", "d", "UPDATE"),
// ("d", "e", "UPDATE"),
// ("e", "f", "UPDATE"),
// ("f", "g", "UPDATE"),
// )).toDF("src", "dst", "operation")
// val g = GraphFrame(v, e)
// COMMAND ----------
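// Connected components groups every chain of hash transitions into a single
// component. Collecting the (id, changeSequence) pairs per component lets us
// pick out the earliest (x_min) and latest (x_max) change in each chain.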
val df1 = g.connectedComponents
  .run()
  .withColumn("col1", struct(col("id"), col("changeSequence")))
  .groupBy("component")
  .agg(collect_list(col("col1")).as("col2"))
  .select("col2")
  .withColumn("x_max", expr("array_max(col2.changeSequence)"))
  .withColumn("x_min", expr("array_min(col2.changeSequence)"))
// COMMAND ----------
display(df1)
// COMMAND ----------
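// For each component, keep only the entries whose changeSequence equals the
// component minimum (the chain's starting hash) or maximum (its final hash).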
df1.createOrReplaceTempView("tmpTable")
val df2 = spark
  .sql(
    """select filter(col2, a -> a.changeSequence = x_min) startHash,
      |       filter(col2, a -> a.changeSequence = x_max) endHash
      |from tmpTable""".stripMargin
  )
  .withColumn("startids", explode(col("startHash.id")))
  // endids is needed for the second join below (df4).
  .withColumn("endids", explode(col("endHash.id")))
// COMMAND ----------
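// Join back to the raw data to recover the full rows for each chain's start
// hash (matching _beforeDataHash) and end hash (matching _rowHash).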
val df3 = df2
  .as("joineddf1")
  .join(data.as("maindf1"), $"startids" === $"_beforeDataHash")
val df4 = df3
  .as("joineddf2")
  .join(data.as("maindf2"), $"endids" === $"maindf2._rowHash")
// COMMAND ----------
display(df3)
// COMMAND ----------
df4.show(false)
// COMMAND ----------