Spark Streaming Source Code Analysis: The ReceiverTracker in the Driver

Author: 阳光男孩spark | Published 2016-05-24 14:17

    How Receivers are started:

    1. Each Receiver is wrapped into a task, and that task is the only task of its job. In essence, ReceiverTracker starts Receivers by wrapping each of them into its own job, so as many jobs are launched as there are Receivers, and each task carries exactly one piece of data: the Receiver itself (see the sketch after this list).

    2. When ReceiverTracker starts a Receiver, a ReceiverSupervisor is involved; ReceiverSupervisorImpl is its concrete implementation. When the ReceiverSupervisor starts, it starts the Receiver, which then keeps receiving data and, through a BlockGenerator, turns the received data into blocks; a timer behind the scenes keeps storing that data, either directly through the BlockGenerator or by first writing a write-ahead log (WAL). ReceiverSupervisorImpl then reports the metadata of the stored blocks to ReceiverTracker (more precisely, to the RPC message endpoint inside ReceiverTracker), which carries out the subsequent data-management work.
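
    To make item 1 concrete, here is an abridged sketch of what ReceiverTracker.startReceiver does in Spark 1.6.x. It is a paraphrase, not a self-contained program: receiver, ssc, hadoopConf and checkpointDirOption are members of the enclosing ReceiverTracker, and scheduling details are omitted.

    // Abridged sketch of ReceiverTracker.startReceiver (Spark 1.6.x); receiver, ssc,
    // hadoopConf and checkpointDirOption belong to the enclosing ReceiverTracker.
    val receiverRDD: RDD[Receiver[_]] = ssc.sc.makeRDD(Seq(receiver), 1)  // one element, one partition

    // The function run by the job's single task: pull the Receiver out of the iterator and
    // run it on the executor under a ReceiverSupervisorImpl.
    val startReceiverFunc: Iterator[Receiver[_]] => Unit = (iterator: Iterator[Receiver[_]]) => {
      val receiver = iterator.next()
      val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, hadoopConf, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    }

    // One job per Receiver, each with exactly one task whose only "record" is the Receiver itself.
    ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
      receiverRDD, startReceiverFunc, Seq(0), (_, _) => (), ())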

    ReceiverTracker:

    /** RpcEndpoint to receive messages from the receivers. */
    private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

    This is ReceiverTracker's RPC message endpoint.

    Next, let us walk through the concrete handling flow, taking as an example the AddBlock message that ReceiverSupervisorImpl on the Executor side sends to add block metadata.

    ReceiverTracker:

    ...

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
        val successful =
          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
        context.reply(successful)
      // If WAL batching is enabled, run addBlock in the WAL batching thread pool; otherwise call
      // addBlock directly. Either way, whether adding the metadata succeeded is replied back to
      // ReceiverSupervisorImpl.
      case AddBlock(receivedBlockInfo) =>
        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
          walBatchingThreadPool.execute(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              if (active) {
                context.reply(addBlock(receivedBlockInfo))
              } else {
                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
              }
            }
          })
        } else {
          context.reply(addBlock(receivedBlockInfo))
        }
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
        context.reply(true)
      // Local messages
      case AllReceiverIds =>
        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
      case StopAllReceivers =>
        assert(isTrackerStopping || isTrackerStopped)
        stopReceivers()
        context.reply(true)
    }

    ...

    /** Add new blocks for the given stream */
    private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
      receivedBlockTracker.addBlock(receivedBlockInfo)
    }

    ...

    The ReceivedBlockInfo class carries the stream ID, the number of records in the block, optional metadata, and the result of storing the received block (the block ID and record count).

    ReceivedBlockInfo:

    ...

    /** Information about blocks received by the receiver */
    private[streaming] case class ReceivedBlockInfo(
        streamId: Int,
        numRecords: Option[Long],
        metadataOption: Option[Any],
        blockStoreResult: ReceivedBlockStoreResult
      ) {

    ...
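
    For reference, this is roughly how the Executor side produces such a ReceivedBlockInfo and sends the AddBlock message. The sketch is abridged from ReceiverSupervisorImpl.pushAndReportBlock in Spark 1.6.x; logging is omitted, and the members it uses (receivedBlockHandler, nextBlockId, streamId, trackerEndpoint) belong to ReceiverSupervisorImpl.

    // Abridged sketch of ReceiverSupervisorImpl.pushAndReportBlock (Spark 1.6.x)
    def pushAndReportBlock(
        receivedBlock: ReceivedBlock,
        metadataOption: Option[Any],
        blockIdOption: Option[StreamBlockId]): Unit = {
      val blockId = blockIdOption.getOrElse(nextBlockId)
      // Store the block via the configured ReceivedBlockHandler (BlockManager-based or WAL-based)
      val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
      val numRecords = blockStoreResult.numRecords
      // Build the metadata entry and report it to the ReceiverTrackerEndpoint on the Driver
      val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
      trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    }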

    The concrete implementation of addBlock lives in the ReceivedBlockTracker class.

    ...

    /** Add received block. This event will get written to the write ahead log (if enabled). */
    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
      try {
        // writeToLog decides whether the event must be written to the write-ahead log first
        val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
        if (writeResult) {
          synchronized {
            // Append the receivedBlockInfo to the unallocated-block queue of its stream
            getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
          }
          logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId}")
        } else {
          logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
        }
        writeResult
      } catch {
        case NonFatal(e) =>
          logError(s"Error adding block $receivedBlockInfo", e)
          false
      }
    }

    ...

    addBlock in turn calls ReceivedBlockTracker's writeToLog method:

    /** Write an update to the tracker to the write ahead log */
    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
      if (isWriteAheadLogEnabled) {
        logTrace(s"Writing record: $record")
        try {
          writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
            clock.getTimeMillis())
          true
        } catch {
          case NonFatal(e) =>
            logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
            false
        }
      } else {
        true
      }
    }
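
    As a configuration sketch (the key name follows the Spark Streaming documentation; the application name and checkpoint path are placeholders), the write-ahead log consulted by isWriteAheadLogEnabled above is normally switched on by enabling checkpointing together with the receiver write-ahead-log flag:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // With a checkpoint directory set and this flag on, the tracker's WAL is created, so the
    // BlockAdditionEvent/BatchAllocationEvent records above actually get persisted.
    val conf = new SparkConf()
      .setAppName("wal-demo")                                       // placeholder name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")              // placeholder checkpoint directory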

    getReceivedBlockQueue is also a ReceivedBlockTracker method. Here streamIdToUnallocatedBlockQueues is a HashMap whose key is the stream ID and whose value is a ReceivedBlockQueue, where ReceivedBlockQueue is defined as private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo].

    /** Get the queue of received blocks belonging to a particular stream */
    private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
      // Look up (or lazily create) the ReceivedBlockQueue for this stream ID
      streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
    }
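
    A tiny, self-contained illustration of the getOrElseUpdate idiom used here, with a simplified stand-in for ReceivedBlockInfo: the first access for a stream ID lazily creates its queue, and later accesses return the same queue.

    import scala.collection.mutable

    object QueueDemo extends App {
      // Simplified stand-in for Spark's ReceivedBlockInfo, only for this illustration
      case class FakeBlockInfo(streamId: Int, blockId: String)

      val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, mutable.Queue[FakeBlockInfo]]

      // First access for stream 0: no entry yet, so an empty queue is created and cached
      streamIdToUnallocatedBlockQueues.getOrElseUpdate(0, new mutable.Queue[FakeBlockInfo]()) += FakeBlockInfo(0, "input-0-1")
      // Second access returns the same queue instance, so this block lands in the same queue
      streamIdToUnallocatedBlockQueues.getOrElseUpdate(0, new mutable.Queue[FakeBlockInfo]()) += FakeBlockInfo(0, "input-0-2")

      assert(streamIdToUnallocatedBlockQueues(0).size == 2)
    }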

    As the source of the ReceivedBlockTracker class shows, it keeps track of all received blocks and allocates them to batches on demand. If checkpointing is configured and the WAL is enabled, every such operation is also saved to the write-ahead log, so after a Driver failure the ReceiverTracker state can be recovered from the checkpoint and the WAL.

    private[streaming] class ReceivedBlockTracker(
        conf: SparkConf,
        hadoopConf: Configuration,
        streamIds: Seq[Int],
        clock: Clock,
        recoverFromWriteAheadLog: Boolean,
        checkpointDirOption: Option[String])
      extends Logging {

      private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

      // Blocks that have been received but not yet allocated to a batch, keyed by stream ID
      private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

    allocateBlocksToBatch is an important method of ReceivedBlockTracker.

    /**
     * Allocate all unallocated blocks to the given batch.
     * This event will get written to the write ahead log (if enabled).
     */
    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      // Only allocate if this batch time is newer than the last allocated batch time
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        // Drain the unallocated queues into a map whose key is the streamId and whose value is
        // the sequence of ReceivedBlockInfo taken from that stream's queue
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        // Write the allocation event to the WAL first (if enabled)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          // Record the allocation in timeToAllocatedBlocks
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        } else {
          logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
        }
      } else {
        // This situation occurs when:
        // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
        // possibly processed batch job or half-processed batch job need to be processed again,
        // so the batchTime will be equal to lastAllocatedBatchTime.
        // 2. Slow checkpointing makes recovered batch time older than WAL recovered
        // lastAllocatedBatchTime.
        // This situation will only occurs in recovery time.
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    }
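
    The key step above is dequeueAll(x => true): it removes every element from the stream's queue and returns them as a sequence, leaving the queue empty for the next batch. A small self-contained illustration:

    import scala.collection.mutable

    object DequeueAllDemo extends App {
      val queue = mutable.Queue("block-1", "block-2", "block-3")

      // dequeueAll removes every element matching the predicate; a constant-true predicate
      // therefore drains the whole queue in arrival order.
      val drained = queue.dequeueAll(_ => true)

      assert(drained == Seq("block-1", "block-2", "block-3"))
      assert(queue.isEmpty)   // nothing is left over for the next batch
    }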

    This method is called by ReceiverTracker's own allocateBlocksToBatch:

    /** Allocate all unallocated blocks to the given batch. */
    def allocateBlocksToBatch(batchTime: Time): Unit = {
      if (receiverInputStreams.nonEmpty) {
        receivedBlockTracker.allocateBlocksToBatch(batchTime)
      }
    }

    ReceiverTracker's allocateBlocksToBatch is, in turn, called by JobGenerator's generateJobs method.

    /** Generate jobs and perform checkpoint for the given `time`.  */
    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
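
    What actually drives generateJobs is a recurring timer inside JobGenerator: once per batch interval it posts a GenerateJobs event onto the JobGenerator event loop, which then dispatches to the generateJobs(time) method shown above. The line below is abridged from JobGenerator in Spark 1.6.x; ssc, clock and eventLoop are members of JobGenerator.

    // Fires every batchDuration; the event loop routes GenerateJobs(time) to generateJobs(time)
    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")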

    getBlocksOfBatch is another important method of ReceivedBlockTracker.

    /** Get the blocks allocated to the given batch. */
    def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
      timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
    }

    It is called by ReceiverTracker's getBlocksOfBatch:

    /** Get the blocks for the given batch and all input streams. */
    def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
      receivedBlockTracker.getBlocksOfBatch(batchTime)
    }

    ReceiverTracker's getBlocksOfBatch is called by the compute method of ReceiverInputDStream.

    /** Generates RDDs with blocks received by the receiver of this stream. */
    override def compute(validTime: Time): Option[RDD[T]] = {
      val blockRDD = {
        if (validTime < graph.startTime) {
          // If this is called for any time before the start time of the context,
          // then this returns an empty RDD. This may happen when recovering from a
          // driver failure without any write ahead log to recover pre-failure data.
          new BlockRDD[T](ssc.sc, Array.empty)
        } else {
          // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
          // for this batch
          val receiverTracker = ssc.scheduler.receiverTracker
          val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

          // Register the input blocks information into InputInfoTracker
          val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
          ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

          // Create the BlockRDD
          createBlockRDD(validTime, blockInfos)
        }
      }
      Some(blockRDD)
    }
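
    To put compute into context, here is a minimal (hypothetical) application: socketTextStream returns a ReceiverInputDStream, so for every two-second batch the DStream machinery calls the compute method above, which asks ReceiverTracker for the blocks allocated to that batch. The host, port and application name are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SocketWordCountDemo extends App {
      // local[2]: one core runs the Receiver task, the other processes the generated BlockRDDs
      val conf = new SparkConf().setMaster("local[2]").setAppName("ReceiverInputDStreamDemo")
      val ssc = new StreamingContext(conf, Seconds(2))

      // socketTextStream builds a ReceiverInputDStream[String]; its compute() is the method above
      val lines = ssc.socketTextStream("localhost", 9999)   // placeholder host and port
      lines.count().print()

      ssc.start()
      ssc.awaitTermination()
    }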

    Summary:

          After a Receiver has received data and merged and stored it, ReceiverSupervisorImpl reports the block's metadata to ReceiverTrackerEndpoint, the message endpoint inside ReceiverTracker. Once ReceiverTracker has received the block metadata, ReceivedBlockTracker manages how that metadata is allocated: for each batch, JobGenerator obtains from ReceivedBlockTracker the block metadata belonging to that batch and uses it to generate the RDDs. In design-pattern terms, ReceiverTrackerEndpoint and ReceivedBlockTracker form a facade: ReceivedBlockTracker is the part that actually does the work internally, while ReceiverTrackerEndpoint is the communication endpoint, or representative, exposed to the outside.
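
    A bare-bones, non-Spark sketch of that facade relationship: the tracker exposes a thin surface and delegates the real bookkeeping to a hidden component, which mirrors how ReceiverTracker.addBlock simply forwards to ReceivedBlockTracker.addBlock.

    import scala.collection.mutable

    // Plays the role of ReceivedBlockTracker: the component that actually keeps the state
    class BlockBookkeeper {
      private val blocks = mutable.Buffer.empty[String]
      def addBlock(blockId: String): Boolean = { blocks += blockId; true }
    }

    // Plays the role of ReceiverTracker: the facade that callers (and the RPC endpoint) use,
    // delegating the real work to the bookkeeper it hides
    class TrackerFacade {
      private val bookkeeper = new BlockBookkeeper
      def addBlock(blockId: String): Boolean = bookkeeper.addBlock(blockId)
    }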

    Note:

    Source: DT_大数据梦工厂 (Spark release customization series).

