// $ curl -s -L http://dumps.wikimedia.org/enwiki/latest/\
// enwiki-latest-pages-articles-multistream.xml.bz2 \
//     | bzip2 -cd \
//     | hadoop fs -put - /user/ds/wikidump.xml
//
// $ sudo yum install -y git
// $ git clone https://github.com/sryza/aas ~/aas
// $ cd ~/aas
// $ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
// $ sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
// $ sudo yum install -y apache-maven
// $ mvn package   # should be run from the repo root; see https://github.com/sryza/aas/issues/14
// $ cd ch06-lsa
// $ spark-shell --jars target/ch06-lsa-1.0.2-jar-with-dependencies.jar
import com.cloudera.datascience.common.XmlInputFormat
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.umd.cloud9.collection.wikipedia.WikipediaPage
import edu.umd.cloud9.collection.wikipedia.language.EnglishWikipediaPage
import java.io.{FileOutputStream, PrintStream}
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
val path = "hdfs:///user/ds/wikidump.xml"
@transient val conf = new Configuration()
// conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
val kvs = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// 16/03/21 10:36:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 202.9 KB, free 202.9 KB)
// 16/03/21 10:36:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.8 KB, free 226.8 KB)
// 16/03/21 10:36:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.26.245:58525 (size: 23.8 KB, free: 518.0 MB)
// 16/03/21 10:36:26 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at <console>:36
// kvs: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = hdfs:///user/ds/wikidump.xml NewHadoopRDD[0] at newAPIHadoopFile at <console>:36
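// Sanity check (not part of the original session): each record produced by
// XmlInputFormat is the text of one <page>...</page> element, so the first
// record should show a single article's raw XML.
kvs.first()._2.toString.take(300)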
val rawXmls = kvs.map(p => p._2.toString)
// rawXmls: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:38
def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
  val page = new EnglishWikipediaPage()
  WikipediaPage.readPage(page, pageXml)
  // skip empty pages, non-articles, redirects, and disambiguation pages
  if (page.isEmpty || !page.isArticle || page.isRedirect ||
      page.getTitle.contains("(disambiguation)")) {
    None
  } else {
    Some((page.getTitle, page.getContent))
  }
}
// wikiXmlToPlainText: (pageXml: String)Option[(String, String)]
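// Why the flatMap below drops the filtered pages (plain-Scala illustration):
// flatMap over an Option-returning function keeps the Some payloads and
// discards the Nones, i.e. the redirects and disambiguation pages.
Seq(Some(("Title", "body")), None, Some(("Other", "text"))).flatMap(x => x)
// => List((Title,body), (Other,text))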
val plainText = rawXmls.flatMap(wikiXmlToPlainText)
// plainText: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at flatMap at <console>:69
def createNLPPipeline(): StanfordCoreNLP = {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}
// createNLPPipeline: ()edu.stanford.nlp.pipeline.StanfordCoreNLP
def isOnlyLetters(str: String): Boolean = {
  // while loop instead of a higher-order method, for performance on this hot path
  var i = 0
  while (i < str.length) {
    if (!Character.isLetter(str.charAt(i))) {
      return false
    }
    i += 1
  }
  true
}
// isOnlyLetters: (str: String)Boolean
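// An equivalent one-liner, for reference; the while loop above avoids the
// per-character closure overhead:
def isOnlyLettersIdiomatic(str: String): Boolean = str.forall(Character.isLetter)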
def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
  val doc = new Annotation(text)
  pipeline.annotate(doc)
  val lemmas = new ArrayBuffer[String]()
  val sentences = doc.get(classOf[SentencesAnnotation])
  for (sentence <- sentences.asScala;
       token <- sentence.get(classOf[TokensAnnotation]).asScala) {
    val lemma = token.get(classOf[LemmaAnnotation])
    // keep lemmas of 3+ characters that are not stop words and are letters only
    if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
      lemmas += lemma.toLowerCase
    }
  }
  lemmas
}
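// Quick check of the lemmatizer (illustrative; assumes the CoreNLP models are
// on the classpath, e.g. via the jar-with-dependencies):
plainTextToLemmas("The quick brown foxes were jumping over the lazy dogs.", Set("the", "over"), createNLPPipeline())
// => lemmas like: quick, brown, fox, jump, lazy, dog ("were" -> "be" is dropped by the length filter)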
// assumes the current working directory is aas/ch06-lsa
val stopWords = sc.broadcast(scala.io.Source.fromFile("src/main/resources/stopwords.txt").getLines().toSet).value
// NB: .value on the driver returns the local Set, so the broadcast itself goes
// unused; the list is tiny (~16 KB per the log), so shipping it inside the task
// closures is harmless here.
// 16/03/21 10:59:37 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 15.7 KB, free 242.5 KB)
// 16/03/21 10:59:37 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1114.0 B, free 243.6 KB)
// 16/03/21 10:59:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.31.26.245:58525 (size: 1114.0 B, free: 518.0 MB)
// 16/03/21 10:59:37 INFO SparkContext: Created broadcast 1 from broadcast at <console>:59
// stopWords: scala.collection.immutable.Set[String] = Set(down, it's, that's, for, further, she'll, any, there's, this, haven't, in, ought, myself, have, your, off, once, i'll, are, is, his, why, too, why's, am, than, isn't, didn't, himself, but,...
// build one CoreNLP pipeline per partition: constructing it is expensive, and
// the pipeline is not serializable, so it cannot be created once on the driver
val lemmatized = plainText.mapPartitions(iter => {
  val pipeline = createNLPPipeline()
  iter.map{ case(title, contents) => (title, plainTextToLemmas(contents, stopWords, pipeline))}
})
// lemmatized: org.apache.spark.rdd.RDD[(String, Seq[String])] = MapPartitionsRDD[3] at mapPartitions at <console>:79
// 16/03/21 17:54:15 WARN TaskSetManager: Lost task 24.0 in stage 0.0 (TID 56, ip-172-31-13-188.ap-northeast-1.compute.internal): java.lang.IllegalArgumentException: No annotator named tokenize
//   at edu.stanford.nlp.pipeline.AnnotatorPool.get(AnnotatorPool.java:83)
//   at edu.stanford.nlp.pipeline.StanfordCoreNLP.construct(StanfordCoreNLP.java:292)
// in the book this is docs.mapValues(...); lemmatized is the equivalent RDD here
val docTermFreqs = lemmatized.mapValues(terms => {
  // per-document term frequencies: fold the term list into a mutable count map
  val termFreqsInDoc = terms.foldLeft(new HashMap[String, Int]()) {
    (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
  }
  termFreqsInDoc
})
// docTermFreqs: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.HashMap[String,Int])] = MapPartitionsRDD[4] at mapValues at <console>:81
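// The foldLeft above is a plain word count; on a tiny term list:
Seq("cat", "dog", "cat").foldLeft(new HashMap[String, Int]()) {
  (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
}
// => Map(cat -> 2, dog -> 1)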
// this RDD will be used at least twice, so it's a good idea to cache it
docTermFreqs.cache()
//// The full vocabulary is too large, so don't run the lines below:
// docTermFreqs.flatMap(_._2.keySet).distinct().count()
// docTermFreqs.map(_._2).aggregate(zero)(merge, comb)
// (REPL signature of the book's comb helper, not defined in this session:)
// comb: (dfs1: scala.collection.mutable.HashMap[String,Int], dfs2: scala.collection.mutable.HashMap[String,Int])scala.collection.mutable.HashMap[String,Int]
// in the GitHub code: documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
// document frequency: the number of documents each term appears in
val docFreqs = docTermFreqs.flatMap(_._2.keySet).map((_, 1)).reduceByKey(_ + _)
// docFreqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:81
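// The same document-frequency pattern on two tiny "documents" (illustration):
sc.parallelize(Seq(HashMap("cat" -> 2, "dog" -> 1), HashMap("cat" -> 5)))
  .flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _).collect()
// => (cat,2), (dog,1) in some order: keySet counts each term once per document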
val numTerms = 50000 // original: 50000
// NB: numDocs = 10 is far below the real document count, so log(numDocs / count)
// below is negative for every term (see the output); the actual corpus size,
// e.g. docTermFreqs.count(), would give the usual positive IDF weights
val numDocs = 10
val ordering = Ordering.by[(String, Int), Int](_._2)
// *** //
// slow: about 2.5 hours on 12 cores
val topDocFreqs = docFreqs.top(numTerms)(ordering)
// *** //
// 16/03/21 12:22:28 INFO TaskSetManager: Finished task 19.0 in stage 0.0 (TID 15) in 2264441 ms on ip-172-31-18-236.ap-northeast-1.compute.internal (13/421)
// inverse document frequency: idf(term) = log(numDocs / docFreq(term))
val idfs = topDocFreqs.map{
  case (term, count) => (term, math.log(numDocs.toDouble / count))
}.toMap
// idfs: scala.collection.immutable.Map[String,Double] = Map(dredd -> -4.55597994179732, gaiden -> -4.550714000192032, quotient -> -5.633360010359655, clarissa -> -5.196838197598183, incident -> -9.008530138598365, meteorologist -> -5.95246380975825, misfire -> -4.600157644164547, serious -> -9.03063937850481, wgbh -> -4.3528552573736015, brink -> -6.359227788032135, turnstile -> -4.898585790287632, gans -> -4.4953553199808844, isamu -> -4.4796069630127455, acronym -> -7.0597894348376355, youthful -> -6.348264483234865, sinister -> -6.602859225795668, comply -> -7.574301777744903, ebb -> -5.602118820879701, breaks -> -5.236441962829949, mafioso -> -4.484131857611035, marr -> -5.518255786913298, dns -> -6.392084603552947, forgotten -> -6.736017515727827, layton -> -5.728475087246572, waterv...
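// Worked example of the IDF formula (illustrative counts): a term appearing in
// 95 documents gets a negative weight under numDocs = 10, and a positive one
// under a realistic document count.
math.log(10.0 / 95)      // ≈ -2.25
math.log(5000000.0 / 95) // ≈ 10.87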
val idTerms = idfs.keys.zipWithIndex.toMap
val termIds = idTerms.map(_.swap)
// termIds etc. are large and used in multiple tasks, so broadcast them. Keep the
// Broadcast handles and read .value inside each closure; calling .value here on
// the driver would just capture the plain map in the closure and bypass the broadcast.
val bTermIds = sc.broadcast(termIds)
val bIdfs = sc.broadcast(idfs)
val bIdTerms = sc.broadcast(idTerms)
val termDocMatrix = docTermFreqs.map(_._2).map(termFreqs => {
  val idTerms = bIdTerms.value // read the broadcast values inside the task
  val idfs = bIdfs.value
  val docTotalTerms = termFreqs.values.sum // the book has values(); Scala's values takes no parentheses
  val termScores = termFreqs.filter {
    case (term, freq) => idTerms.contains(term)
  }.map{
    case (term, freq) => (idTerms(term),
      idfs(term) * termFreqs(term) / docTotalTerms)
  }.toSeq
  // TF-IDF row: only the terms that survived the vocabulary cut get an entry
  Vectors.sparse(idTerms.size, termScores)
})
// termDocMatrix: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[10] at map at <console>:99
termDocMatrix.cache()
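// Shape of each row built above (illustration): Vectors.sparse(size, Seq((index, weight), ...))
// stores only the nonzero TF-IDF entries.
Vectors.sparse(5, Seq((0, 1.5), (3, 0.25)))
// => (5,[0,3],[1.5,0.25])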
val mat = new RowMatrix(termDocMatrix)
val k = 10 // original: 1000
// in RunLSA.scala: val k = if (args.length > 0) args(0).toInt else 100
val svd = mat.computeSVD(k, computeU=true)
// # java.lang.OutOfMemoryError: Java heap space
// # -XX:OnOutOfMemoryError="kill -9 %p"
// #   Executing /bin/sh -c "kill -9 11363"...
// /usr/lib/spark/bin/spark-shell: line 44: 11363 Killed
//   "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
// k = 10 OK
// svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix] =
// SingularValueDecomposition(org.apache.spark.mllib.linalg.distributed.RowMatrix@6e63c7a0,[874.5968147274564,566.6543192053327,509.50910044225265,454.66005075189275,424.82823437080754,389.6360554949427,368.56883392687575,357.90655425497454,342.85972656956886,331.69166280914004],8.439268848830005E-6 2.020865219101397E-5 ... (10 total)
// 3.0862992102284116E-6 6.617427606824673E-6 ...
// 1.8824418738801623E-5 2.3732964695046555E-5 ...
// 1.0500150984320169E-5 1.9438089761772934E-5 ...
// 4.978838951252806E-4 7.897882750222361E-4 ...
// 4.8058326325708486E-5 8.077698461211198E-5 ...
// 1.5016628935073547E-6 2.9483585966287222E-6 ....
// println("Singular values: " + svd.s)
// // => Singular values: [874.5968147274564,566.6543192053327,509.50910044225265,454.66005075189275,424.82823437080754,389.6360554949427,368.56883392687575,357.90655425497454,342.85972656956886,331.69166280914004]
// val v = svd.v
// // => <console>:108: error: value v is not a member of org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix]
// (the field is capital V)
val v = svd.V
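// The concept loop further below relies on mllib's column-major matrix layout:
// column i of V (one concept's weight for every term) occupies
// arr.slice(i * v.numRows, (i + 1) * v.numRows). A tiny illustration:
import org.apache.spark.mllib.linalg.Matrices
Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0)).toArray.slice(2, 4)
// => Array(3.0, 4.0): the second column (i = 1, numRows = 2)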
val numConcepts = 10
val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
// the line below also takes a while (a few minutes)
val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap().toMap
// # java.lang.OutOfMemoryError: Java heap space
// # -XX:OnOutOfMemoryError="kill -9 %p"
// #   Executing /bin/sh -c "kill -9 26024"...
// /usr/lib/spark/bin/spark-shell: line 44: 26024 Killed "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
val arr = v.toArray // missing from the original gist; from the book's RunLSA.scala
for (i <- 0 until numConcepts) {
  val offs = i * v.numRows
  val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
  val sorted = termWeights.sortBy(-_._1)
  // NB: numTerms is the 50,000-term vocabulary size here; for a readable
  // summary the book passes a small per-concept count instead
  topTerms += sorted.take(numTerms).map{
    case (score, id) => (termIds(id), score)
  }
}
topTerms
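// Readable summary of the result (illustrative); since numTerms above is the
// whole vocabulary, print only the first few terms per concept:
topTerms.zipWithIndex.foreach { case (terms, concept) =>
  println(s"Concept $concept: " + terms.take(10).map(_._1).mkString(", "))
}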