package hadoopsters.spark.scala.monitoring.listeners

import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
/**
 * :: ExampleStreamingListener ::
 * A simple StreamingListener that accesses summary statistics across Spark Streaming batches;
 * extends the @DeveloperApi StreamingListener trait.
 *
 * @param exampleArg You can pass whatever you want to a listener!
 */
class ExampleStreamingListener(exampleArg: String) extends StreamingListener {

  // ====================
  // onBatch_ Methods
  // ====================

  /**
   * This method executes when a Spark Streaming batch completes.
   *
   * @param batchCompleted Class having information on the completed batch
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {

    println("I was passed to the listener: " + exampleArg)

    // write performance metrics somewhere
    writeStatsSomewhere(batchCompleted)

    // write offsets (state) somewhere, and numRecords per topic
    processTopicInfo(batchCompleted)
  }

  /**
   * This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
   *
   * @param batchSubmitted Class having information on the submitted batch
   */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
  }

  /**
   * This method executes when a Spark Streaming batch starts.
   *
   * @param batchStarted Class having information on the started batch
   */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
  }

  // ====================
  // onReceiver_ Methods
  // ====================
  // Note: these fire only for receiver-based streams; the receiverless Kafka
  // 0.10 direct stream never triggers them.

  /**
   * This method executes when a Spark Streaming receiver has started.
   *
   * @param receiverStarted Class having information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
  }

  /**
   * This method executes when a Spark Streaming receiver encounters an error.
   *
   * @param receiverError Class having information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
  }

  /**
   * This method executes when a Spark Streaming receiver stops working.
   *
   * @param receiverStopped Class having information on the receiver (e.g. errors, executor ids, etc)
   */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
  }

  // =======================================================================
  // Convenience Methods (for use in onBatch_ methods)
  // =======================================================================

  /**
   * Pulls and parses the key performance metrics of the Streaming app and logs them somewhere.
   * Processing Time: How many seconds were needed to complete this batch (i.e. duration).
   * Scheduling Delay: How many seconds the start time of this batch was delayed.
   * Num Records: The total number of input records from a live stream consumed this batch.
   *
   * @param batch Class having information on the completed batch
   */
  def writeStatsSomewhere(batch: StreamingListenerBatchCompleted): Unit = {

    // Processing time for this batch, in seconds
    val processingTime = batch.batchInfo.processingDelay.getOrElse(0L) / 1000

    // Scheduling delay for this batch, in seconds (floored at zero)
    val schedulingDelay = batch.batchInfo.schedulingDelay.filter(_ > 0).getOrElse(0L) / 1000

    // Total record count for this batch
    val numRecords = batch.batchInfo.numRecords

    // do something with `processingTime`
    // do something with `schedulingDelay`
    // do something with `numRecords`
    // (e.g. the hypothetical logStats sketch below)
  }
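
  /**
   * A purely illustrative sink for the metrics gathered above; this helper is
   * hypothetical and not part of any Spark API. Swap the println for a DB
   * write, a metrics emit, or whatever "somewhere" means in your deployment.
   */
  def logStats(processingTime: Long, schedulingDelay: Long, numRecords: Long): Unit = {
    println(s"[${new DateTime()}] batch completed: " +
      s"processingTime=${processingTime}s, schedulingDelay=${schedulingDelay}s, numRecords=$numRecords")
  }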

  /**
   * A combination method that will process a topic in a batch.
   *
   * @param batch Class having information on the completed batch
   */
  def processTopicInfo(batch: StreamingListenerBatchCompleted): Unit = {

    // for each stream topic consumed this batch...
    batch.batchInfo.streamIdToInputInfo.foreach(topic => {
      writeTopicOffsetsSomewhere(topic)
      writeTopicCountSomewhere(topic)
    })
  }

  // =======================================================================
  // Topic Methods (designed for use inside of convenience methods)
  // =======================================================================

  /**
   * Takes a topic object and writes the max offset for each partition it contains this batch somewhere.
   *
   * @param topic A topic object within a Batch's StreamIdToInputInfo
   */
  def writeTopicOffsetsSomewhere(topic: (Int, StreamInputInfo)): Unit = {

    // the Kafka 0.10 direct stream publishes its OffsetRanges in the input
    // info metadata under the key "offsets"
    val partitionOffsets = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]]

    // for every partition's range of offsets
    partitionOffsets.foreach(offsetRange => {

      // the highest offset consumed for this partition (untilOffset is exclusive);
      // write it to the state db as the new starting point for the partition
      val maxOffset = offsetRange.untilOffset - 1

      // do something with `offsetRange.topic`
      // do something with `offsetRange.partition`
      // do something with `offsetRange.count`
      // do something with `maxOffset`
    })
  }

  /**
   * Takes a topic object and writes the number of records for said topic this batch somewhere.
   *
   * @param topic A topic object within a Batch's StreamIdToInputInfo
   */
  def writeTopicCountSomewhere(topic: (Int, StreamInputInfo)): Unit = {

    // store the individual record count for this topic
    val numRecords = topic._2.numRecords

    // store topicName
    val topicName = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]].head.topic

    // write record count for this topic this batch
    // do something with `topicName` and `numRecords`
  }
}
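
/**
 * A minimal sketch of how this listener might be attached, assuming a driver
 * program that builds its own StreamingContext. The app name, batch interval,
 * and constructor argument are placeholders, not part of the listener itself.
 */
object ExampleStreamingApp {

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-example")
    val ssc = new StreamingContext(conf, Seconds(30))

    // register the listener before starting the context
    ssc.addStreamingListener(new ExampleStreamingListener("hello, listener"))

    // ... create the Kafka direct stream and output operations here,
    // or start() will fail with "No output operations registered" ...

    ssc.start()
    ssc.awaitTermination()
  }
}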