// Last active December 15, 2018 22:55
import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Build the Event Hubs connection string for the hub that receives the tweets.
val connectionString = ConnectionStringBuilder("Endpoint.....")
  .setEventHubName("twitterhub")
  .build

// Source configuration handed to the "eventhubs" reader.
// Each micro-batch is capped at a handful of events because every event
// triggers two HTTP calls (language detection, then sentiment) in the UDF
// applied to this stream.
val customEventhubParameters =
  EventHubsConf(connectionString)
    .setMaxEventsPerTrigger(5)

// Streaming DataFrame of raw Event Hubs messages.
val incomingStream = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()
import java.util._
/** Language-detection response; `SentimentDetector.getLanguage` deserializes the API JSON into it. */
case class Language(
  documents: Array[LanguageDocuments],
  errors: Array[Any]
) extends Serializable

/** One document entry of a [[Language]] response, with the languages detected for it. */
case class LanguageDocuments(
  id: String,
  detectedLanguages: Array[DetectedLanguages]
) extends Serializable

/** A single detected language: display name, ISO 639-1 code and score. */
case class DetectedLanguages(
  name: String,
  iso6391Name: String,
  score: Double
) extends Serializable

/** Sentiment response; `SentimentDetector.getSentiment` deserializes the API JSON into it. */
case class Sentiment(
  documents: Array[SentimentDocuments],
  errors: Array[Any]
) extends Serializable

/** One document entry of a [[Sentiment]] response: the document id and its score. */
case class SentimentDocuments(
  id: String,
  score: Double
) extends Serializable

/** Request body sent to the Text API; serialized to JSON before being posted. */
case class RequestToTextApi(
  documents: Array[RequestToTextApiDocument]
) extends Serializable

/**
 * One document of a [[RequestToTextApi]].
 *
 * `language` is mutable and defaults to the empty string: it is filled in
 * with the detected ISO code before the same request is reused for the
 * sentiment call.
 */
case class RequestToTextApiDocument(
  id: String,
  text: String,
  var language: String = ""
) extends Serializable
import scala.util.parsing.json._
/**
 * Minimal client for the Text Analytics v2.0 REST endpoints used by this
 * notebook: language detection and sentiment scoring.
 *
 * NOTE(review): this object uses `URL`, `HttpsURLConnection`, the `java.io`
 * stream classes and the Gson classes (`Gson`, `GsonBuilder`, `JsonParser`);
 * the imports for them are not visible in this section - confirm they are in
 * scope.
 */
object SentimentDetector extends Serializable {

  // Cognitive Services API connection settings
  val accessKey: String = "YOUR ACCESS KEY"
  // Base endpoint of the service. Must be set to an absolute URL (including
  // the scheme) before use: `new URL` below rejects the empty placeholder
  // with a MalformedURLException, which fails initialization of this object.
  val host: String = ""
  val languagesPath: String = "/text/analytics/v2.0/languages"
  val sentimentPath: String = "/text/analytics/v2.0/sentiment"
  val languagesUrl: URL = new URL(host+languagesPath)
  val sentimenUrl: URL = new URL(host+sentimentPath)
  val g = new Gson

  /**
   * Opens a connection to `path`, configured for a JSON POST authenticated
   * with [[accessKey]].
   *
   * @param path endpoint to call
   * @return the configured, not yet connected, connection
   */
  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    // Required before getOutputStream(): a URLConnection refuses to hand out
    // an output stream unless doOutput is enabled.
    connection.setDoOutput(true)
    connection
  }

  /**
   * Re-renders a JSON object with pretty-printing.
   *
   * @param json_text text of a JSON object
   * @return the same JSON, indented
   */
  def prettify (json_text: String): String = {
    val json = new JsonParser().parse(json_text).getAsJsonObject()
    new GsonBuilder().setPrettyPrinting().create().toJson(json)
  }

  /**
   * Handles the call to Cognitive Services API: posts `request` as UTF-8 JSON
   * to `path` and returns the response body.
   *
   * @param request documents to send
   * @param path    endpoint to call
   * @return the response body, with its lines concatenated without separators
   */
  def processUsingApi(request: RequestToTextApi, path: URL): String = {
    val payload = g.toJson(request).getBytes("UTF-8")
    val connection = getConnection(path)

    val out = new DataOutputStream(connection.getOutputStream())
    try {
      out.write(payload, 0, payload.length)
      out.flush()
    } finally {
      out.close()
    }

    // Decode explicitly as UTF-8 rather than with the platform default charset.
    val in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"))
    try {
      Iterator.continually(in.readLine()).takeWhile(_ != null).mkString
    } finally {
      in.close()
    }
  }

  /**
   * Calls the language API for specified documents.
   *
   * @param inputDocs documents to analyse
   * @return the parsed response, or `None` when the call or parsing fails,
   *         when the response has no first document / first detected language,
   *         or when that language's ISO code is "(Unknown)"
   */
  def getLanguage (inputDocs: RequestToTextApi): Option[Language] = {
    try {
      val response = processUsingApi(inputDocs, languagesUrl)
      // In case we need to log the json response somewhere
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      val language = g.fromJson(niceResponse, classOf[Language])
      val firstIsoCode = for {
        docs <- Option(language.documents)
        doc <- docs.headOption
        detected <- Option(doc.detectedLanguages)
        first <- detected.headOption
      } yield first.iso6391Name
      firstIsoCode.filter(_ != "(Unknown)").map(_ => language)
    } catch {
      case e: Exception =>
        reportFailure("language detection", e)
        None
    }
  }

  /**
   * Calls the sentiment API for specified documents. Needs a language field
   * to be set for each of them.
   *
   * @param inputDocs documents to score
   * @return the parsed response, or `None` when the call or parsing fails
   */
  def getSentiment (inputDocs: RequestToTextApi): Option[Sentiment] = {
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      val niceResponse = prettify(response)
      // Deserializing the JSON response from the API into Scala types
      Some(g.fromJson(niceResponse, classOf[Sentiment]))
    } catch {
      case e: Exception =>
        reportFailure("sentiment detection", e)
        None
    }
  }

  // Callers only ever see `None` on failure; leave a trace on stderr so the
  // underlying cause (bad key, bad endpoint, malformed response) is not lost.
  private def reportFailure(operation: String, cause: Exception): Unit =
    System.err.println(s"SentimentDetector: $operation failed: $cause")
}
// User Defined Function for processing content of messages to return their sentiment.
// Returns the sentiment score rendered as a string, or a short diagnostic
// message when language detection or sentiment scoring did not produce a result.
val toSentiment =
  udf((textContent: String) => {
    // Renders the first API error, tolerating a missing or empty error list.
    def firstError(errors: Array[Any]): String =
      Option(errors).flatMap(_.headOption).fold("unknown error")(e => s"$e")

    // The message text doubles as the document id.
    val inputObject = new RequestToTextApi(Array(new RequestToTextApiDocument(textContent, textContent)))

    SentimentDetector.getLanguage(inputObject) match {
      case None => "Couldn't detect language"
      case Some(language) =>
        language.documents.headOption match {
          case None =>
            "Error happened when getting language" + firstError(language.errors)
          case Some(languageDoc) =>
            // getLanguage only yields Some when the first document has a
            // first detected language, so this index is safe.
            inputObject.documents(0).language = languageDoc.detectedLanguages(0).iso6391Name
            SentimentDetector.getSentiment(inputObject) match {
              case None => "Couldn't detect sentiment"
              case Some(sentiment) =>
                sentiment.documents.headOption match {
                  case Some(scored) => scored.score.toString()
                  case None =>
                    "Error happened when getting sentiment: " + firstError(sentiment.errors)
                }
            }
        }
    }
  })
// Project the message body to a string column named Content and attach the
// Sentiment column computed by the toSentiment UDF.
val streamingDataFrame = incomingStream
  .selectExpr("cast (body as string) AS Content")
  .withColumn("Sentiment", toSentiment(col("Content")))

// Print every appended row, untruncated, to the console and block until the
// streaming query terminates.
streamingDataFrame
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()
