Created
March 8, 2021 19:56
-
-
Save jacace/6965669d626aefb1eb2bb5db4d14bd1c to your computer and use it in GitHub Desktop.
Joins in the Java Spark Streaming API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static void joinDemo(JavaDStream<ConsumerRecord<String, String>> productStream, | |
JavaDStream<ConsumerRecord<String, String>> salesStream) { | |
ObjectMapper jacksonParser = new ObjectMapper(); | |
JavaPairDStream<Object, Object> s1 = productStream.mapToPair(record -> new Tuple2<Object, Object>(record.key(), | |
jacksonParser.readValue(record.value(), Item.class))); | |
JavaPairDStream<Object, Object> s2 = salesStream.mapToPair(record -> new Tuple2<Object, Object>(record.key(), | |
jacksonParser.readValue(record.value(), DailySales.class))); | |
JavaPairDStream<Object, Tuple2<Object, Object>> s3 = s1.join(s2); | |
s3.foreachRDD(new VoidFunction<JavaPairRDD<Object, Tuple2<Object, Object>>>() { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public void call(JavaPairRDD<Object, Tuple2<Object, Object>> rdd) throws Exception { | |
System.out.println("Num of Records in RDD: " + Long.toString(rdd.count())); | |
rdd.foreach(data -> { | |
System.out.println("Key: " + data._1().toString() + ". Obj in Value 1: " + ((Item) data._2()._1()).name | |
+ ". Obj in Value 2: " + Integer.toString(((DailySales) data._2()._2()).soldUnits)); | |
}); | |
} | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment