18 Graceful Shutdown of Spark Streaming Programs

Author: 海纳百川_spark | Published 2016-06-01 23:10
    1. A Spark Streaming program may stop because it is killed, because of an exception, or for some other reason.
      Let's first look at StreamingContext's stop() method:
    def stop(
          stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
         ): Unit = synchronized {
        stop(stopSparkContext, false)
    }
    

    This overload takes a single parameter, stopSparkContext, whose default value can be set through the configuration property spark.streaming.stopSparkContextByDefault, and it simply delegates to the two-parameter stop() method, shown below after a short example.
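    For instance (a minimal sketch, not from the article; the application name and master are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch: with this setting, a plain ssc.stop() keeps the SparkContext
    // alive so it can be reused, e.g. by another StreamingContext.
    val conf = new SparkConf()
      .setAppName("StopDemo")  // placeholder name
      .setMaster("local[2]")   // placeholder master for a local run
      .set("spark.streaming.stopSparkContextByDefault", "false")
    val ssc = new StreamingContext(conf, Seconds(5))
    // ... define streams and call ssc.start() ...
    ssc.stop() // stopSparkContext now defaults to false via the config above

    Now the two-parameter stop() method: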

    /**
       * Stop the execution of the streams, with option of ensuring all received data
       * has been processed.
       *
       * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
       *                         will be stopped regardless of whether this StreamingContext has been
       *                         started.
       * @param stopGracefully if true, stops gracefully by waiting for the processing of all
       *                       received data to be completed
       */
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
        var shutdownHookRefToRemove: AnyRef = null
        if (AsynchronousListenerBus.withinListenerThread.value) {
          throw new SparkException("Cannot stop StreamingContext within listener thread of" +
            " AsynchronousListenerBus")
        }
        synchronized {
          try {
            state match {
              case INITIALIZED =>
                logWarning("StreamingContext has not been started yet")
              case STOPPED =>
                logWarning("StreamingContext has already been stopped")
              case ACTIVE =>
                scheduler.stop(stopGracefully)
                // Removing the streamingSource to de-register the metrics on stop()
                env.metricsSystem.removeSource(streamingSource)
                uiTab.foreach(_.detach())
                StreamingContext.setActiveContext(null)
                waiter.notifyStop()
                if (shutdownHookRef != null) {
                  shutdownHookRefToRemove = shutdownHookRef
                  shutdownHookRef = null
                }
                logInfo("StreamingContext stopped successfully")
            }
          } finally {
            // The state should always be Stopped after calling `stop()`, even if we haven't started yet
            state = STOPPED
          }
        }
        if (shutdownHookRefToRemove != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
        }
        // Even if we have already stopped, we still need to attempt to stop the SparkContext because
        // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
        if (stopSparkContext) sc.stop()
    }
    

    As the doc comment explains, the correct way to stop the program is to ensure all received data has been processed before stopping. To get that behavior we need to pass stopGracefully = true; the stop will then wait for all outstanding work to complete.
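    A common driver-side pattern for triggering such a stop (a sketch, not from the article; shouldStop is a hypothetical predicate, for example a check for a marker file on HDFS):

    import org.apache.spark.streaming.StreamingContext

    // Sketch: instead of blocking forever in awaitTermination(), poll with a
    // timeout and stop gracefully once an external signal says so.
    def runUntilStopped(ssc: StreamingContext, shouldStop: () => Boolean): Unit = {
      ssc.start()
      var terminated = false
      while (!terminated) {
        // Returns true once the context has terminated, false on timeout
        terminated = ssc.awaitTerminationOrTimeout(10000L)
        if (!terminated && shouldStop()) {
          // Waits for all received data to be processed, then stops
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          terminated = true
        }
      }
    }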

    2. Spark Streaming provides a mechanism for graceful shutdown on exit: StreamingContext has a stopOnShutdown() method, whose code is as follows
    private def stopOnShutdown(): Unit = {
        val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
        logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
        // Do not stop SparkContext, let its own shutdown hook stop it
        stop(stopSparkContext = false, stopGracefully = stopGracefully)
    }
    

    What does stopOnShutdown() mean? When our program exits, whether normally or abnormally, stopOnShutdown() is called back and in turn calls stop(). Its stopGracefully flag is read from the configuration property spark.streaming.stopGracefullyOnShutdown, which should be set to true in production.
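    For example (a sketch; the application name is a placeholder), set it on the SparkConf, or equivalently pass --conf spark.streaming.stopGracefullyOnShutdown=true to spark-submit:

    import org.apache.spark.SparkConf

    // Sketch: make the shutdown hook stop the streaming context gracefully
    val conf = new SparkConf()
      .setAppName("MyStreamingApp") // placeholder name
      .set("spark.streaming.stopGracefullyOnShutdown", "true")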

    3. How does stopOnShutdown() get called? There is this line in StreamingContext's start() method:
    shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
    

    It adds the stopOnShutdown function to ShutdownHookManager; addShutdownHook's code is as follows

    def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
        shutdownHooks.add(priority, hook)
    }
    

    Let's look at what is inside SparkShutdownHookManager; the comments in the code below explain what each piece does, so we won't go through everything one by one

    private [util] class SparkShutdownHookManager {
    
      // Priority queue of hooks; the higher the priority, the earlier it runs
      private val hooks = new PriorityQueue[SparkShutdownHook]()
      @volatile private var shuttingDown = false
    
      /**
       * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
       * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
       * the best.
       */
      // Wrap runAll() in a Runnable and register it as a JVM shutdown hook
      // (through Hadoop's ShutdownHookManager when available); it only runs
      // when the JVM is exiting
      def install(): Unit = {
        val hookTask = new Runnable() {
          override def run(): Unit = runAll()
        }
        Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
          case Success(shmClass) =>
            val fsPriority = classOf[FileSystem]
              .getField("SHUTDOWN_HOOK_PRIORITY")
              .get(null) // static field, the value is not used
              .asInstanceOf[Int]
            val shm = shmClass.getMethod("get").invoke(null)
            shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
              .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
          case Failure(_) =>
            Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"))
        }
      }

      // Called back by the JVM shutdown hook when the JVM exits
      def runAll(): Unit = {
        shuttingDown = true
        var nextHook: SparkShutdownHook = null
        // Keep polling hooks off the priority queue and running them;
        // the higher the priority, the earlier a hook runs
        while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
          Try(Utils.logUncaughtExceptions(nextHook.run()))
        }
      }

      def add(priority: Int, hook: () => Unit): AnyRef = {
        hooks.synchronized {
          if (shuttingDown) {
            throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
          }
          val hookRef = new SparkShutdownHook(priority, hook)
          hooks.add(hookRef)
          hookRef
        }
      }

      def remove(ref: AnyRef): Boolean = {
        hooks.synchronized { hooks.remove(ref) }
      }
    }
    
    4. At this point everything falls into place: the stopOnShutdown() function is put into the priority queue hooks inside SparkShutdownHookManager, with a default priority of 51. When the JVM exits, it starts a thread that calls runAll(), which polls the entries (functions) off the hooks queue one by one and executes them. That invokes stopOnShutdown(), which then calls stop(), and our application gets to perform its shutdown work gracefully.
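    To make the mechanism concrete, here is a minimal standalone sketch of the same idea (illustrative names, not Spark's actual API): hooks sit in a priority queue, and a single JVM shutdown hook drains it, highest priority first.

    import java.util.PriorityQueue
    import scala.util.Try

    // Illustrative re-implementation of the pattern, not Spark's real class
    class MiniShutdownHookManager {
      // Reversed ordering so that hooks with higher priority are polled first
      private case class Hook(priority: Int, body: () => Unit) extends Comparable[Hook] {
        override def compareTo(other: Hook): Int = other.priority - priority
      }

      private val hooks = new PriorityQueue[Hook]()
      @volatile private var shuttingDown = false

      // Register a single JVM shutdown hook that drains the queue on exit
      def install(): Unit = {
        val task = new Runnable { override def run(): Unit = runAll() }
        Runtime.getRuntime.addShutdownHook(new Thread(task, "mini-shutdown-hook"))
      }

      def add(priority: Int)(body: => Unit): Unit = hooks.synchronized {
        if (shuttingDown) throw new IllegalStateException("already shutting down")
        hooks.add(Hook(priority, () => body))
      }

      private def runAll(): Unit = {
        shuttingDown = true
        var next: Hook = null
        while ({ next = hooks.synchronized(hooks.poll()); next != null }) {
          Try(next.body()) // swallow failures so the remaining hooks still run
        }
      }
    }

    Note the ordering this buys Spark: the streaming hook is registered at priority 51, one above SparkContext's own shutdown hook at priority 50, so on JVM exit the StreamingContext is stopped first and the SparkContext is then stopped by its own hook; that is exactly why stopOnShutdown() passes stopSparkContext = false.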
