
Spark Streaming Data Computation Stage Analysis

Author: 荒湖 | Published 2017-01-13 10:17

    —————☼—————☼—————☼—————☼—————☼—————
    Spark Streaming Overview
    Spark Streaming Initialization Process
    Spark Streaming Receiver Startup Process Analysis
    Spark Streaming Data Preparation Stage Analysis (Receiver Mode)
    Spark Streaming Data Computation Stage Analysis
    Spark Streaming Backpressure Analysis
    Spark Streaming Executor DynamicAllocation Mechanism Analysis

    —————☼—————☼—————☼—————☼—————☼—————

    The full Spark Streaming workflow is divided into two stages: data preparation and data computation. The two stages are functionally independent and are connected only by the data itself. "Spark Streaming Data Preparation Stage Analysis" examined the concrete flow of the data preparation stage from a source-code perspective; this article analyzes the data computation stage in the same way.
    The data computation stage consists of three parts: allocating data to batches, generating batch jobs, and submitting batch jobs.

    1. JobGenerator Startup

    JobGenerator is responsible for periodically generating Jobs and submitting them. As described in "Spark Streaming Initialization Process Analysis", when the JobScheduler is started it calls JobGenerator's start method to start the JobGenerator.
    JobGenerator.start is implemented as follows:

      /** Start generation of jobs */
      def start(): Unit = synchronized {
        if (eventLoop != null) return // generator has already been started
    
        // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
        // See SPARK-10125
        checkpointWriter
    
        eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
          override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    
          override protected def onError(e: Throwable): Unit = {
            jobScheduler.reportError("Error in job generator", e)
          }
        }
        eventLoop.start()
    
        if (ssc.isCheckpointPresent) {
          restart()
        } else {
          startFirstTime()
        }
      }
    

    Analyzing the code above, when JobGenerator.start() is called it does the following:

    • Creates and starts the eventLoop; events received by the eventLoop are handed to processEvent(event),
      which dispatches each event to a different handler according to its type.
    • Calls startFirstTime(). Analyzing its implementation (shown below) reveals two main tasks:
    • calling timer.start, which periodically generates Jobs;
    • calling graph.start.
      /** Starts the generator for the first time */
      private def startFirstTime() {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
      }
    

    The following sections examine these two parts in turn.

    2. Job Generation and Submission

    2.1 Periodically Triggering the Job Generation Event

    startFirstTime() calls timer.start, where timer is a RecurringTimer, the same kind of timer used for block slicing in "Spark Streaming Data Preparation Stage Analysis". It repeatedly executes its scheduled task at the configured period. The timer here is defined as:

      private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
        longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
    

    Every batchDuration, the timer posts a GenerateJobs event to the eventLoop. When the eventLoop receives the GenerateJobs event it handles it with processEvent, which in this case calls the generateJobs() method to generate the Jobs.

      /** Processes all events */
      private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) => generateJobs(time)
          case ClearMetadata(time) => clearMetadata(time)
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          case ClearCheckpointData(time) => clearCheckpointData(time)
        }
      }
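
    The internals of RecurringTimer itself are not shown in this article. As a rough mental model, a recurring timer that posts an event at a fixed period can be sketched as follows (a simplified, self-contained sketch only, not the actual org.apache.spark.streaming.util.RecurringTimer, which additionally supports a pluggable Clock, graceful stop and restart alignment):

    // Simplified sketch -- NOT the real Spark RecurringTimer.
    class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
      @volatile private var stopped = false

      private val thread = new Thread(s"$name-timer") {
        override def run(): Unit = {
          // Align the first trigger to the next multiple of the period.
          var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
          while (!stopped) {
            val sleepMs = nextTime - System.currentTimeMillis()
            if (sleepMs > 0) Thread.sleep(sleepMs)
            callback(nextTime)   // e.g. post GenerateJobs(new Time(nextTime)) to the eventLoop
            nextTime += periodMs // the next batch boundary
          }
        }
      }

      def start(): Unit = thread.start()
      def stop(): Unit = { stopped = true }
    }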
    

    2.2 Detailed Job Generation Process

    As noted above, the timer periodically triggers the Job generation event, and the Jobs themselves are generated by generateJobs.
    In every batch interval, JobGenerator creates one Job for each output stream of the application; all the Jobs of a batch form a JobSet, which is submitted as a whole via JobScheduler.submitJobSet.
    The implementation of generateJobs is analyzed below.

      /** Generate jobs and perform checkpointing for the given `time`.  */
      private def generateJobs(time: Time) {
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
            PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
        }
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
      }
    
    

    When generateJobs generates the Jobs, it first obtains the unallocated block information registered in the ReceiverTracker, and then generates the Jobs through the DStreamGraph.

    2.2.1 Allocating Block Data to a Batch

    This step assigns the unallocated blocks produced in "Spark Streaming Data Preparation Stage Analysis" to a particular batch for processing. The process is as follows:

    When generating the Jobs, the following statement is called first:

    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    

    This statement determines the data to be processed by the batch identified by time. The allocation process is described in detail below.

    allocateBlocksToBatch is implemented as follows:

    
      /** Allocate all unallocated blocks to the given batch. */
      def allocateBlocksToBatch(batchTime: Time): Unit = {
        if (receiverInputStreams.nonEmpty) {
          receivedBlockTracker.allocateBlocksToBatch(batchTime)
        }
      }
    

    It delegates to receivedBlockTracker.allocateBlocksToBatch, which takes the unallocated block information and assigns it to the batch identified by batchTime: receivedBlockTracker first dequeues the unallocated blocks from streamIdToUnallocatedBlockQueues, wraps them as AllocatedBlocks, and registers them in the timeToAllocatedBlocks map, where they wait to be bound to the Jobs generated for that batch (batchTime).

      /**
       * Allocate all unallocated blocks to the given batch.
       * This event will get written to the write ahead log (if enabled).
       */
      def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
        if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          val streamIdToBlocks = streamIds.map { streamId =>
              (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
          }.toMap
          val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
          if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
            timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
            lastAllocatedBatchTime = batchTime
          } else {
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
          }
        } else {
          // This situation occurs when:
          // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
          // possibly processed batch job or half-processed batch job need to be processed again,
          // so the batchTime will be equal to lastAllocatedBatchTime.
          // 2. Slow checkpointing makes recovered batch time older than WAL recovered
          // lastAllocatedBatchTime.
          // This situation will only occurs in recovery time.
          logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
        }
      }
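
    To make the bookkeeping above easier to picture, the two structures involved can be sketched roughly as follows (a simplified, self-contained sketch; in the real ReceivedBlockTracker the queues hold ReceivedBlockInfo objects, the map is keyed by Time, and each allocation is additionally written to the write ahead log as shown above):

    import scala.collection.mutable

    object BlockAllocationSketch {
      // Simplified stand-ins for ReceivedBlockInfo / AllocatedBlocks.
      case class BlockInfo(blockId: String, numRecords: Long)
      case class Allocated(streamIdToBlocks: Map[Int, Seq[BlockInfo]])

      // Blocks reported by receivers but not yet assigned to any batch, per stream.
      val streamIdToUnallocatedBlockQueues = mutable.HashMap[Int, mutable.Queue[BlockInfo]]()
      // Blocks assigned to each batch, consumed later when the batch's RDDs are built.
      val timeToAllocatedBlocks = mutable.HashMap[Long, Allocated]()

      // In essence, allocateBlocksToBatch(batchTime) drains every stream's queue
      // and records the result under the batch time.
      def allocate(batchTime: Long, streamIds: Seq[Int]): Unit = {
        val perStream = streamIds.map { id =>
          id -> streamIdToUnallocatedBlockQueues
            .getOrElseUpdate(id, mutable.Queue.empty)
            .dequeueAll(_ => true)
            .toList
        }.toMap
        timeToAllocatedBlocks.put(batchTime, Allocated(perStream))
      }
    }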
    

    2.2.2 Batch Job Generation

    graph.generateJobs(time) converts each OutputStream in the DStreamGraph into a Job (if the application contains several output operators, a single batch produces several Jobs). generateJobs is implemented as follows:

    def generateJobs(time: Time): Seq[Job] = {
        logDebug("Generating jobs for time " + time)
        val jobs = this.synchronized {
          outputStreams.flatMap { outputStream =>
            val jobOption = outputStream.generateJob(time)
            jobOption.foreach(_.setCallSite(outputStream.creationSite))
            jobOption
          }
        }
        logDebug("Generated " + jobs.length + " jobs for time " + time)
        jobs
      }
    

    As the source shows, it calls generateJob on each OutputStream in turn to convert it into a Job. What distinguishes an OutputStream from other DStreams is that it overrides generateJob. Taking ForEachDStream, which backs the print operator used in the WordCount program, as an example, its generateJob is implemented as follows:

    override def generateJob(time: Time): Option[Job] = {
        parent.getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
              foreachFunc(rdd, time)
            }
            Some(new Job(time, jobFunc))
          case None => None
        }
      }
    

    As the code shows, it calls getOrCompute on its parent DStream to generate an RDD, and then wraps that RDD into a Job.
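
    A streaming Job at this point is little more than a batch time paired with a function to run later. A minimal mental model (a simplified sketch; the real org.apache.spark.streaming.scheduler.Job also carries an id, an outputOpId and the result of its run, as seen in Section 2.2.3) is:

    // Simplified mental model of a streaming Job -- not the real class.
    class SimpleStreamingJob(val time: Long, jobFunc: () => Unit) {
      // Running the job just invokes the wrapped function,
      // e.g. the jobFunc built in ForEachDStream.generateJob above.
      def run(): Unit = jobFunc()
    }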

    2.2.2.1 RDD Generation

    Taking WordCount as an example, the DStream transformation chain of the WordCount application is shown below:


    (Figure: DStream transformation chain in the WordCount application)
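
    For reference, a minimal WordCount streaming program that produces this chain looks roughly like the following (a standard socketTextStream-based example; the DStream type produced at each step is noted in the comments):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount")
        val ssc = new StreamingContext(conf, Seconds(1))

        val lines  = ssc.socketTextStream("localhost", 9999) // SocketInputDStream
        val words  = lines.flatMap(_.split(" "))              // FlatMappedDStream
        val pairs  = words.map(word => (word, 1))             // MappedDStream
        val counts = pairs.reduceByKey(_ + _)                 // ShuffledDStream
        counts.print()                                        // registers a ForEachDStream as an output stream

        ssc.start()
        ssc.awaitTermination()
      }
    }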

    getOrCompute (compute is similar) is defined in the DStream base class; if a subclass overrides it, the subclass version is executed, otherwise the base-class version is used. Looking at ForEachDStream's parent in the chain above, ShuffledDStream, we find that it does not override getOrCompute, so the implementation inherited from the DStream base class, shown below, is used.

      /**
       * Get the RDD corresponding to the given time; either retrieve it from cache
       * or compute-and-cache it.
       */
      private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
        // If RDD was already generated, then retrieve it from HashMap,
        // or else compute the RDD
        generatedRDDs.get(time).orElse {
          // Compute the RDD if time is valid (e.g. correct time in a sliding window)
          // of RDD generation, else generate nothing.
          if (isTimeValid(time)) {
    
            val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details. We need to have this call here because
              // compute() might cause Spark jobs to be launched.
              SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
                compute(time)
              }
            }
    
            rddOption.foreach { case newRDD =>
              // Register the generated RDD for caching and checkpointing
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
                logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
              }
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
                logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
              }
              generatedRDDs.put(time, newRDD)
            }
            rddOption
          } else {
            None
          }
        }
      }
    

    This code calls ShuffledDStream.compute to generate the RDD; its compute is implemented as:

    override def compute(validTime: Time): Option[RDD[(K, C)]] = {
        parent.getOrCompute(validTime) match {
          case Some(rdd) => Some(rdd.combineByKey[C](
              createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
          case None => None
        }
      }
    

    compute calls getOrCompute on its parent DStream, and the parents recursively do the same up the chain until the source DStream (SocketInputDStream) is reached.
    SocketInputDStream inherits its compute method from ReceiverInputDStream; that compute generates the source RDD, and the RDD graph for the batch is then built back down along the DStream chain.

    The compute defined by ReceiverInputDStream is implemented as follows:

     /**
       * Generates RDDs with blocks received by the receiver of this stream. */
      override def compute(validTime: Time): Option[RDD[T]] = {
        val blockRDD = {
    
          if (validTime < graph.startTime) {
            // If this is called for any time before the start time of the context,
            // then this returns an empty RDD. This may happen when recovering from a
            // driver failure without any write ahead log to recover pre-failure data.
            new BlockRDD[T](ssc.sc, Array.empty)
          } else {
            // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
            // for this batch
            val receiverTracker = ssc.scheduler.receiverTracker
            val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
    
            // Register the input blocks information into InputInfoTracker
            val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
            ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    
            // Create the BlockRDD
            createBlockRDD(validTime, blockInfos)
          }
        }
        Some(blockRDD)
      }
    
    

    Here, the following logic

    val receiverTracker = ssc.scheduler.receiverTracker 
    val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
    

    retrieves the block information (blockInfos) that was allocated to this batch in Section 2.2.1, wraps it into a StreamInputInfo, and then generates an RDD via createBlockRDD. If blockInfos is non-empty, a normal BlockRDD is created; if blockInfos is empty, an empty BlockRDD containing no blocks (new BlockRDD(ssc.sc, Array.empty)) is created.

    2.2.3 Job Submission

    Once the Jobs have been generated successfully, the JobSet is submitted through the JobScheduler:

    case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    

    submitJobSet is implemented as follows:

      def submitJobSet(jobSet: JobSet) {
        if (jobSet.jobs.isEmpty) {
          logInfo("No jobs added for time " + jobSet.time)
        } else {
          listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
          jobSets.put(jobSet.time, jobSet)
          jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
          logInfo("Added jobs for time " + jobSet.time)
        }
      }
    
    • A StreamingListenerBatchSubmitted event is posted and the JobSet is added to the jobSets map so that the monitoring system can track it.
    • Each Job is wrapped in a JobHandler and handed to the ThreadPoolExecutor via execute, which places it in the executor's workQueue to wait to be scheduled. If the pool has an idle thread, the job is scheduled immediately. (This is standard behaviour of the Executor framework in Java concurrent programming.)
      The thread pool is defined as follows:
      private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
      private val jobExecutor =
        ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
    

    As the code shows, JobScheduler creates a fixed-size daemon thread pool, jobExecutor, whose size is given by "spark.streaming.concurrentJobs" and defaults to 1. As many Jobs can run concurrently as there are threads in the pool, so by default only one Job is executed at a time. When Jobs cannot be executed quickly enough they pile up; the backlog is held in the ThreadPoolExecutor's workQueue until an idle thread becomes available.
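
    For example, to let the Jobs of two batches run in the pool at the same time, the configuration read above can be set when building the context (use with care, since running batches concurrently weakens ordering guarantees between batches):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("WordCount")
      .set("spark.streaming.concurrentJobs", "2") // size of the jobExecutor pool, default 1
    val ssc = new StreamingContext(conf, Seconds(1))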

    • JobHandler is the task that the ThreadPoolExecutor's worker threads actually run; it processes the submitted Job, as shown below. It reports the Job's state through the EventLoop and starts the Job by calling job.run.
        def run() {
          val oldProps = ssc.sparkContext.getLocalProperties
          try {
            ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
            val formattedTime = UIUtils.formatBatchTime(
              job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
            val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
            val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
    
            ssc.sc.setJobDescription(
              s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
            ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
            ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
            // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
            // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
            ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    
            // We need to assign `eventLoop` to a temp variable. Otherwise, because
            // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
            // it's possible that when `post` is called, `eventLoop` happens to null.
            var _eventLoop = eventLoop
            if (_eventLoop != null) {
              _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details.
              SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
                job.run()
              }
              _eventLoop = eventLoop
              if (_eventLoop != null) {
                _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
              }
            } else {
              // JobScheduler has been stopped.
            }
          } finally {
            ssc.sparkContext.setLocalProperties(oldProps)
          }
        }
    

    Job.run itself is implemented as follows:

     def run() {
        _result = Try(func())
      }
    

    It executes func, the function supplied when the Job was created. In the WordCount application the Job is created in ForEachDStream, whose generateJob method was already shown above:

    override def generateJob(time: Time): Option[Job] = {
        parent.getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
              foreachFunc(rdd, time)
            }
            Some(new Job(time, jobFunc))
          case None => None
        }
      }
    

    As the code shows, it invokes foreachFunc, the function passed in when the ForEachDStream was created, which here is defined by the print method:

    /**
       * Print the first num elements of each RDD generated in this DStream. This is an output
       * operator, so this DStream will be registered as an output stream and there materialized.
       */
      def print(num: Int): Unit = ssc.withScope {
        def foreachFunc: (RDD[T], Time) => Unit = {
          (rdd: RDD[T], time: Time) => {
            val firstNum = rdd.take(num + 1)
            // scalastyle:off println
            println("-------------------------------------------")
            println(s"Time: $time")
            println("-------------------------------------------")
            firstNum.take(num).foreach(println)
            if (firstNum.length > num) println("...")
            println()
            // scalastyle:on println
          }
        }
        foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
      }
    

    Inside foreachFunc, the rdd.take() operator is invoked; take is an action, so it triggers the submission of a Spark job, and from that point on processing is identical to Spark batch processing.
    Note that the Job generated above is only an abstraction defined by Spark Streaming; it is not the same thing as a Spark job (which is what is actually scheduled and generates tasks).
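
    The same holds for any user-defined output operation: whatever action is invoked inside foreachRDD is what actually submits a Spark job for the batch. For illustration (a fragment that assumes the counts DStream from the WordCount sketch earlier):

    // count() is an action, so each batch submits a Spark job here,
    // just as print()'s rdd.take(num + 1) does above.
    counts.foreachRDD { (rdd, time) =>
      val n = rdd.count()
      println(s"batch $time contains $n word-count pairs")
    }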
