Last active
August 29, 2015 14:21
-
-
Save juanpampliega/991f3f09b65b836a22d9 to your computer and use it in GitHub Desktop.
Code for running Twitter sentiment analysis with Spark Streaming in spark-shell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.gson.Gson | |
import org.apache.spark.streaming.twitter.TwitterUtils | |
import org.apache.spark.streaming._ | |
import org.apache.spark.streaming.twitter._ | |
import org.apache.spark.storage.StorageLevel | |
import scala.io.Source | |
import scala.collection.mutable.HashMap | |
import java.io.File | |
import org.apache.log4j.Logger | |
import org.apache.log4j.Level | |
import sys.process.stringSeqToProcess | |
/**
 * Configures the OAuth credentials used for accessing Twitter.
 *
 * Each credential is trimmed and stored as a JVM system property named
 * `twitter4j.oauth.<key>`, where the `api` prefix of the key is replaced by
 * `consumer` (`apiKey` -> `consumerKey`, `apiSecret` -> `consumerSecret`).
 *
 * All four values are validated before any property is written, so a failed
 * call never leaves a partially configured set of properties behind.
 *
 * NOTE(review): the credential values are echoed to stdout, as in the original
 * script; avoid running this where console output is captured or shared.
 *
 * @param apiKey            Twitter API (consumer) key
 * @param apiSecret         Twitter API (consumer) secret
 * @param accessToken       OAuth access token
 * @param accessTokenSecret OAuth access token secret
 * @throws Exception if any of the values is null, empty or only whitespace
 */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
  // An ordered, immutable sequence keeps validation and output order deterministic.
  val configs = Seq(
    "apiKey" -> apiKey,
    "apiSecret" -> apiSecret,
    "accessToken" -> accessToken,
    "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")

  // First pass: reject missing values before touching any system property.
  configs.foreach { case (key, value) =>
    if (value == null || value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
  }

  // Second pass: every value is known to be present, so publish them all.
  configs.foreach { case (key, value) =>
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials.
// Replace the placeholder values below with the keys of your own Twitter app;
// configureTwitterCredentials rejects empty values.
val apiKey = "xxx"
val apiSecret = "xxx"
val accessToken = "xxxx"
val accessTokenSecret = "xxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
// The StreamingContext and the Twitter stream are created further down, right
// before the stream is consumed. Creating them here as well only produced an
// unused context and receiver stream that were immediately shadowed.
/**
 * Classifies a piece of text with a naive word-list score.
 *
 * The text is split on single spaces; every token that exactly matches
 * (case-sensitively) a word in the positive list adds one to the score and
 * every token that matches a word in the negative list subtracts one.
 *
 * @param s text to classify
 * @return "positive" if the score is above zero, "negative" if it is below
 *         zero, and "neutral" otherwise
 */
def sentiment(s:String) : String = {
  val positiveWords = Set("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
  val negativeWords = Set("hate", "bad", "stupid", "is")

  val tokens = s.split(" ")
  // The word lists hold distinct entries, so counting matching tokens is the
  // same as counting every (word, token) pair that is equal.
  val score = tokens.count(positiveWords.contains) - tokens.count(negativeWords.contains)

  math.signum(score) match {
    case 1  => "positive"
    case -1 => "negative"
    case _  => "neutral"
  }
}
// Expose the word-list scorer to Spark SQL under the name `sentiment`.
sqlContext.udf.register("sentiment", sentiment _)

// Streaming context with a 20 second batch interval on top of the shell's SparkContext.
val ssc = new StreamingContext(sc, Seconds(20))
// No explicit authorization is passed (None); presumably twitter4j then reads the
// twitter4j.oauth.* system properties set earlier - confirm against TwitterUtils docs.
val stream = TwitterUtils.createStream(ssc, None)

// Row type for the temp table: creation time (getTime() divided by 1000) and tweet text.
case class Tweet(createdAt:Long, text:String)

// Keep the last 60 seconds of statuses and reduce each one to a Tweet.
val twts = stream.window(Seconds(60)).map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
)

// For every windowed RDD: (re)register it as table `tweets` and print the
// number of tweets per sentiment label.
twts.foreachRDD(rdd => {
  rdd.toDF().registerTempTable("tweets")
  println("\nSentiments in the last 60 seconds (%s tweets total):".format(rdd.count()))
  val sentimentsCount = sqlContext.sql("select sentiment(text), count(1) from tweets group by sentiment(text)")
  // The grouped result has at most one row per label, so bring it to the driver
  // before printing; a bare foreach would run println on the executors and the
  // output would not show up in the shell when running on a cluster.
  sentimentsCount.collect().foreach{t => println("%s --- %s".format(t(0), t(1)))}
})

// Start receiving and block the shell until the streaming job is stopped.
ssc.start()
ssc.awaitTermination()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment