7 Spark Streaming Source Code Walkthrough: JobScheduler


Author: 海纳百川_spark | Published 2016-05-21 10:58 | Read 281 times

    This article walks through the internals of JobScheduler from the source-code perspective, based on the example below.

    The example code is as follows:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NetworkWordCount {

      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    1. In Spark Streaming, JobScheduler plays a role similar to DAGScheduler in Spark Core: it generates jobs according to the user-defined batchDuration (the batch interval; Seconds(1) in the code above). The DStreamGraph is only a logical-level description; once the time dimension is added, it yields physical-level Jobs, and Jobs are submitted continuously, once per batchDuration.

    2. JobScheduler is created when StreamingContext is instantiated:

    private[streaming] val scheduler = new JobScheduler(this)
    

    Starting from StreamingContext's start method, let's see how JobScheduler is started:

    def start(): Unit = synchronized {
        state match {
          case INITIALIZED =>
            startSite.set(DStream.getCreationSite())
            StreamingContext.ACTIVATION_LOCK.synchronized {
              StreamingContext.assertNoOtherContextIsActive()
              try {
                validate()
    
                // Start the streaming scheduler in a new thread, so that thread local properties
                // like call sites and job groups can be reset without affecting those of the
                // current thread.
                ThreadUtils.runInNewThread("streaming-start") {
                  sparkContext.setCallSite(startSite.get)
                  sparkContext.clearJobGroup()
                  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
                  // Start the JobScheduler
                  scheduler.start()
                }
                state = StreamingContextState.ACTIVE
              } catch {
                case NonFatal(e) =>
                  logError("Error starting the context, marking it as stopped", e)
                  scheduler.stop(false)
                  state = StreamingContextState.STOPPED
                  throw e
              }
              StreamingContext.setActiveContext(this)
            }
            shutdownHookRef = ShutdownHookManager.addShutdownHook(
              StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
            // Registering Streaming Metrics at the start of the StreamingContext
            assert(env.metricsSystem != null)
            env.metricsSystem.registerSource(streamingSource)
            uiTab.foreach(_.attach())
            logInfo("StreamingContext started")
          case ACTIVE =>
            logWarning("StreamingContext has already been started")
          case STOPPED =>
            throw new IllegalStateException("StreamingContext has already been stopped")
        }
    }
    

    JobScheduler is started through the scheduler.start() call. That call runs in a newly created thread; inside that thread a few properties are first set on sparkContext, and since they are thread-local they do not affect the global SparkContext. ThreadUtils.runInNewThread takes two parameter lists; the code below is passed as the second parameter, i.e. the body of ThreadUtils.runInNewThread. This style is Scala currying (a small sketch of the pattern appears after the next paragraph).

    sparkContext.setCallSite(startSite.get)
    sparkContext.clearJobGroup()
    sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    scheduler.start()
    

    The thread started here has nothing to do with the thread count configured at the beginning of the application (.setMaster("local[2]")). local[2] is the parallelism for running tasks; the thread here merely serves the framework's internal needs.
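    To illustrate the curried, by-name style mentioned above, here is a minimal, self-contained sketch; runSketch is a made-up helper, not Spark's ThreadUtils, but the shape of the call site is the same.

    object CurriedRunnerSketch {
      // The second parameter list is by-name, so the block is only evaluated inside the new thread.
      def runSketch(threadName: String)(body: => Unit): Unit = {
        val thread = new Thread(new Runnable {
          override def run(): Unit = body // the by-name block executes here, on the new thread
        })
        thread.setName(threadName)
        thread.start()
        thread.join()
      }

      def main(args: Array[String]): Unit = {
        runSketch("streaming-start-sketch") {
          // Everything between these braces is the second (curried) argument, just like the block
          // passed to ThreadUtils.runInNewThread("streaming-start") { ... } above.
          println(s"running on ${Thread.currentThread().getName}")
        }
      }
    }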

    Next, look at JobScheduler's start method:
    def start(): Unit = synchronized {
        if (eventLoop != null) return // scheduler has already been started
    
        logDebug("Starting JobScheduler")
        eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
          override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
          override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
        }
        // Start JobScheduler's event loop
        eventLoop.start()
    
        // attach rate controllers of input streams to receive batch completion updates
        for { inputDStream <- ssc.graph.getInputStreams
              rateController <- inputDStream.rateController
        } ssc.addStreamingListener(rateController)
    
        listenerBus.start(ssc.sparkContext)
        receiverTracker = new ReceiverTracker(ssc)
        inputInfoTracker = new InputInfoTracker(ssc)
        // Start the ReceiverTracker; the data-receiving logic starts here
        receiverTracker.start()
        // Start the JobGenerator; job generation starts here
        jobGenerator.start()
        logInfo("Started JobScheduler")
      }
    
    1. First, an eventLoop is defined. It is an event loop that calls back processEvent(event) for every event posted to it (a minimal sketch of this pattern follows the code below).
    2. ReceiverTracker is instantiated and started via receiverTracker.start(). ReceiverTracker keeps receiving the metadata of the incoming data, makes it fault-tolerant through a WAL (write-ahead log), and then writes the metadata into a queue.
    3. JobGenerator is started via jobGenerator.start(). JobGenerator is created when JobScheduler is instantiated:
    private val jobGenerator = new JobGenerator(this)
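    As promised above, here is a minimal, hypothetical sketch of the event-loop pattern (EventLoopSketch is not Spark's EventLoop class): a daemon thread drains a queue and invokes onReceive for every posted event.

    import java.util.concurrent.LinkedBlockingQueue

    abstract class EventLoopSketch[E](name: String) {
      private val queue = new LinkedBlockingQueue[E]()
      @volatile private var stopped = false

      private val thread = new Thread(name) {
        override def run(): Unit = {
          while (!stopped) {
            try {
              onReceive(queue.take()) // blocks until an event is posted
            } catch {
              case _: InterruptedException => // stop() was called
              case t: Throwable            => onError(t)
            }
          }
        }
      }
      thread.setDaemon(true)

      def start(): Unit = thread.start()
      def stop(): Unit = { stopped = true; thread.interrupt() }
      def post(event: E): Unit = queue.put(event)

      protected def onReceive(event: E): Unit   // cf. processEvent in JobScheduler
      protected def onError(e: Throwable): Unit // cf. reportError in JobScheduler
    }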
    
    Tracing into jobGenerator's start() method: it defines its own eventLoop and starts it, and then, in startFirstTime(), starts the timer:
    timer.start(startTime.milliseconds)
    

    On every tick the timer's callback posts a GenerateJobs message; in JobGenerator the timer is defined roughly as follows:

    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
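    The mechanism is easy to picture with a small, self-contained sketch (RecurringTimerSketch is a made-up stand-in, not Spark's RecurringTimer): a background thread wakes up once per batchDuration and fires a callback with the batch time.

    object RecurringTimerSketch {
      def main(args: Array[String]): Unit = {
        val batchDurationMs = 1000L
        val callback: Long => Unit =
          time => println(s"post GenerateJobs(Time($time))") // stands in for eventLoop.post(...)

        val timerThread = new Thread(new Runnable {
          override def run(): Unit = {
            var nextTime = System.currentTimeMillis()
            for (_ <- 1 to 3) { // three ticks are enough for the demo
              nextTime += batchDurationMs
              Thread.sleep(math.max(0L, nextTime - System.currentTimeMillis()))
              callback(nextTime)
            }
          }
        })
        timerThread.start()
        timerThread.join()
      }
    }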
    

    Handling the GenerateJobs event eventually calls the generateJobs method; how the jobs are produced is shown below:

    private def generateJobs(time: Time) {
        // Set the SparkEnv in this thread, so that job generation code can access the environment
        // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
        // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
        SparkEnv.set(ssc.env)
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            // Submit the JobSet
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
        }
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
    
    graph.generateJobs(time) is what produces the jobs, so let's trace into it. First, DStreamGraph's generateJobs:
    def generateJobs(time: Time): Seq[Job] = {
        logDebug("Generating jobs for time " + time)
        val jobs = this.synchronized {
          outputStreams.flatMap { outputStream =>
            val jobOption = outputStream.generateJob(time)
            jobOption.foreach(_.setCallSite(outputStream.creationSite))
            jobOption
          }
        }
        logDebug("Generated " + jobs.length + " jobs for time " + time)
        jobs
    }
    
    Next, look at outputStream.generateJob(time). generateJob is a method of DStream; tracing into it, we find a runJob call, which tells us where the generated job is submitted to the cluster:
    private[streaming] def generateJob(time: Time): Option[Job] = {
        getOrCompute(time) match {
          case Some(rdd) => {
            // The sparkContext.runJob call is wrapped into the jobFunc function here
            val jobFunc = () => {
              val emptyFunc = { (iterator: Iterator[T]) => {} }
              context.sparkContext.runJob(rdd, emptyFunc)
            }
            Some(new Job(time, jobFunc))
          }
          case None => None
        }
    }
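    Note that the Job returned here is little more than a batch time plus a function to run later. A stripped-down sketch of that idea (SketchJob and its members are made-up names, not Spark's streaming Job class):

    import scala.util.Try

    class SketchJob(val timeMs: Long, func: () => Unit) {
      private var _result: Try[Unit] = _
      def run(): Unit = { _result = Try(func()) } // compare Job.run() shown later in this article
      def result: Try[Unit] = _result
    }

    object SketchJobDemo {
      def main(args: Array[String]): Unit = {
        val job = new SketchJob(1000L, () => println("pretend this calls sparkContext.runJob(rdd, emptyFunc)"))
        job.run()           // this is what JobHandler eventually does
        println(job.result) // Success(()) if func ran, Failure(e) if it threw
      }
    }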
    

    Now go back to the example at the beginning of this article and trace the code. wordCounts.print() enters DStream's print() method:

    def print(): Unit = ssc.withScope {
      print(10)
    }
    

    It calls print(10):

      def print(num: Int): Unit = ssc.withScope {
        def foreachFunc: (RDD[T], Time) => Unit = {
          (rdd: RDD[T], time: Time) => {
            val firstNum = rdd.take(num + 1)
            // scalastyle:off println
            println("-------------------------------------------")
            println("Time: " + time)
            println("-------------------------------------------")
            firstNum.take(num).foreach(println)
            if (firstNum.length > num) println("...")
            println()
            // scalastyle:on println
          }
        }
        foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
      }
    

    It defines a foreachFunc function and then calls foreachRDD, passing in foreachFunc. The code of foreachRDD:

      private def foreachRDD(
          foreachFunc: (RDD[T], Time) => Unit,
          displayInnerRDDOps: Boolean): Unit = {
        new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
      }
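    The .register() call at the end is what turns this ForEachDStream into an output stream: it registers the stream with the DStreamGraph, and the graph's outputStreams is exactly the collection that DStreamGraph.generateJobs (shown earlier) flatMaps over. A hypothetical, self-contained sketch of that registration pattern (GraphSketch and OutputStreamSketch are made-up names, not Spark classes):

    import scala.collection.mutable.ArrayBuffer

    object RegisterSketch {
      trait OutputStreamSketch { def generateJob(timeMs: Long): Option[String] }

      class GraphSketch {
        private val outputStreams = ArrayBuffer[OutputStreamSketch]()
        def addOutputStream(s: OutputStreamSketch): Unit = outputStreams += s
        // Walks exactly the streams that were registered, like DStreamGraph.generateJobs above.
        def generateJobs(timeMs: Long): Seq[String] = outputStreams.flatMap(_.generateJob(timeMs)).toList
      }

      def main(args: Array[String]): Unit = {
        val graph = new GraphSketch
        // print()/foreachRDD ends in register(), which boils down to adding an output stream:
        graph.addOutputStream(new OutputStreamSketch {
          override def generateJob(timeMs: Long): Option[String] = Some(s"job for batch $timeMs")
        })
        println(graph.generateJobs(1000L)) // List(job for batch 1000)
      }
    }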
    

    A ForEachDStream is instantiated here, with foreachFunc passed in. The code of ForEachDStream:

    private[streaming]
    class ForEachDStream[T: ClassTag] (
        parent: DStream[T],
        foreachFunc: (RDD[T], Time) => Unit,
        displayInnerRDDOps: Boolean
      ) extends DStream[Unit](parent.ssc) {

      override def dependencies: List[DStream[_]] = List(parent)

      override def slideDuration: Duration = parent.slideDuration

      override def compute(validTime: Time): Option[RDD[Unit]] = None

      override def generateJob(time: Time): Option[Job] = {
        parent.getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
              foreachFunc(rdd, time)
            }
            Some(new Job(time, jobFunc))
          case None => None
        }
      }
    }
    

    ForEachDStream extends DStream and overrides generateJob. Recalling the outputStream.generateJob(time) call above, the whole flow is now connected.

    Now return to jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) in the generateJobs method above. It calls JobScheduler's submitJobSet, which wraps the generated jobs into a JobSet and submits them; here is submitJobSet:
    def submitJobSet(jobSet: JobSet) {
        if (jobSet.jobs.isEmpty) {
          logInfo("No jobs added for time " + jobSet.time)
        } else {
          listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
          // Save the JobSet, keyed by its batch time
          jobSets.put(jobSet.time, jobSet)
          // Wrap each job in a JobHandler and submit it; jobExecutor.execute runs it on a pool thread
          jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
          logInfo("Added jobs for time " + jobSet.time)
        }
    }
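    The jobExecutor used above is a thread pool. A minimal, self-contained sketch of the submit-a-Runnable-to-a-fixed-pool pattern (plain java.util.concurrent here, not Spark's actual jobExecutor):

    import java.util.concurrent.{Executors, TimeUnit}

    object JobExecutorSketch {
      def main(args: Array[String]): Unit = {
        val jobExecutor = Executors.newFixedThreadPool(1) // cf. spark.streaming.concurrentJobs = 1
        val jobs = Seq("job-0", "job-1")
        jobs.foreach { name =>
          jobExecutor.execute(new Runnable { // like jobExecutor.execute(new JobHandler(job))
            override def run(): Unit =
              println(s"$name running on ${Thread.currentThread().getName}")
          })
        }
        jobExecutor.shutdown()
        jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
      }
    }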
    

    A JobHandler is instantiated here to wrap each job. JobHandler is simply a class that implements the Runnable interface; it is handed to the thread pool, which then executes JobHandler's run method. The code of JobHandler:

    private class JobHandler(job: Job) extends Runnable with Logging {
        import JobScheduler._
    
        def run() {
          try {
            val formattedTime = UIUtils.formatBatchTime(job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
            val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
            val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
    
            ssc.sc.setJobDescription(
              s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
            ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
            ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
    
            // We need to assign `eventLoop` to a temp variable. Otherwise, because
            // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
            // it's possible that when `post` is called, `eventLoop` happens to null.
            var _eventLoop = eventLoop
            if (_eventLoop != null) {
              _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details.
              PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
                // The run method contains the job's submission function; it triggers sparkContext.runJob and actually submits the job
                job.run()
              }
              _eventLoop = eventLoop
              if (_eventLoop != null) {
                _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
              }
            } else {
              // JobScheduler has been stopped.
            }
          } finally {
            ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
            ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
          }
        }
    }
    

    The key line is job.run(); its run method is:

    def run() {
      _result = Try(func())
    }
    

    The func() here is exactly the function built from print(10) in DStream above, so the whole flow is now complete.

    Back in JobScheduler, this line defines the number of threads in the pool that submits the jobs of a JobSet:
    private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
    

    The default is a single thread. If the application has several output operations, several output DStreams are created and each batchDuration produces several jobs. To submit multiple jobs to the cluster at the same time, more threads are needed: tune the spark.streaming.concurrentJobs parameter to a value that matches the number of output DStreams.
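    For example, an application with two output operations could raise the limit when building its SparkConf; the property name comes from the line above, while the value 2 and local[4] are only illustrative assumptions:

    import org.apache.spark.SparkConf

    object ConcurrentJobsConfigSketch {
      // Illustration only: the value 2 assumes the application defines two output operations.
      val sparkConf: SparkConf = new SparkConf()
        .setAppName("NetworkWordCount")
        .setMaster("local[4]")                      // leave cores for the receiver and the tasks
        .set("spark.streaming.concurrentJobs", "2") // default is 1, as shown above
    }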
