Last active
July 15, 2017 22:29
-
-
Save juanmf/ef6445843c1ccad1ebce43df8567a66a to your computer and use it in GitHub Desktop.
Java version of the word-count example from https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_develop_run.html, rewritten with lambdas and streams so that it is much shorter.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany; | |
/** | |
* Created by juanmf on 15/07/17. | |
*/ | |
import java.util.Arrays; | |
import java.util.stream.Collectors; | |
import org.apache.spark.api.java.*; | |
import org.apache.spark.SparkConf; | |
import scala.Tuple2; | |
public class WordCount { | |
public static void main(String[] args) { | |
// create Spark context with Spark configuration | |
try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))) { | |
int threshold = Integer.parseInt(args[1]); | |
String filePath = args[0]; | |
JavaPairRDD<String, Integer> filtered = filterWordsUnderThreshold(sc, threshold, filePath); | |
// count characters | |
JavaPairRDD<Character, Integer> charCounts = filtered | |
.flatMap( | |
s -> s._1.chars().mapToObj(i -> (char)i).collect(Collectors.toList())) | |
.mapToPair(c -> new Tuple2<>(c, 1)) | |
.reduceByKey((i1, i2) -> i1 + i2); | |
System.out.println(charCounts.collect()); | |
} | |
} | |
private static JavaPairRDD<String, Integer> filterWordsUnderThreshold(JavaSparkContext sc, int threshold, String filePath) { | |
// read in text file and split each document into words | |
JavaRDD<String> tokenized = sc.textFile(filePath).flatMap(s -> Arrays.asList(s.split(" "))); | |
// count the occurrence of each word | |
JavaPairRDD<String, Integer> counts = tokenized | |
.mapToPair(s -> new Tuple2<>(s, 1)) | |
.reduceByKey((i1, i2) -> i1 + i2); | |
// filter out words with fewer than threshold occurrences | |
return counts.filter(tup -> tup._2 >= threshold); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment