Created December 18, 2019 14:21
structured streaming example
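A self-contained Spark Structured Streaming job: it reads JSON SLA reports from the Kafka topic raw_sla_reports, parses them against an explicit schema, explodes the per-message data array, deduplicates on msgId and event time under a 24-hour watermark, buckets delivery delays into a histogram over 60-second event-time windows per source group and message type, and writes the aggregates back to Kafka as JSON on delays_per_group_histo_table.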
package igorprivate;

import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.to_json;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.window;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
public class SparkStructuredStreamingMainExample {

    public static void main(String[] args) throws Exception {
        org.apache.log4j.Logger.getRootLogger().setLevel(Level.INFO);

        SparkConf properties = new SparkConf();
        properties.set("spark.sql.session.timeZone", "UTC");
        properties.set("spark.default.parallelism", String.valueOf(8));
        properties.set("spark.sql.shuffle.partitions", String.valueOf(8));

        // local[N] master by default; override with -Dspark.master=... when submitting to a cluster
        String master = System.getProperty("spark.master", "local[" + Runtime.getRuntime().availableProcessors() + "]");

        try (SparkSession spark = SparkSession.builder().config(properties).master(master).appName("my-awesome-spark").getOrCreate()) {
            spark.sparkContext().setLogLevel("INFO");
            // Source: subscribe to the raw SLA reports topic.
            // The consumer-group option is spelled groupIdPrefix in the Kafka source docs;
            // Spark versions that predate the option simply ignore it.
            Dataset<Row> df = spark.readStream().format("kafka")
                    .option("subscribe", "raw_sla_reports")
                    .option("kafka.bootstrap.servers", "0.0.0.0:29092")
                    .option("groupIdPrefix", "my-awesome-group-spark-streaming-dev")
                    .load();
            // Kafka delivers the payload as binary; cast it to a string before JSON parsing
            Dataset<Row> rowDataset = df.selectExpr("CAST(value AS STRING)");

            // Explicit schema for the incoming JSON; logicalTime and processingTime are epoch millis
            StructType schema = StructType.fromDDL(
                    "msgId string, sourceLabels map<string,string>, "
                    + "data array<struct<logicalTime: BIGINT, processingTime: BIGINT, labels: map<string,string>, value: INT>>");
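            // A message matching this schema might look like the following
            // (a hypothetical example; all values are made up):
            // {"msgId":"m-1",
            //  "sourceLabels":{"sourceGroup":"groupA"},
            //  "data":[{"logicalTime":1576670000000,"processingTime":1576670030000,
            //           "labels":{"msgtype":"heartbeat"},"value":5}]}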
            Dataset<Row> jsonDF = rowDataset.select(from_json(rowDataset.col("value"), schema, new HashMap<>()).as("r"));
            jsonDF = jsonDF.select("r.*");

            // One output row per element of the data array
            Dataset<Row> raw_sla_reports_exploded_stream = jsonDF.selectExpr("msgId", "sourceLabels", "EXPLODE(data) as data");
            // Project the needed fields; epoch millis are divided by 1000 before the cast to
            // timestamp, while delay stays in milliseconds
            Dataset<Row> d = raw_sla_reports_exploded_stream.selectExpr(
                    "msgId",
                    "sourceLabels['sourceGroup'] as sourceGroup",
                    "cast(data.logicalTime/1000 as timestamp) as logicalTime",
                    "cast(data.processingTime/1000 as timestamp) as processingTime",
                    "data.processingTime - data.logicalTime as delay",
                    "data.value as value",
                    "data.labels['msgtype'] as msgtype")
                .withWatermark("logicalTime", "24 hours");

            // important to include the watermarked logicalTime column, otherwise the
            // deduplication state is never purged
            d = d.dropDuplicates("msgId", "logicalTime");
            // Histogram of delays per 60-second event-time window, source group and message type;
            // thresholds are in milliseconds (numeric literals rather than strings, to avoid implicit casts)
            Dataset<Row> histo = d.groupBy(
                    window(d.col("logicalTime"), "60 seconds"),
                    d.col("sourceGroup"),
                    d.col("msgtype")
            ).agg(
                    sum(when(d.col("delay").leq(lit(60000L)),
                            d.col("value")).otherwise(lit(0))).alias("zero_to_one_min"),
                    sum(when(d.col("delay").gt(lit(60000L))
                                    .and(d.col("delay").leq(lit(300000L))),
                            d.col("value")).otherwise(lit(0))).alias("one_to_five_mins"),
                    sum(when(d.col("delay").gt(lit(300000L))
                                    .and(d.col("delay").leq(lit(600000L))),
                            d.col("value")).otherwise(lit(0))).alias("five_to_ten_mins"),
                    sum(when(d.col("delay").gt(lit(600000L))
                                    .and(d.col("delay").leq(lit(1800000L))),
                            d.col("value")).otherwise(lit(0))).alias("ten_to_thirty_mins"),
                    sum(when(d.col("delay").gt(lit(1800000L)),
                            d.col("value")).otherwise(lit(0))).alias("more_than_thirty_mins"),
                    sum("value").alias("total_protos")
            );
            Dataset<Row> histo_projected = histo.selectExpr(
                    "sourceGroup",
                    "msgtype",
                    "window",
                    "zero_to_one_min",
                    "one_to_five_mins",
                    "five_to_ten_mins",
                    "ten_to_thirty_mins",
                    "more_than_thirty_mins",
                    "total_protos")
                .withColumn("window_start", date_format(histo.col("window.start"), "yyyy-MM-dd-HH-mm-ss"))
                .drop("window");
            //histo_projected.printSchema();
            // A stable key per (sourceGroup, msgtype, window) for downstream consumers
            Dataset<Row> histo_projected_with_agg_key = histo_projected.withColumn("aggKey",
                    concat_ws("_", histo_projected.col("sourceGroup"), histo_projected.col("msgtype"), histo_projected.col("window_start")));
            //histo_projected_with_agg_key.printSchema();

            // the Kafka sink requires the payload in a column named `value`
            Dataset<Row> forOutputTopic = histo_projected_with_agg_key.select(
                    to_json(struct(histo_projected_with_agg_key.col("*"))).alias("value"));
            //forOutputTopic.printSchema();
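            // An emitted record might look like the following (illustrative only; values are made up,
            // and the remaining histogram buckets appear between zero_to_one_min and total_protos):
            // {"sourceGroup":"groupA","msgtype":"heartbeat","zero_to_one_min":5,"total_protos":5,
            //  "window_start":"2019-12-18-12-33-00","aggKey":"groupA_heartbeat_2019-12-18-12-33-00"}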
            // Update mode re-emits a window's row on every trigger in which it changed;
            // the checkpoint directory makes the query restartable
            StreamingQuery streamingQuery = forOutputTopic.writeStream().format("kafka")
                    .option("kafka.bootstrap.servers", "0.0.0.0:29092")
                    .option("topic", "delays_per_group_histo_table")
                    .option("checkpointLocation", "/tmp/checkpoint")
                    .outputMode(OutputMode.Update())
                    .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
                    .start();
            streamingQuery.awaitTermination();
        }
    }
}
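To try it locally, build a jar and submit it with the Kafka connector on the classpath. A minimal sketch, assuming a Spark 2.4.x / Scala 2.11 installation and a broker reachable at 0.0.0.0:29092; the connector version and jar name here are illustrative:

    spark-submit \
      --class igorprivate.SparkStructuredStreamingMainExample \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
      structured-streaming-example.jar

The checkpoint location is /tmp/checkpoint, so delete that directory to restart the query from scratch.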