6 Dynamic Generation of Jobs in Spark Streaming


Author: 海纳百川_spark | Published 2016-05-18 12:52
    1. This article uses a Socket data source as its example and traces how Jobs are generated through a WordCount computation.
      The code is as follows:
        import org.apache.spark.SparkConf
        import org.apache.spark.storage.StorageLevel
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        object NetworkWordCount {
          def main(args: Array[String]) {
            if (args.length < 2) {
              System.err.println("Usage: NetworkWordCount <hostname> <port>")
              System.exit(1)
            }
            val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
            // 1-second batch interval: a new batch of jobs is generated every second
            val ssc = new StreamingContext(sparkConf, Seconds(1))
            val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
          }
        }
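
      To try the example, you would typically start a socket source first (for instance `nc -lk 9999`) and then run the program with `localhost 9999` as its arguments; every second a new batch triggers the word count.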
    
    2. Start from ssc.start(). Inside that start method, scheduler.start() is called, and the scheduler here is the
      JobScheduler. Let's look at its start code:
    def start(): Unit = synchronized {
        if (eventLoop != null) return // scheduler has already been started

        logDebug("Starting JobScheduler")
        eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
          override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
          override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
        }
        // start the JobScheduler's event loop
        eventLoop.start()

        // attach rate controllers of input streams to receive batch completion updates
        for { inputDStream <- ssc.graph.getInputStreams
              rateController <- inputDStream.rateController
        } ssc.addStreamingListener(rateController)

        listenerBus.start(ssc.sparkContext)
        receiverTracker = new ReceiverTracker(ssc)
        inputInfoTracker = new InputInfoTracker(ssc)
        // start the ReceiverTracker; receiving data begins here
        receiverTracker.start()
        // start the JobGenerator; job generation begins here
        jobGenerator.start()
        logInfo("Started JobScheduler")
    }
    

    Spark Streaming's scheduling is built from three major components: JobScheduler, ReceiverTracker, and JobGenerator, with ReceiverTracker and
    JobGenerator held inside the JobScheduler. The start method above kicks off all three of them.
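
    As a rough sketch of that composition (simplified class shapes of my own, not the exact Spark source), the ownership looks like this:

        // Simplified sketch of the ownership described above (not the real Spark classes):
        // the StreamingContext owns a JobScheduler; the JobScheduler owns the other two components.
        class SketchJobGenerator                            // turns each batch interval into a JobSet
        class SketchReceiverTracker                         // manages receivers and the blocks they produce
        class SketchJobScheduler {
          val jobGenerator = new SketchJobGenerator
          var receiverTracker: SketchReceiverTracker = _    // created inside start(), as seen above
          def start(): Unit = {
            receiverTracker = new SketchReceiverTracker
            // receiverTracker.start(); jobGenerator.start(); ...
          }
        }
        class SketchStreamingContext {
          val scheduler = new SketchJobScheduler
          def start(): Unit = scheduler.start()             // what ssc.start() ultimately triggers
        }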

    3. Let's look at Job generation first: jobGenerator.start(). What does JobGenerator's start method do? Read on.
      It first starts an EventLoop whose callback is processEvent. So when does that callback fire? Let's look at EventLoop's internal structure:
     private[spark] abstract class EventLoop[E](name: String) extends Logging {

      // thread-safe blocking queue of pending events
      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    
      private val stopped = new AtomicBoolean(false)
    
      private val eventThread = new Thread(name) {
        // daemon (background) thread
        setDaemon(true)
    
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              try {
                // call back the subclass's onReceive method, which holds the event-handling logic
                onReceive(event)
              } catch {
                case NonFatal(e) => {
                  try {
                    onError(e)
                  } catch {
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                  }
                }
              }
            }
          } catch {
            case ie: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    
      def start(): Unit = {
        if (stopped.get) {
          throw new IllegalStateException(name + " has already been stopped")
        }
        // Call onStart before starting the event thread to make sure it happens before onReceive
        onStart()
        // start the event-loop thread
        eventThread.start()
      }
    
      def stop(): Unit = {
        // compareAndSet(false, true): proceed only if stopped was false, atomically setting it to true
        if (stopped.compareAndSet(false, true)) {
          eventThread.interrupt()
          var onStopCalled = false
          try {
            eventThread.join()
            // Call onStop after the event thread exits to make sure onReceive happens before onStop
            onStopCalled = true
            onStop()
          } catch {
            case ie: InterruptedException =>
              Thread.currentThread().interrupt()
              if (!onStopCalled) {
                // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
                // it's already called.
                onStop()
              }
          }
        } else {
          // Keep quiet to allow calling `stop` multiple times.
        }
      }
    
      def post(event: E): Unit = {
        eventQueue.put(event)
      }
    
      def isActive: Boolean = eventThread.isAlive
    
      protected def onStart(): Unit = {}
    
      protected def onStop(): Unit = {}
    
      protected def onReceive(event: E): Unit
    
      protected def onError(e: Throwable): Unit
    
     }
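
    To see the same pattern in miniature, here is a self-contained sketch (my own illustration, not Spark code) of the post -> onReceive flow that this EventLoop implements: the caller thread enqueues an event, and a daemon thread takes it off the queue and handles it.

        import java.util.concurrent.LinkedBlockingQueue

        object EventLoopSketch {
          sealed trait Event
          case class Generate(time: Long) extends Event

          def main(args: Array[String]): Unit = {
            val queue = new LinkedBlockingQueue[Event]()
            val worker = new Thread("sketch-event-loop") {
              setDaemon(true)
              override def run(): Unit = {
                while (true) {
                  queue.take() match {                           // blocks until an event is posted
                    case Generate(t) => println(s"handling a GenerateJobs-like event for time $t")
                  }
                }
              }
            }
            worker.start()                                       // like eventLoop.start()
            queue.put(Generate(System.currentTimeMillis()))      // like eventLoop.post(...)
            Thread.sleep(100)                                    // give the daemon thread a moment
          }
        }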
    

    Internally, EventLoop maintains a queue and spawns a daemon thread that calls back the implementing class's onReceive method.
    So when are events put into EventLoop's queue? For that we need EventLoop's post method. When JobGenerator is instantiated, it
    creates a RecurringTimer, as follows:

     private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      // the callback eventLoop.post(GenerateJobs(new Time(longTime))) puts a GenerateJobs event into the event loop
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
    

    RecurringTimer is simply a timer. Its constructor parameters are:
    * @param clock the clock used to read the current time
    * @param period the interval between callbacks (the batch duration)
    * @param callback the function to invoke on every tick
    * @param name the name of the timer
    From this it is clear that it periodically invokes the callback at the batch interval the user specified. The callback is the one we saw above:

    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
    

    which posts a GenerateJobs event into EventLoop's queue. At this point, however, the RecurringTimer has not been started yet.
    Going back to JobGenerator's start method and reading on: since this is the first run, startFirstTime is called.
    Inside startFirstTime there is one key line, timer.start(startTime.milliseconds), and at last the timer is started.
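
    As a rough sketch of what that timer does (an illustration of the idea, not the real RecurringTimer implementation), it can be pictured like this:

        // Illustration only: every `period` ms, invoke `callback` with the tick time, which in
        // JobGenerator's case posts GenerateJobs(new Time(tick)) to the event loop.
        class SketchRecurringTimer(period: Long, callback: Long => Unit, name: String) {
          @volatile private var nextTime = 0L
          private val thread = new Thread(name) {
            setDaemon(true)
            override def run(): Unit = {
              try {
                while (true) {
                  val sleepMs = nextTime - System.currentTimeMillis()
                  if (sleepMs > 0) Thread.sleep(sleepMs)
                  callback(nextTime)            // e.g. post GenerateJobs for this batch time
                  nextTime += period
                }
              } catch {
                case _: InterruptedException => // stop() interrupts the thread
              }
            }
          }
          def start(startTime: Long): Unit = { nextTime = startTime; thread.start() }
          def stop(): Unit = thread.interrupt()
        }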

    4. Working forward from the timer's start method: the timer periodically calls eventLoop.post to send a GenerateJobs event to the EventLoop's queue; the event loop then calls back processEvent, and from there we look at generateJobs(time).
      The generateJobs code is as follows:
    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          // get the input metadata for this batch
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          // submit the JobSet
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
    Stepping into graph.generateJobs(time): it calls the generateJob method of each outputStream. The generateJob code is as follows:
    private[streaming] def generateJob(time: Time): Option[Job] = {
      getOrCompute(time) match {
        case Some(rdd) => {
          // jobFunc wraps the call to runJob
          val jobFunc = () => {
            val emptyFunc = { (iterator: Iterator[T]) => {} }
            context.sparkContext.runJob(rdd, emptyFunc)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    } 
    

    getOrCompute returns an RDD (how the RDD is generated will be covered later). A function jobFunc is defined, and as you can see its purpose is to submit a Spark job via runJob;
    jobFunc is then wrapped into a Job object, which is returned.

    5. What is returned is a collection of jobs. Once the jobs have been generated successfully, a JobSet is submitted, as in the code below:
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      Each job is then submitted individually: the job is wrapped in a JobHandler (a Runnable subclass) and handed to a thread pool. When the JobHandler's run
      method executes, it calls job.run(); Job's run method is a single line, Try(func()), and that func() is exactly the jobFunc from the code above.
      Seen this far, the whole path from Job generation to Job submission is connected; a simplified sketch of this last step follows.
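
    Here is that simplified sketch (illustration only, not the exact Spark classes) of how a Job carries the jobFunc closure and how a JobHandler runs it on the scheduler's thread pool:

        import java.util.concurrent.Executors
        import scala.util.Try

        // Illustration only: a Job wraps the jobFunc closure built in generateJob, and a
        // JobHandler (a Runnable) executes it on a thread pool, like the job executor in JobScheduler.
        class SketchJob(val time: Long, func: () => Unit) {
          def run(): Try[Unit] = Try(func())              // the single line: execute jobFunc
        }

        class SketchJobHandler(job: SketchJob) extends Runnable {
          override def run(): Unit = {
            val result = job.run()                        // runs jobFunc, i.e. sparkContext.runJob(rdd, ...)
            println(s"job for batch ${job.time} finished: $result")
          }
        }

        object SubmitSketch {
          def main(args: Array[String]): Unit = {
            val pool = Executors.newFixedThreadPool(1)    // stands in for the job executor pool
            val job = new SketchJob(System.currentTimeMillis(), () => println("running jobFunc"))
            pool.submit(new SketchJobHandler(job))
            pool.shutdown()
          }
        }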

    6. [The original post attaches a flow chart of the dynamic Job generation process here.]

    If there are any mistakes in the above, corrections are welcome.
