Spark Streaming source code analysis: ReceiverTracker

Author: cclucc | 2019-11-17 15:08

    The questions we care about: how is data received? And how is it stored?

    Data is received by a receiver thread running on an executor; once received, it is handed to the ReceiverSupervisorImpl thread on the same executor for processing.

    Enter one of JobScheduler's key members: ReceiverTracker!
    So what is ReceiverTracker, briefly?

    ReceiverTracker's purpose is to provide the input data for each batch's RDDs. It does this in three steps, coordinated over RPC (a message sketch follows the list):

    1. Distribute the receivers to the executors and start the threads that receive data.
    2. Distribute ReceiverSupervisorImpl to the executors, start the threads that handle the data, and keep track of metadata about the stored data.
    3. When a job is submitted, supply it with the input data (the blocks) it needs for its ETL.
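
    All of this coordination flows as RPC messages between the driver-side ReceiverTracker and the executor-side supervisors. As a mental model, here is a stripped-down sketch of the two messages we will meet below; the real case classes live in ReceiverTracker.scala and carry richer types (Receiver[_], ReceivedBlockInfo) than the AnyRef placeholders used here:

    // Simplified sketch of ReceiverTracker's control-plane messages.
    sealed trait ReceiverTrackerMessage

    // driver -> itself: schedule and launch every receiver (steps 1 and 2)
    case class StartAllReceivers(receivers: Seq[AnyRef]) extends ReceiverTrackerMessage

    // executor -> driver: a supervisor reports a newly stored block (step 2)
    case class AddBlock(blockInfo: AnyRef) extends ReceiverTrackerMessage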

    ++First, how are receivers distributed to the executors?++

    def start(): Unit = synchronized {
        //....
    
        if (!receiverInputStreams.isEmpty) {
          endpoint = ssc.env.rpcEnv.setupEndpoint(
            "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) // endpoint that receives and handles messages from ReceiverTracker itself and from the receivers
          if (!skipReceiverLaunch) launchReceivers() // important! this is the key step: distribute the receivers to the executors
          //.....
        }
      }
    
    // Now let's look at launchReceivers in detail
    private def launchReceivers(): Unit = {
        val receivers = receiverInputStreams.map {...} // DStreamGraph holds all the input DStreams; get each one's receiver
    
        
        endpoint.send(StartAllReceivers(receivers)) // with the receivers in hand, this message triggers the actual distribution
    }
    
    override def receive: PartialFunction[Any, Unit] = {
          // decide which executors each receiver should be placed on
          case StartAllReceivers(receivers) =>
            val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
            for (receiver <- receivers) {
              val executors = scheduledLocations(receiver.streamId)
              updateReceiverScheduledExecutors(receiver.streamId, executors)
              receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
              startReceiver(receiver, executors)
            }
          //.....  
    }
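
    Here schedulingPolicy is Spark's ReceiverSchedulingPolicy, which matches receivers to executors using preferred locations and load balancing. As a much-simplified illustration of the *shape* of its output (this is not the actual algorithm), a round-robin assignment would look like:

    // Illustrative only: round-robin assignment of receiver stream ids to executors.
    // The real ReceiverSchedulingPolicy also honors each receiver's preferredLocation
    // and balances by how many receivers an executor already hosts.
    def scheduleReceiversRoundRobin(
        streamIds: Seq[Int],
        executors: Seq[String]): Map[Int, Seq[String]] = {
      require(executors.nonEmpty, "no executors registered yet")
      streamIds.zipWithIndex.map { case (id, i) =>
        id -> Seq(executors(i % executors.length))
      }.toMap
    }

    // scheduleReceiversRoundRobin(Seq(0, 1, 2), Seq("host1", "host2"))
    //   => Map(0 -> Seq("host1"), 1 -> Seq("host2"), 2 -> Seq("host1"))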
    
    private def startReceiver(
            receiver: Receiver[_],
            scheduledLocations: Seq[TaskLocation]): Unit = {
    
          // Function to start the receiver on the worker node
          // Key point!! this function is submitted together with the RDD below; it news up a
          // ReceiverSupervisorImpl that does the actual handling of received data (more on this later!)
          val startReceiverFunc: Iterator[Receiver[_]] => Unit =
            (iterator: Iterator[Receiver[_]]) => {
              
              if (TaskContext.get().attemptNumber() == 0) {
                val receiver = iterator.next()
                val supervisor = new ReceiverSupervisorImpl(
                  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) // actually processes the received data
                supervisor.start() // start the thread
                supervisor.awaitTermination() // important! blocks the task, so it keeps pulling data from the receiver indefinitely
              }
            }
    
          // Create the RDD using the scheduledLocations to run the receiver in a Spark job
          // Key point!! the receiver and its candidate locations are packaged into a one-element RDD,
          // so Spark's scheduler can place the receiver on any of the scheduled executors!
          val receiverRDD: RDD[Receiver[_]] =
            if (scheduledLocations.isEmpty) {
              ssc.sc.makeRDD(Seq(receiver), 1)
            } else {
              val preferredLocations = scheduledLocations.map(_.toString).distinct
              ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
            }
          
          //.....
    
          // Submit! ⚠️ at this point the receiver has been shipped to a concrete executor
          val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
            receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
          
          //....
        }
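
    Note that submitJob returns a Future. In the real source, ReceiverTracker hooks onComplete on that future: a receiver job should never finish on its own, so if it does (executor lost, receiver crashed), the receiver is rescheduled, which is exactly what makes receivers long-lived. A hedged sketch of that supervision pattern, with submitReceiverJob and trackerStopped standing in for the real members of ReceiverTracker:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}

    // Sketch only: resubmit the receiver job whenever it exits, unless the
    // tracker itself is shutting down.
    def superviseReceiver(
        streamId: Int,
        submitReceiverJob: () => Future[Unit],  // stand-in for submitJob(...)
        trackerStopped: () => Boolean)          // stand-in for the tracker's state
        (implicit ec: ExecutionContext): Unit = {
      submitReceiverJob().onComplete {
        case _ if trackerStopped() => // shutting down: let the job end quietly
        case Success(_) | Failure(_) =>
          // receiver job exited unexpectedly: reschedule it
          superviseReceiver(streamId, submitReceiverJob, trackerStopped)
      }
    }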
      
    

    ++Next: how is the data actually handled on the executor?++

    Part 1: how is the data received?

    Once a receiver has been shipped to a concrete executor, how does it get the data processed? The receiver calls store(), which hands each record to the supervisor's pushSingle! In other words, the receiver only cares about where to fetch data from and how to parse it; it cares neither about how the data is stored nor about who consumes it!

    // First, how a receiver hands data to ReceiverSupervisorImpl, using KafkaReceiver as an example
    class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {
    
      def onStart() {
    
      
        // where to receive data from
        // Kafka connection properties
        // Create the connection to the cluster
    
        // how to decode the received data
        val keyDecoder = ...
        val valueDecoder = ...
    
    
        // thread pool that receives the data
        val executorPool = ...
        topicMessageStreams.values.foreach { streams =>
            streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
        }
      }
    
      // handle each received message: store()!!! this is what ends up calling supervisor.pushSingle!!
      private class MessageHandler(stream: KafkaStream[K, V])
        extends Runnable {
        def run() {
          val streamIterator = stream.iterator()
            while (streamIterator.hasNext()) {
              val msgAndMetadata = streamIterator.next()
              store((msgAndMetadata.key, msgAndMetadata.message))
            }
        }
      }
    }
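
    The store() call above is part of Spark's public Receiver API, so the same pattern works for any custom source: fetch, parse, store, and let the supervisor worry about the rest. A minimal custom receiver along the lines of the one in the Spark Streaming docs (host and port are illustrative):

    import java.io.BufferedReader
    import java.io.InputStreamReader
    import java.net.Socket
    import java.nio.charset.StandardCharsets

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Minimal custom receiver: reads lines from a socket and hands each one
    // to the supervisor via store().
    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        // Receive on a separate thread so onStart() returns immediately.
        new Thread("Line Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      def onStop(): Unit = { /* the receive thread stops itself via isStopped() */ }

      private def receive(): Unit = {
        val socket = new Socket(host, port)
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line = reader.readLine()
        while (!isStopped() && line != null) {
          store(line)            // -> supervisor.pushSingle(line)
          line = reader.readLine()
        }
        reader.close()
        socket.close()
        restart("Trying to connect again")
      }
    }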
    
    

    Part 2: the data is coming in, so how is it stored? This is implemented by ReceiverSupervisorImpl, mainly in three methods:

    // the "put" entry point: hands individual records to BlockGenerator, which batches them into blocks
    def pushSingle(data: Any) {
        defaultBlockGenerator.addData(data)
    }
    
    
    def pushAndReportBlock(
          receivedBlock: ReceivedBlock,
          metadataOption: Option[Any],
          blockIdOption: Option[StreamBlockId]
        ) {
        
        // the actual block-storage logic
        val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
        
        // after a successful store, report the new block's info to ReceiverTracker
        val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
        trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
        
    }
    
    // stores each block via the BlockManager into memory/disk, same logic as ordinary RDD blocks
    private val receivedBlockHandler: ReceivedBlockHandler = {
        if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
          // WAL: write-ahead logging, important!! guards against data loss
          new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
            receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
        } else {
          new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
        }
      }
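
    The BlockGenerator behind pushSingle is worth a closer look: it buffers individual records and, every spark.streaming.blockInterval (200 ms by default), seals the buffer into a block and hands it off for storage. A stand-alone toy sketch of that batching loop (the real BlockGenerator adds rate limiting and listener callbacks):

    import java.util.concurrent.{Executors, TimeUnit}
    import scala.collection.mutable.ArrayBuffer

    // Toy sketch of BlockGenerator's batching: addData buffers records, and a
    // timer periodically seals the buffer into a block and pushes it onward.
    class ToyBlockGenerator(blockIntervalMs: Long, pushBlock: Seq[Any] => Unit) {
      private val buffer = new ArrayBuffer[Any]
      private val timer = Executors.newSingleThreadScheduledExecutor()

      def addData(data: Any): Unit = synchronized { buffer += data }

      def start(): Unit =
        timer.scheduleAtFixedRate(
          () => {
            val block = synchronized {
              val snapshot = buffer.toList // copy out the current batch
              buffer.clear()
              snapshot
            }
            if (block.nonEmpty) pushBlock(block) // real code: pushAndReportBlock
          },
          blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)

      def stop(): Unit = timer.shutdown()
    }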
    
    

    Part 3: how does the data get used? After a block is stored, ReceiverTracker is notified, but how does the block reach a batch?

    // ReceiverTracker doesn't manage blocks itself; its receivedBlockTracker member does the work. ReceiverTracker is just the boss!!
    private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
        receivedBlockTracker.addBlock(receivedBlockInfo)
    }
    
    
    // Note ⚠️ the JobGenerator timer, each time it submits a job, calls ReceiverTracker's
    // allocateBlocksToBatch to bind blocks to that batch; as you can see, the actual
    // block-to-batch assignment is handled by receivedBlockTracker!!
    def allocateBlocksToBatch(batchTime: Time): Unit = {
        if (receiverInputStreams.nonEmpty) {
          receivedBlockTracker.allocateBlocksToBatch(batchTime)
        }
      }
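
    So block-to-batch assignment is pure bookkeeping over block metadata: blocks arrive unallocated, and at each batch boundary JobGenerator sweeps everything that has accumulated into the new batch. A toy sketch of that bookkeeping (BlockInfo stands in for the real ReceivedBlockInfo; the real ReceivedBlockTracker also writes every event to the WAL before mutating state):

    import scala.collection.mutable

    case class BlockInfo(streamId: Int, blockId: String)

    class ToyReceivedBlockTracker(streamIds: Seq[Int]) {
      private val unallocated =
        mutable.Map(streamIds.map(_ -> mutable.Queue.empty[BlockInfo]): _*)
      private val allocatedByBatch =
        mutable.Map.empty[Long, Map[Int, Seq[BlockInfo]]]

      // Called when an executor reports a stored block (the AddBlock message).
      def addBlock(info: BlockInfo): Unit = synchronized {
        unallocated(info.streamId).enqueue(info)
      }

      // Called by JobGenerator at each batch boundary: everything unallocated
      // so far becomes this batch's input.
      def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
        val allocation = unallocated.map { case (id, queue) =>
          val drained = queue.toList // snapshot the pending blocks
          queue.clear()
          id -> (drained: Seq[BlockInfo])
        }.toMap
        allocatedByBatch(batchTime) = allocation
      }

      def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] =
        synchronized(allocatedByBatch.getOrElse(batchTime, Map.empty))
    }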
    

    For how the stored data is then tied back to RDDs, see the companion post "spark streaming源码分析之job、rdd、blocks之间是如何对应的?" (on how jobs, RDDs, and blocks correspond in Spark Streaming).
