Structured Streaming simple testbed setup using Kafka
# Download the Confluent platform as a zip locally: https://www.confluent.io/download/?_ga=2.149291254.1340520780.1594928883-290224092.1594928883&_gac=1.220398892.1594951593.EAIaIQobChMIm6Cmz5nT6gIVCa_ICh0IeAjlEAAYASAAEgLWkfD_BwE
# Choose the zip option, unzip after download, and set it up in your home directory.
export CONFLUENT_HOME=<path_to_confluent_home>
export PATH=$PATH:$CONFLUENT_HOME/bin

# Start services
confluent local start

# Start load-gen - Orders table
ksql-datagen value-format=json msgRate=20000 quickstart=orders printRows=false nThreads=20
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
:paste
import spark.implicits._
import scala.concurrent.duration._
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import java.util.Calendar
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, functions}
import org.apache.spark.{SparkConf, SparkContext}

sc.setLogLevel("ERROR")
// Simple listener that prints query lifecycle and progress events with timestamps
class EventCollector extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("Query started, Time: " + Calendar.getInstance().getTime())
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("Progress: " + event.progress + ", Time: " + Calendar.getInstance().getTime())
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("Query terminated, Time: " + Calendar.getInstance().getTime())
  }
}
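// The listener above only takes effect once it is registered with the session's
// StreamingQueryManager; a minimal sketch (optional, run before starting the query below):
spark.streams.addListener(new EventCollector())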
// Read from Kafka - Topic: orders_kafka_topic_json
// Note: maxOffsetsPerTrigger makes it possible to reproduce the same load across repeated experiments when starting fresh
// Use the earliest starting offset to ensure we restart from the same state
val inputDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders_kafka_topic_json")
  .option("maxOffsetsPerTrigger", 10000)
  .option("startingOffsets", "earliest")
  .load()

val ordersJsonDf : Dataset[Row] = inputDf.selectExpr("CAST(value AS STRING)")

// Schema for the value part of the Kafka payload. Matches the Orders schema
val addr_struct = new StructType().add("city", DataTypes.StringType).add("state", DataTypes.StringType).add("zipcode", DataTypes.IntegerType)
val struct = new StructType().add("ordertime", DataTypes.LongType).add("orderid", DataTypes.IntegerType).add("itemid", DataTypes.StringType).add("orderunits", DataTypes.DoubleType).add("address", addr_struct)

// Add a partition path column with a literal value
val outputDF = ordersJsonDf.select(from_json(col("value"), struct).as("orders")).withColumn("partitionpath", functions.lit("default"))
// Start writing to Hudi (COPY_ON_WRITE table, upserts, 60-second micro-batches)
val query = outputDF.writeStream.format("hudi").option("path", "file:///tmp/orders_stream_hudi/")
  .option("checkpointLocation", "file:///tmp/orders_stream_hudi_ckpt/")
  //.option("hoodie.parquet.small.file.limit", "0")
  .option("hoodie.insert.shuffle.parallelism", "4")
  .option("hoodie.upsert.shuffle.parallelism", "4")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.recordkey.field", "orders.orderid")
  .option("hoodie.datasource.write.precombine.field", "orders.ordertime")
  .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
  .option("hoodie.table.name", "hudi_streaming_orders")
  .trigger(Trigger.ProcessingTime("60 seconds")).start()

query.awaitTermination()
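// Sanity-check sketch: read the Hudi table back as a batch DataFrame from a second
// spark-shell (or after stopping the query). With Hudi 0.5.x the load path is a glob
// over the partition directories (adjust the glob depth to the partitioning); here a
// single "default" partition is written, so:
val snapshotDf = spark.read.format("hudi").load("file:///tmp/orders_stream_hudi/*")
snapshotDf.select("orders.orderid", "orders.itemid", "orders.orderunits").show(10, false)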
// Alternative variant (run as a separate paste block): same pipeline, but writing a
// non-partitioned MERGE_ON_READ table. Note it points at the same base path and
// checkpoint location as the variant above.
val inputDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "orders_kafka_topic_json").load()

// Extract the value as a JSON string
val ordersJsonDf = inputDf.selectExpr("CAST(value AS STRING)")

// Schema for parsing the JSON value into a StructType
val addr_struct = new StructType().add("city", DataTypes.StringType).add("state", DataTypes.StringType).add("zipcode", DataTypes.IntegerType)
val struct = new StructType().add("ordertime", DataTypes.LongType).add("orderid", DataTypes.IntegerType).add("itemid", DataTypes.StringType).add("orderunits", DataTypes.DoubleType).add("address", addr_struct)
val outputDF = ordersJsonDf.select(from_json($"value", struct).as("orders"))

// Start writing to Hudi
outputDF.writeStream.format("hudi").option("path", "file:///tmp/orders_stream_hudi/")
  .option("checkpointLocation", "file:///tmp/orders_stream_hudi_ckpt/")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "orders.orderid")
  .option("hoodie.datasource.write.precombine.field", "orders.ordertime")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  .option("hoodie.table.name", "hudi_streaming_orders")
  .start()
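// Sketch: since this variant does not keep a handle on the query, it can be retrieved
// from the query manager and shut down cleanly once the experiment is done, e.g.:
spark.streams.active.foreach { q =>
  q.processAllAvailable()  // block until currently available input has been processed
  q.stop()
}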
I just created an application based on the steps in your code to write data from Kafka to HDFS using Hudi 0.8.0 and Spark 2.4.0. In general everything seems to work as intended; however, I observed issues with the checkpoint, which seems to be written even when there is an error or when query.stop() is called.

E.g., I called query.stop() pretty much right after a new micro-batch started, so effectively only one message was processed and no data was written to Hudi, yet the checkpoint files in both /offsets and /commits were created, so the next batch would start from the wrong offset and effectively skip messages. Could this issue be related to using Hudi as a sink for Spark Structured Streaming, or does it originate from something else?
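For what it's worth, one way to compare what Spark checkpointed against what Hudi actually committed is to list the checkpoint offset files next to the commit files under .hoodie. A rough sketch, assuming the local paths used in this gist (on HDFS the equivalent listing would go through the Hadoop FileSystem API instead):

import java.io.File
Option(new File("/tmp/orders_stream_hudi_ckpt/offsets").listFiles()).getOrElse(Array.empty[File])
  .foreach(f => println("checkpointed offset batch: " + f.getName))
Option(new File("/tmp/orders_stream_hudi/.hoodie").listFiles()).getOrElse(Array.empty[File])
  .filter(f => f.getName.endsWith(".commit") || f.getName.endsWith(".deltacommit"))
  .foreach(f => println("hudi commit: " + f.getName))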