美文网首页
SparkStreaming优雅关闭剖析

SparkStreaming优雅关闭剖析

作者: Bloo_m | 来源:发表于2020-10-20 12:29 被阅读0次

    简介

    在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断执行的,除非出现异常退出。当然SparkStreaming提供了checkpoint和WAL机制能够保证我们的程序再次启动时候不会出现数据丢失的情况。但是需求并不是一成不变的,相信读者们都经历过需求不断迭代的情况,当我们需要迭代逻辑的时候,那么我们如何停止线上正在运行的程序呢?本文将为读者们详细介绍一些关于SparkStreaming优雅关闭的手段。接下来我们将针对以下几个问题进行展开讲解:

    1. 为什么需要优雅关闭?
    2. 什么时候触发关闭?
    3. 采用什么策略关闭?

    1.为什么需要优雅关闭

    基于前面提到的,当我们的场景需要保证数据准确,不允许数据丢失,那么这个时候我们就得考虑优雅关闭了。说到关闭,那么非优雅关闭就是通过kill -9 processId的方式或者yarn -kill applicationId的方式进行暴力关闭,为什么说这种方式是属于暴力关闭呢?由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会导致这段时间的数据丢失,虽然提供了checkpoin机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉checkpoint,因此这里就会有丢失的风险。

    因此我们需要优雅关闭,将剩余未处理的数据或者正在处理的数据能够全部执行完成后,这样才不会出现数据丢失的情况。

    2.什么时候触发关闭

    既然我们知道了需要优雅关闭,那么就需要知道什么会触发关闭,这样才能有针对性的策略实现优雅关闭。

    首先我们先来了解一下整体流程:

    1. 首先StreamContext在做初始化的时候,会增加Shutdown hook方法 ,放入到一个钩子队列中,并设置优先级为51
    2. 当程序jvm退出时,会启动一个线程从钩子队列中按照优先级取出执行,然后就会执行Shutdown钩子方法
    3. 当执行Shutdown钩子方法时,首先会将receiver进行关闭,即不再接收数据
    4. 然后停止生成BatchRDD
    5. 等待task全部完成,停止Executor
    6. 最后释放所有资源,即整个关闭流程结束

    接下来看源码的具体实现

    StreamingContext.scala:调用start方法会调用ShutdownHookManager注册stopOnShutdown函数

    def start(): Unit = synchronized {
        state match {
          case INITIALIZED =>
            startSite.set(DStream.getCreationSite())
            ......
            /**
             * StreamContext启动时会增加Shutdown钩子函数,优先级为51
             */
            shutdownHookRef = ShutdownHookManager.addShutdownHook(
              StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
           ....
          case ACTIVE =>
            logWarning("StreamingContext has already been started")
          case STOPPED =>
            throw new IllegalStateException("StreamingContext has already been stopped")
        }
      }
    

    ShutdownHookManager.scala:在增加钩子函数的时候底层调用了SparkShutdownHookManager内部类

    def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
        shutdownHooks.add(priority, hook)
    } 
    private lazy val shutdownHooks = {
        val manager = new SparkShutdownHookManager()
        manager.install()
        manager
      }
    
    private [util] class SparkShutdownHookManager {
      def install(): Unit = {
        val hookTask = new Runnable() {
          override def run(): Unit = runAll()
        }
        org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
          hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
      }
    
      /**
       * jvm退出的时候会开启一个线程按照优先级逐个调用钩子函数
       */
      def runAll(): Unit = {
        shuttingDown = true
        var nextHook: SparkShutdownHook = null
        while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
          Try(Utils.logUncaughtExceptions(nextHook.run()))
        }
      }
    
      def add(priority: Int, hook: () => Unit): AnyRef = {
        hooks.synchronized {
          if (shuttingDown) {
            throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
          }
          val hookRef = new SparkShutdownHook(priority, hook)
          hooks.add(hookRef)
          hookRef
        }
      }
    }
    
    private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
      extends Comparable[SparkShutdownHook] {
      //这里真正调用注册的函数
      def run(): Unit = hook()
    }
    

    那么接下来看下真正执行关闭的逻辑,即StreamingContext#stopOnShutdown方法

     private def stopOnShutdown(): Unit = {
        val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
        stop(stopSparkContext = false, stopGracefully = stopGracefully)
      }
     def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
        synchronized {
          state match {
            case ACTIVE =>
              //调度相关的关闭
              Utils.tryLogNonFatalError {
                scheduler.stop(stopGracefully)
              }
             
              //监控
              Utils.tryLogNonFatalError {
                env.metricsSystem.removeSource(streamingSource)
              }
              
              //ui
              Utils.tryLogNonFatalError {
                uiTab.foreach(_.detach())
              }
              Utils.tryLogNonFatalError {
                unregisterProgressListener()
              }
              StreamingContext.setActiveContext(null)
              //设置状态为停止
              state = STOPPED
          }
        }
        if (shutdownHookRefToRemove != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
        }
         // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
        if (stopSparkContext) sc.stop()
      }
    

    可以看到这里有一个spark.streaming.stopGracefullyOnShutdown参数来传给底层的stop方法,即调用Jobscheduler#stop方法

    JobScheduler#stop

     def stop(processAllReceivedData: Boolean): Unit = synchronized {
        //1.首先停止接收数据
        if (receiverTracker != null) {
          receiverTracker.stop(processAllReceivedData)
        }
    
        if (executorAllocationManager != null) {
          executorAllocationManager.foreach(_.stop())
        }
    
        //2.停止生成BatchRdd,处理剩余的数据
        jobGenerator.stop(processAllReceivedData)
    
        //3.停止Exectuor
        jobExecutor.shutdown()
    
        val terminated = if (processAllReceivedData) {
          jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
        } else {
          jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
        }
        if (!terminated) {
          jobExecutor.shutdownNow()
        }
      
        // Stop everything else
        listenerBus.stop()
        eventLoop.stop()
        eventLoop = null
        logInfo("Stopped JobScheduler")
      }
    

    3.采用什么策略关闭?

    3.1 配置策略

    根据刚才梳理的触发关闭流程中,其实可以通过配置spark.streaming.stopGracefullyOnShutdown=true来实现优雅关闭,但是需要发送 SIGTERM 信号给driver端,这里有两种方案

    方案一,具体步骤如下:

    1. 通过Spark UI找到driver所在节点。

    2. 登录driver节点,执行 ps -ef |grep java |grep ApplicationMaster命令找到对应的pid

    3. 执行**kill -SIGTERM ** 发送SIGTERM信号

    4. 当spark driver收到该信号时,在日志中会有以下信息

      ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
      INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
      INFO streaming.StreamingContext: StreamingContext stopped successfully
      INFO spark.SparkContext: Invoking stop() from shutdown hook
      INFO spark.SparkContext: Successfully stopped SparkContext
      INFO util.ShutdownHookManager: Shutdown hook called
      

      注意:

      这里有一个坑,默认情况下在yarn模式下,spark.yarn.maxAppAttempts参数值和yarn.resourcemanager.am.max-attempts是同一个值,即为2。当通过Kill命令杀掉AM时,Yarn会自动重新启动一个AM,因此需要再发送一次Kill命令。当然也可以通过spark-submit命令提交的时候指定spark.yarn.maxAppAttempts=1这个配置参数;但这里也会有容灾风险,比如出现网络问题的时候,这里就无法自动重启了,程序就会以失败而告终。

    方案二:通过yarn application -kill < applicationid >命令来kill掉job(不建议使用)

    该命令会发送SIGTERM信号给container,同时也会立即发送 SIGKILL 命令。虽然可以通过yarn.nodemanager.sleep-delay-before-sigkill.ms参数来调整SIGTERM和SIGKILL之间的间隔,但是好像没什么作用。具体日志信息如下:

    ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
    INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
    

    3.2 标记策略

    该种策略通过借助于三方系统来标记状态, 一种方法是将标记HDFS文件,如果标记文件存在,则调用scc.stop(true,true);或者是借助于redis的key是否存在等方式

    val checkIntervalMillis = 60000
    var isStopped = false
    
    while (! isStopped) {
        isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
        checkShutdownMarker
        if (!isStopped && stopFlag) {
            ssc.stop(true, true)
        }
    }
    
    def checkShutdownMarker = {
        if (!stopFlag) {
            val fs = FileSystem.get(new Configuration())
            stopFlag = fs.exists(new Path(shutdownMarker))
        }
    

    3.3 服务策略

    即提供一个restful服务,暴露出一个接口提供关闭功能。

    def httpServer(port:Int,ssc:StreamingContext)={
        val server = new Server(port)
        val context = new ContextHandler()
        context.setContextPath("/shutdown")
        context.setHandler( new CloseStreamHandler(ssc) )
        server.setHandler(context)
        server.start()
    }
    class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
        override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
          ssc.stop(true,true)
          response.setContentType("text/html; charset=utf-8");
          response.setStatus(HttpServletResponse.SC_OK);
          val out = response.getWriter();
          baseRequest.setHandled(true);
        }
      }
    

    相关文章

      网友评论

          本文标题:SparkStreaming优雅关闭剖析

          本文链接:https://www.haomeiwen.com/subject/eyqdmktx.html