This example uses Scala. Please see the MLlib documentation for a Java example.
Try running this code in the Spark shell. Because LDA is randomized, the topics may differ from run to run, but they should resemble those listed above.
This example accompanies a blog post on LDA in Spark.
import scala.collection.mutable
import org.apache.spark.mllib.clustering.DistributedLDAModel
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
// Load the corpus. sc.textFile yields one RDD element per *line* of the input,
// so every line of the file is treated as a separate document.
val corpus: RDD[String] = sc.textFile("hs/BufferLeaving.txt")

// Split each document into a sequence of terms (words): lower-case it, split on
// single whitespace characters, then keep only purely alphabetic tokens longer
// than 3 characters. (Empty strings from consecutive whitespace are dropped by
// the length filter.)
val tokenized: RDD[Seq[String]] =
  corpus
    .map(_.toLowerCase.split("\\s"))
    .map(_.filter(_.length > 3).filter(_.forall(java.lang.Character.isLetter)))
// Choose the vocabulary.
// termCounts: (term, termCount) pairs over the whole corpus, sorted by
// descending count.
val termCounts: Array[(String, Long)] =
  tokenized.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)

// vocabArray: chosen vocabulary = every counted term that is not a stop word,
// kept in descending-count order. Each line of stop-words/*.txt is compared
// verbatim (no trimming) against the terms.
val stopWords = sc.textFile("stop-words/*.txt").collect()
// Set lookup is O(1); Array.contains would rescan every stop word for each term.
val stopWordSet: Set[String] = stopWords.toSet
val vocabArray: Array[String] = termCounts.map(_._1).filterNot(stopWordSet)

// vocab: term -> term index, where the index is the term's position in vocabArray.
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap
// Convert documents into term count vectors. Each document becomes
// (documentId, sparse vector of length vocab.size), where entry i counts how
// often the term with index i in vocab occurs in that document. Tokens not in
// vocab (e.g. stop words) are skipped. Document ids come from zipWithIndex.
val documents: RDD[(Long, Vector)] =
  tokenized.zipWithIndex.map { case (tokens, id) =>
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
      // Single lookup instead of contains + apply.
      vocab.get(term).foreach { idx =>
        counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
      }
    }
    (id, Vectors.sparse(vocab.size, counts.toSeq))
  }
// Set LDA parameters: 30 topics, at most 100 iterations.
// NOTE(review): for this many iterations, consider calling sc.setCheckpointDir(...)
// before training so LDA's periodic checkpointing can truncate the RDD lineage.
val numTopics = 30
val lda = new LDA().setK(numTopics).setMaxIterations(100)

// Train the topic model on the term count vectors.
val ldaModel = lda.run(documents)

// logLikelihood is a member of DistributedLDAModel (what the default EM
// optimizer produces). In Spark 1.4+, however, run() is declared to return the
// LDAModel base type, so narrow the type before reading it.
val avgLogLikelihood: Double = ldaModel match {
  case distributed: DistributedLDAModel =>
    distributed.logLikelihood / documents.count()
  case other =>
    throw new IllegalStateException(
      s"logLikelihood requires a DistributedLDAModel (EM optimizer), got ${other.getClass.getName}")
}
// Describe each topic by its top-weighted 4 terms. Each entry pairs term ids
// with their weights; the ids are presumably the vocab indices used to build the
// vectors above, so map them back through vocabArray before printing words.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 4)
topicIndices.foreach { case (terms, termWeights) =>
println("TOPIC:") { case (term, weight) =>
