Spark Streaming source code analysis: JobScheduler


Author: cclucc | Published 2019-11-17 15:08

    What does the JobGenerator do?

    It holds a timer instance:
    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
        longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
    

    This timer posts a GenerateJobs message once every batchDuration; in other words, one set of jobs is generated per batch.
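To make the timer's role concrete, here is a minimal sketch of the behavior RecurringTimer provides: a dedicated scheduler fires a callback with the current time once per period, exactly how the JobGenerator posts GenerateJobs once per batchDuration. The class name `SimpleRecurringTimer` and its signature are illustrative assumptions, not Spark's actual API.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Illustrative stand-in for Spark's RecurringTimer: invokes `onTime`
// with the current wall-clock time every `periodMs` milliseconds.
class SimpleRecurringTimer(periodMs: Long)(onTime: Long => Unit) {
  private val exec = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = exec.scheduleAtFixedRate(
    new Runnable { def run(): Unit = onTime(System.currentTimeMillis()) },
    periodMs, periodMs, TimeUnit.MILLISECONDS)

  def stop(): Unit = exec.shutdownNow()
}

// Wait for three "ticks", analogous to three batch boundaries.
val fired = new CountDownLatch(3)
val timer = new SimpleRecurringTimer(50)(_ => fired.countDown())
timer.start()
val sawThreeTicks = fired.await(2, TimeUnit.SECONDS)
timer.stop()
println(s"three ticks observed: $sawThreeTicks")
```

In the real JobGenerator the callback does not run the work itself; it only posts `GenerateJobs(new Time(longTime))` to an event loop, which keeps the timer thread from ever blocking on job generation.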

    A method receives the GenerateJobs message and dispatches it:
    /** Processes all events */
      private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) => generateJobs(time) // our current focus
          // the cases below matter too; they are covered later
          case ClearMetadata(time) => clearMetadata(time)
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          case ClearCheckpointData(time) => clearCheckpointData(time)
        }
      }
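The post/dispatch pattern above can be sketched without any Spark dependencies: one thread posts events to a queue, a consumer thread takes them one at a time and dispatches by pattern matching, just like `eventLoop.post` feeding `processEvent`. The event names below mirror the Spark ones, but the queue and consumer are a simplified assumption, not Spark's EventLoop implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Simplified event protocol, mirroring JobGeneratorEvent.
sealed trait Event
case class GenerateJobs(time: Long) extends Event
case class ClearMetadata(time: Long) extends Event
case object Stop extends Event

val queue = new LinkedBlockingQueue[Event]()
val log = ListBuffer[String]()

// Consumer thread: the analogue of processEvent's match block.
val consumer = new Thread(() => {
  var running = true
  while (running) queue.take() match {
    case GenerateJobs(t)  => log += s"generateJobs($t)"
    case ClearMetadata(t) => log += s"clearMetadata($t)"
    case Stop             => running = false
  }
})
consumer.start()

// The timer thread would post like this, never doing the work itself.
queue.put(GenerateJobs(1000))
queue.put(ClearMetadata(1000))
queue.put(Stop)
consumer.join()
println(log.mkString(", "))
```

Because events are consumed strictly in order by a single thread, job generation for batch N and metadata cleanup for earlier batches can never interleave.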
    
    /** Generate jobs and perform checkpointing for the given `time`.  */
      private def generateJobs(time: Time) {
        //....
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // here it is, the receiverTracker! it allocates the blocks received by receivers to this specific batch, as described earlier
          graph.generateJobs(time) // DStreamGraph asks each of the output streams it holds to generate a job for this batch
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
        }
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
      }
    

    The two calls jobScheduler.receiverTracker.allocateBlocksToBatch(time) and graph.generateJobs(time) are analyzed in detail in the follow-up post "Spark Streaming source code analysis: how jobs, RDDs, and blocks correspond to each other".
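Before that follow-up, the core idea of allocateBlocksToBatch can be sketched: blocks received by receivers accumulate in an "unallocated" buffer, and at each batch boundary the whole buffer is moved under that batch's time key, so a batch's jobs read exactly the blocks that arrived during its interval. The data structures and function names below are illustrative assumptions, not the ReceiverTracker's internals.

```scala
import scala.collection.mutable

// Blocks reported by receivers, not yet assigned to any batch.
val unallocated = mutable.Queue[String]()
// batch time -> the blocks that batch's jobs will process.
val batchToBlocks = mutable.Map[Long, List[String]]()

def receiveBlock(id: String): Unit = unallocated += id

// Analogue of allocateBlocksToBatch: drain everything received so far
// and pin it to this batch time.
def allocateBlocksToBatch(batchTime: Long): Unit = {
  batchToBlocks(batchTime) = unallocated.toList
  unallocated.clear()
}

receiveBlock("block-1"); receiveBlock("block-2")
allocateBlocksToBatch(1000L) // batch 1000 gets block-1, block-2
receiveBlock("block-3")
allocateBlocksToBatch(2000L) // batch 2000 gets only block-3
println(batchToBlocks)
```

The key property the real code also guarantees is that the allocation is atomic per batch: a block belongs to exactly one batch, which is what makes the later job -> RDD -> blocks correspondence well defined.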

Permalink: https://www.haomeiwen.com/subject/arasictx.html