次にJobScheduler#startで呼ばれるのが以下の部分です。
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
コメントを見ると、NetworkInputDStreamsの管理をするクラスだと記載されています。
def start() = synchronized {
if (actor != null) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
"ReceiverTracker")
receiverExecutor.start()
logInfo("ReceiverTracker started")
}
}
ここではAkkaにReceiverTrackerActorを登録して、receiverExecutorをstartしています。
ReceiverTrackerActorは以下の動きをします。
private class ReceiverTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
sender ! true
}
}
Receiverからメッセージを受け取るためのActorのようです。
一回ここでの受信メッセージについてはおいておきます。
receiveExecutorは実質ReceiveLancherが呼ばれています。
ReceiveLauncherはクラスタ上のReceiver全てが呼び出すものになっています。
val receiverExecutor = new ReceiverLauncher()
ReceiverLauncher#startは以下のような実装になっています。
def start() {
thread.start()
}
またthreadですね。中身は以下のようになっています。
@transient val thread = new Thread() {
override def run() {
try {
SparkEnv.set(env)
startReceivers()
} catch {
case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
}
}
}
envはSparckContextのenvなので環境変数を共有しているイメージでしょうか。
startReceiversは以下のような実装です。
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def startReceivers() {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
ssc.sparkContext.runJob(tempRDD, startReceiver)
logInfo("All of the receivers have been terminated")
}
長い。。。コメントを見るとReceiverInputDStreamsから受け取ってそれらをノードに渡すんだといっています。
大事そうなので細かくみていきます。
まずはreceiversから。
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
receiverInputStreamsはクラス初期化の際にssc.graph.getReceiverInputStreams()が代入されます。
最初のほうにssc.graphはDStreamGraphをインスタンス化したものが代入されていました。
def getReceiverInputStreams() = this.synchronized {
inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
.map(_.asInstanceOf[ReceiverInputDStream[_]])
.toArray
}
ということでinputDStreamの配列が帰ってくるイメージです。
次にmapの中で呼ばれているnis.getReceiver()ですがこれは入力ソースによって違ってくるものです。
KafkaInputDStreamを利用する場合KafkaReceiverのインスタンスがかえります。
def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
}
ここで次のreceiveTrackerの処理に進みます。
// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
ここではもし各Receiverは自分の好きなホストを指定していればそこを受け取るようにしています。
今回はメインの流れとは関係しないためスキップします。(preferredLocationはKafkaReceiverの親クラスであるReceiverクラスでオーバーライドできるようになっています)
次。
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
コメントを見るとワーカーノードにあるReceiverたちに配布するためのコレクションを作るとしています。
ここでStreamingContextが持つSpackContextに対してRDDをつくるよう指示しています。
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is
* altered after the call to parallelize and before the first action on the
* RDD, the resultant RDD will reflect the modified collection. Pass a copy of
* the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
Receiverの数分並列にRDDの形でデータを渡すようにしています。
ParallelCollectionRDDはまた後で見ます。
次にいきます。
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
各Receiverに対してstartよんでるんだとおもうんだけど、あくまで関数として保存しているだけ。
また後で呼ばれるんだろうということだがreceiverに対してReceiverSupervisorImplしてるとこは見ておく。
と思ったけど実装が長いのであとで必要なとこだけ見てみる。
やってるのはAkkaの振る舞いとかの実装。ここで見るのはexecutor.start()とexecutor.awaitTermination()とする。
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
override protected def onStart() {
blockGenerator.start()
}
blockGeneratorは後で見ます。
もう一つのstartReceiverメソッドはReciverSupervisor.scalaにあります。
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
case t: Throwable =>
stop("Error starting receiver " + streamId, Some(t))
}
}
ここでonStartが呼ばれるがReceiverインスタンスはKafkaReceiverなのでそこにあるonStart実装を見てみる。
def onStart() {
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)
// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
val executorPool = Executors.newFixedThreadPool(topics.values.sum)
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
} finally {
executorPool.shutdown() // Just causes threads to terminate after work is done
}
}
ZooKeeperにつなげてがんばってそうです。
さらにstartReceiverのなかでonReceiverStartをよんでいるので実装をみてみます。
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
}
はっ、とうとうここでResigerReceiverしています!
RegisterReceiverはReceiverTrackerActorの中で持っているAkkaイベントでここで送り先を登録していそうです。
さて、ここまででやっとreceiverExecutorにどうやってreceiverが登録されるかがわかりました。
今見てきたメソッドはまだ呼ばれていませんが次の処理で呼ばれるようになります。
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
ssc.sparkContext.runJob(tempRDD, startReceiver)
logInfo("All of the receivers have been terminated")
Receiverを実際に動かす部分です。
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark. The allowLocal
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (dagScheduler == null) {
throw new SparkException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo(
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}
ここでdagschedulerから実行してるぽい。