美文网首页spark玩转大数据大数据
Spark Streaming源码解读之流数据不断接收全生命周期

Spark Streaming源码解读之流数据不断接收全生命周期

作者: 阳光男孩spark | 来源:发表于2016-05-24 14:05 被阅读155次

    Spark Streaming应用程序有以下特点:

    1. 不断持续接收数据

    2.  Receiver和Driver不在同一节点中

           Spark Streaming应用程序接收数据、存储数据、汇报数据的metedata给Driver。数据接收的模式类似于MVC,其中Driver是Model,Receiver是View,ReceiverSupervisorImpl是Controller。Receiver的启动由ReceiverSupervisorImpl来控制,Receiver接收到数据交给ReceiverSupervisorImpl来存储。RDD中的元素必须要实现序列化,才能将RDD序列化给Executor端。Receiver就实现了Serializable接口。

    ReceiverTracker的代码片段:

    // Create the RDD using the scheduledLocations to run the receiver in a Spark job

    val receiverRDD: RDD[Receiver[_]] =

    if (scheduledLocations.isEmpty) {

    ssc.sc.makeRDD(Seq(receiver), 1)

    } else {

    val preferredLocations = scheduledLocations.map(_.toString).distinct

    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))

    }

    Receiver的代码片段:

    @DeveloperApi

    abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

    处理Receiver接收到的数据,存储数据并汇报给Driver,Receiver是一条一条的接收数据的。

    /**

    * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]

    * which provides all the necessary functionality for handling the data received by

    * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]

    * object that is used to divide the received data stream into blocks of data.

    */

    private[streaming] class ReceiverSupervisorImpl(

    receiver: Receiver[_],

    env: SparkEnv,

    hadoopConf: Configuration,

    checkpointDirOption: Option[String]

    ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

    通过限定数据存储速度来实现限流接收数据,合并成buffer,放入block队列在ReceiverSupervisorImpl启动会调用BlockGenerator对象的start方法。

    override protected def onStart() {

    registeredBlockGenerators.foreach { _.start() }

    ...

    private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]

    with mutable.SynchronizedBuffer[BlockGenerator]

          源码注释说明了BlockGenerator把一个Receiver接收到的数据合并到一个Block然后写入到BlockManager中。该类内部有两个线程,一个是周期性把数据生成一批对象,然后把先前的一批数据封装成Block。另一个线程时把Block写入到BlockManager中。

    private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

    BlockGenerator类继承自RateLimiter类,说明我们不能限定接收数据的速度,但是可以限定存储数据的速度,转过来就限定流动的速度。

    BlockGenerator类有一个定时器(默认每200ms将接收到的数据合并成block)和一个线程(把block写入到BlockManager),200ms会产生一个Block,即1秒钟生成5个Partition。太小则生成的数据片中数据太小,导致一个Task处理的数据少,性能差。实际经验得到不要低于50ms。

    BlockGenerator代码片段:

    private val blockIntervalTimer =

    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

    ...

    private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

    那BlockGenerator是怎么被创建的?

    private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

    ...

    override def createBlockGenerator(

    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {

    // Cleanup BlockGenerators that have already been stopped

    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)

    registeredBlockGenerators += newBlockGenerator

    newBlockGenerator

    }

    BlockGenerator类中的定时器会回调updateCurrentBuffer方法。

    Receiver不断的接收数据,BlockGenerator类通过一个定时器,把Receiver接收到的数据,把多条合并成Block,再放入到Block队列中。

    /** Change the buffer to which single records are added to. */

    private def updateCurrentBuffer(time: Long): Unit = {

    try {

    var newBlock: Block = null

    // 不同线程都会访问currentBuffer,故需加锁

    synchronized {

    // 如果缓冲器不为空,则生成StreamBlockId对象,

    // 调用listener的onGenerateBlock来通知Block已生成,

    // 再实例化block对象。

    if (currentBuffer.nonEmpty) {

    val newBlockBuffer = currentBuffer

    currentBuffer = new ArrayBuffer[Any]

    val blockId = StreamBlockId(receiverId, time - blockIntervalMs)

    listener.onGenerateBlock(blockId)

    newBlock = new Block(blockId, newBlockBuffer)

    }

    }

    // 最后,把Block对象放入

    if (newBlock != null) {

    blocksForPushing.put(newBlock)  // put is blocking when queue is full

    }

    } catch {

    case ie: InterruptedException =>

    logInfo("Block updating timer thread was interrupted")

    case e: Exception =>

    reportError("Error in block updating thread", e)

    }

    }

    该函数200ms回调一次,可以设置,但不能小于50ms。

    运行在Executor端的ReceiverSupervisorImpl需要与Driver端的ReceiverTracker进行通信,传递元数据信息metedata,其中ReceiverSupervisorImpl通过RPC的名称获取到ReceiverTrcker的远程调用。

    ReceiverSupervisorImpl代码片段:

    /** Remote RpcEndpointRef for the ReceiverTracker */

    private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

    在ReceiverTracker调用start方法启动的时候,会以ReceiverTracker的名称创建RPC通信体。ReceiverSupervisorImpl就是和这个RPC通信体进行消息交互的。

    /** Start the endpoint and receiver execution thread. */

    def start(): Unit = synchronized {

    if (isTrackerStarted) {

    throw new SparkException("ReceiverTracker already started")

    }

    if (!receiverInputStreams.isEmpty) {

    endpoint = ssc.env.rpcEnv.setupEndpoint(

    "ReceiverTracker", newReceiverTrackerEndpoint(ssc.env.rpcEnv))

    if (!skipReceiverLaunch) launchReceivers()

    logInfo("ReceiverTracker started")

    trackerState = Started

    }

    }

    在ReceiverTrackerEndpoint接收到ReceiverSupervisorImpl发送的注册消息,把其RpcEndpoint保存起来。

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

    // Remote messages

    caseRegisterReceiver(streamId, typ, host, executorId,receiverEndpoint) =>

    val successful =

    registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)

    context.reply(successful)

    case AddBlock(receivedBlockInfo) =>

    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {

    walBatchingThreadPool.execute(new Runnable {

    override def run(): Unit = Utils.tryLogNonFatalError {

    if (active) {

    context.reply(addBlock(receivedBlockInfo))

    } else {

    throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")

    }

    }

    })

    } else {

    context.reply(addBlock(receivedBlockInfo))

    }

    case DeregisterReceiver(streamId, message, error) =>

    deregisterReceiver(streamId, message, error)

    context.reply(true)

    // Local messages

    case AllReceiverIds =>

    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)

    case StopAllReceivers =>

    assert(isTrackerStopping || isTrackerStopped)

    stopReceivers()

    context.reply(true)

    }

    对应的Executor端的ReceiverSupervisorImpl也会创建Rpc消息通信体,来接收来自Driver端ReceiverTacker的消息。

    /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */

    private val endpoint = env.rpcEnv.setupEndpoint(

    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {

    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {

    case StopReceiver =>

    logInfo("Received stop signal")

    ReceiverSupervisorImpl.this.stop("Stopped by driver", None)

    case CleanupOldBlocks(threshTime) =>

    logDebug("Received delete old batch signal")

    cleanupOldBlocks(threshTime)

    case UpdateRateLimit(eps) =>

    logInfo(s"Received a new rate limit: $eps.")

    registeredBlockGenerators.foreach { bg =>

    bg.updateRate(eps)

    }

    }

    })

    BlockGenerator类中的线程每隔10ms从队列中获取Block,写入到BlockManager中。

    /** Keep pushing blocks to the BlockManager. */

    private def keepPushingBlocks() {

    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {

    state != StoppedGeneratingBlocks

    }

    try {

    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.

    while (areBlocksBeingGenerated) {

    Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {

    case Some(block) =>pushBlock(block)

    case None =>

    }

    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.

    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")

    while (!blocksForPushing.isEmpty) {

    val block = blocksForPushing.take()

    logDebug(s"Pushing block $block")

    pushBlock(block)

    logInfo("Blocks left to push " + blocksForPushing.size())

    }

    logInfo("Stopped block pushing thread")

    } catch {

    case ie: InterruptedException =>

    logInfo("Block pushing thread was interrupted")

    case e: Exception =>

    reportError("Error in block pushing thread", e)

    }

    }

    ReceiverSupervisorImpl代码片段:

    /** Divides received data records into data blocks for pushing in BlockManager. */

    private val defaultBlockGeneratorListener = new BlockGeneratorListener {

    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {

    reportError(message, throwable)

    }

    defonPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {

    pushArrayBuffer(arrayBuffer, None, Some(blockId))

    }

    }

    ...

    /** Store block and report it to driver */

    defpushAndReportBlock(

    receivedBlock: ReceivedBlock,

    metadataOption: Option[Any],

    blockIdOption: Option[StreamBlockId]

    ) {

    val blockId = blockIdOption.getOrElse(nextBlockId)

    val time = System.currentTimeMillis

    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)

    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

    val numRecords = blockStoreResult.numRecords

    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)

    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

    logDebug(s"Reported block $blockId")

    }

    将数据存储在BlockManager中,并将源数据信息告诉Driver端的ReceiverTracker。

    defstoreBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {

    case ArrayBufferBlock(arrayBuffer) =>

    numRecords = Some(arrayBuffer.size.toLong)

    blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,

    tellMaster = true)

    case IteratorBlock(iterator) =>

    val countIterator = new CountingIterator(iterator)

    // 把数据写入BlockManager

    val putResult =blockManager.putIterator(blockId, countIterator, storageLevel,

    tellMaster = true)

    numRecords = countIterator.count

    putResult

    case ByteBufferBlock(byteBuffer) =>

    blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)

    case o =>

    throw new SparkException(

    s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")

    }

    if (!putResult.map { _._1 }.contains(blockId)) {

    throw new SparkException(

    s"Could not store $blockId to block manager with storage level $storageLevel")

    }

    BlockManagerBasedStoreResult(blockId, numRecords)

    }

    备注:

    资料来源于:DT_大数据梦工厂(Spark发行版本定制)

    更多私密内容,请关注微信公众号:DT_Spark

    如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

    相关文章

      网友评论

        本文标题:Spark Streaming源码解读之流数据不断接收全生命周期

        本文链接:https://www.haomeiwen.com/subject/fdszrttx.html