美文网首页
Spark Streaming 初始化过程分析

Spark Streaming 初始化过程分析

作者: 荒湖 | 来源:发表于2017-01-13 10:04 被阅读0次

    —————☼—————☼—————☼—————☼—————☼—————
    Spark Streaming概述
    Spark Streaming 初始化过程
    Spark Streaming Receiver启动过程分析
    Spark Streaming 数据准备阶段分析(Receiver方式)
    Spark Streaming 数据计算阶段分析
    SparkStreaming Backpressure分析
    Spark Streaming Executor DynamicAllocation 机制分析

    —————☼—————☼—————☼—————☼—————☼—————

    Spark Streaming是一种构建在Spark上的实时计算框架。Spark Streaming应用以Spark应用的方式提交到Spark平台,其组件以长期批处理任务的形式在Spark平台运行。这些任务主要负责接收实时数据流及定期产生批作业并提交至Spark集群,本文要说明的是以下几个功能模块运行前的准备工作。

    • 数据接收
    • Job 生成
    • 流量控制
    • 动态资源伸缩

    下面我们以WordCount程序为例分析Spark Streaming运行环境的初始化过程。

    val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]") 
    val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(sc, Seconds(10)) 
    val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY) 
    val words = lines.flatMap(_.split(" ")).map(w => (w,1)) 
    val wordCount = words.reduceByKey(_+_) 
    wordCount.print 
    ssc.start()
    ssc.awaitTermination()
    

    以下流程,皆以上述WordCount源码为例。

    1、StreamingContext的初始化过程

    StreamingContext是Spark Streaming应用的执行环境,其定义很多Streaming功能的入口,如:它提供从多种数据源创建DStream的方法等。
    在创建Streaming应用时,首先应创建StreamingContext(WordCount应用可知),伴随StreamingContext的创建将会创建以下主要组件:

    1.1 DStreamGraph

    DStreamGraph的主要功能是记录InputDStream及OutputStream及从InputDStream中抽取出ReceiverInputStreams。因为DStream之间的依赖关系类似于RDD,并在任务执行时转换成RDD,因此,可以认为DStream Graph与RDD Graph存在对应关系. 即:DStreamGraph以批处理间隔为周期转换成RDDGraph.

    • ReceiverInputStreams: 包含用于接收数据的Receiver信息,并在启动Receiver时提供相关信息
    • OutputStream:每个OutputStream会在批作业生成时,生成一个Job.

    1.2 JobScheduler

    JobScheduler是Spark Streaming中最核心的组件,其负载Streaming各功作组件的启动。

    • 数据接收
    • Job 生成
    • 流量控制
    • 动态资源伸缩
      以及负责生成的批Job的调度及状态管理工作。

    2、 DStream的创建与转换

    StreamingContext初始化完毕后,通过调用其提供的创建InputDStream的方法创建SocketInputDStream.

    SocketInputDStream的继承关系为:
    SocketInputDStream->ReceiverInputDStream->InputDStream->DStream.
    在InputDStream中 提供如下功

     ssc.graph.addInputStream(this)
    

    JAVA中初始化子类时,会先初始化其父类。所以在创建SocketInputDStream时,会先初始化InputDStream,在InputDStream中实现将自身加入DStreamGraph中,以标识其为输入数据源。
    DStream中算子的转换,类似于RDD中的转换,都是延迟计算,仅形成pipeline链。当上述应用遇到print(Output算子)时,会将DStream转换为ForEachDStream,并调register方法作为OutputStream注册到DStreamGraph的outputStreams列表,以待生成Job。
    print算子实现方法如下:

    /**
       * Print the first num elements of each RDD generated in this DStream. This is an output
       * operator, so this DStream will be registered as an output stream and there materialized.
       */
     def print(num: Int): Unit = ssc.withScope {
        def foreachFunc: (RDD[T], Time) => Unit = {
          (rdd: RDD[T], time: Time) => {
            val firstNum = rdd.take(num + 1)
            // scalastyle:off println
            println("-------------------------------------------")
            println(s"Time: $time")
            println("-------------------------------------------")
            firstNum.take(num).foreach(println)
            if (firstNum.length > num) println("...")
            println()
            // scalastyle:on println
          }
        }
        foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
      }
    
      /**
       * Apply a function to each RDD in this DStream. This is an output operator, so
       * 'this' DStream will be registered as an output stream and therefore materialized.
       * @param foreachFunc foreachRDD function
       * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
       *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
       *                           only the scopes and callsites of `foreachRDD` will override those
       *                           of the RDDs on the display.
       */
      private def foreachRDD(
          foreachFunc: (RDD[T], Time) => Unit,
          displayInnerRDDOps: Boolean): Unit = {
        new ForEachDStream(this,
          context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
      }
    

    ForEachDStream 不同于其它DStream的地方为其重写了generateJob方法,以使DStream Graph操作转换成RDD Graph操作,并生成Job.

    3、SparkContext启动

    /**
       * Start the execution of the streams.
       *
       * @throws IllegalStateException if the StreamingContext is already stopped.
       */
      def start(): Unit = synchronized {
        state match {
          case INITIALIZED =>
            startSite.set(DStream.getCreationSite())
            StreamingContext.ACTIVATION_LOCK.synchronized {
              StreamingContext.assertNoOtherContextIsActive()
              try {
                validate()
    
                // Start the streaming scheduler in a new thread, so that thread local properties
                // like call sites and job groups can be reset without affecting those of the
                // current thread.
                ThreadUtils.runInNewThread("streaming-start") {
                  sparkContext.setCallSite(startSite.get)
                  sparkContext.clearJobGroup()
                  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
                  savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
                  scheduler.start()
                }
                state = StreamingContextState.ACTIVE
              } catch {
                case NonFatal(e) =>
                  logError("Error starting the context, marking it as stopped", e)
                  scheduler.stop(false)
                  state = StreamingContextState.STOPPED
                  throw e
              }
              StreamingContext.setActiveContext(this)
            }
            ......
          case ACTIVE =>
            logWarning("StreamingContext has already been started")
          case STOPPED =>
            throw new IllegalStateException("StreamingContext has already been stopped")
        }
      }
    
    

    在此方法中,最核心的代码是以线程的方式启动JobScheduler,从而开启各功能组件。

    3.1 JobScheduler的启动

    JobScheduler主要负责以下几种任务:

    • 数据接收相关组件的初始化及启动
      ReceiverTracker的初始化及启动。ReceiverTracker负责管理Receiver,包括Receiver的启停,状态维护 等。
    • Job生成相关组件的启动
      JobGenerator的启动。JobGenerator负责以BatchInterval为周期生成Job.
    • Streaming监听的注册与启动
    • 作业监听
    • 反压机制
      BackPressure机制,通过RateController控制数据摄取速率。
    • Executor DynamicAllocation 的启动
      Executor 动态伸缩管理, 动态增加或减少Executor,来达到使用系统稳定运行 或减少资源开销的目的。
    • Job的调度及状态维护。

    JobScheduler的start方法的代码如下所示:

    def start(): Unit = synchronized {
        if (eventLoop != null) return // scheduler has already been started
    
        logDebug("Starting JobScheduler")
        eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
          override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
          override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
        }
        eventLoop.start()
    
        // attach rate controllers of input streams to receive batch completion updates
        for {
          inputDStream <- ssc.graph.getInputStreams
          rateController <- inputDStream.rateController
        } ssc.addStreamingListener(rateController)
    
        listenerBus.start()
        receiverTracker = new ReceiverTracker(ssc)
        inputInfoTracker = new InputInfoTracker(ssc)
    
        val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
          case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
          case _ => null
        }
    
        executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
          executorAllocClient,
          receiverTracker,
          ssc.conf,
          ssc.graph.batchDuration.milliseconds,
          clock)
        executorAllocationManager.foreach(ssc.addStreamingListener)
        receiverTracker.start()
        jobGenerator.start()
        executorAllocationManager.foreach(_.start())
        logInfo("Started JobScheduler")
      }
    
    

    代码中存在的 eventLoop: EventLoop[JobSchedulerEvent]对象,用以接收和处理事件。调用者通过调用其post方法向事件队列注册事件。EventLoop开始执行时,会开启一deamon线程用于处理队列中的事件。EventLoop是一个抽象类,JobScheduler中初始化EventLoop时实现了其OnReceive方法。该方法中指定接收的事件由processEvent(event)方法处理。

    小结

    JobScheduler是Spark Streaming中核心的组件,在其开始执行时,会开启数据接收相关组件及Job生成相关组件,从而使数据准备和数据计算两个流程开始工作。
    另外,其还负责BackPressure, Executor DynamicAllocation 等优化机制的启动工作。
    下面的章节,将对数据准备和数据计算阶段的流程进行分析,以及BackPressure, Executor DynamicAllocation 机制进行分析。

    相关文章

      网友评论

          本文标题:Spark Streaming 初始化过程分析

          本文链接:https://www.haomeiwen.com/subject/sdkavttx.html