美文网首页Java
Spark ShutdownHook

Spark ShutdownHook

作者: wangdy12 | 来源:发表于2019-01-15 20:23 被阅读0次

    按照优先级在关闭时执行一系列操作,在spark内用途很广泛,主要是释放资源,删除文件等

    使用

        // SparkContext在初始化时注册,设定优先级和要调用的函数
        _shutdownHookRef = ShutdownHookManager.addShutdownHook(
          ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
          logInfo("Invoking stop() from shutdown hook")
          try {
            stop() // 关闭SparkContext时要释放清理的对象
          } catch {
            case e: Throwable =>
              logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
          }
        }
    

    ShutdownHookManager

    ShutdownHookManager的静态函数addShutdownHook依赖SparkShutdownHookManager类进行具体的逻辑处理

      //处理的核心
      private lazy val shutdownHooks = {
        val manager = new SparkShutdownHookManager()
        manager.install()
        manager
      }
    
      // 如果在jvm执行shutdown hook过程中添加钩子,jvm会抛出异常
      def inShutdown(): Boolean = {
        try {
          val hook = new Thread {
            override def run() {}
          }
          // scalastyle:off runtimeaddshutdownhook
          Runtime.getRuntime.addShutdownHook(hook)
          // scalastyle:on runtimeaddshutdownhook
          Runtime.getRuntime.removeShutdownHook(hook)
        } catch {
          case ise: IllegalStateException => return true
        }
        false
      }
      
      //添加钩子
      def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
        shutdownHooks.add(priority, hook)
      }
    
    private [util] class SparkShutdownHookManager {
      // 每个钩子都记录到优先队列里
      private val hooks = new PriorityQueue[SparkShutdownHook]()
      @volatile private var shuttingDown = false
    
      /**
       * Install a hook to run at shutdown and run all registered hooks in order.
       */
      def install(): Unit = {
        val hookTask = new Runnable() {
          // runAll 函数会从优先队列中取出所有的钩子并运行
          override def run(): Unit = runAll()
        }
        // 依赖Hadoop的ShutdownHookManager机制
        org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
          hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
      }
    }
    

    Hadoop的ShutdownHookManager机制是通过JDK的addShutdownHook实现,收到信号后,将所有的钩子按照优先级取出执行

    JDK ShutdownHook

    java.lang.Runtime包内的函数:

    addShutdownHook(Thread hook)
    

    注册一个JVM关闭的钩子,JVM会为了响应以下两类事件而关闭:

    • 程序正常退出exits,当最后的非守护线程退出时,或者在调用System.exit方法时
    • 因为用户中断,终止terminated虚拟机,例如Ctrl + C,或发生系统级别的事件,比如用户退出(产生SIGHUP,系统的默认处理方式是终止进程)或系统关闭

    shutdown hook是一个已初始化但尚未启动的线程。虚拟机开始关闭时,它会以某种顺序启动所有已经注册的关闭钩子,并让它们同时运行。运行完所有的钩子后,虚拟机就会停止。注意,关闭期间会继续运行守护线程,如果通过调用exit方法来发起关闭,那么也会继续运行非守护线程

    一旦开始了关闭序列shutdown sequence,则只能通过调用halt方法来停止,此方法可强行终止虚拟机,在关闭过程中不能注册新的关闭钩子或取消注册先前已注册的钩子。否则会导致抛出 IllegalStateException

    shutdown hook在虚拟机生命周期中的特定时间运行,因此在编写代码时要特别注意,保证线程安全,并尽可能地避免死锁

    shutdown hook应该快速地完成其工作。当程序调用exit时,虚拟机应该迅速地关闭并退出。由于用户退出或系统关闭而终止虚拟机时,底层的操作系统可能只允许在固定的时间内关闭并退出。因此在关闭钩子中不应该进行用户交互或执行长时间的计算

    与其他所有线程一样,通过调用线程所属线程组的ThreadGroup.uncaughtException方法,可在关闭钩子中处理未捕获的异常。此方法的默认实现是将该异常的堆栈跟踪打印至System.err并终止线程;它不会导致虚拟机退出或暂停。

    仅在很少的情况下,虚拟机可能会中止abort,也就是没有完全关闭就停止运行。虚拟机被外部终止时会出现这种现象,比如在Unix上使用SIGKILL信号。如果native方法出错,虚拟机也会终止,例如内部数据结构损坏或试图访问不存在的内存。如果虚拟机中止,则无法保证是否将运行关闭钩子

    File.deleteOnExit()进程结束后删除文件,其实现机制就是ShutdownHook

    JDK信号处理相关

    主要是sun.misc包内两个类:SignalSignalHandler

    Signal构造函数

    // 参数为对应信号的名称
    public Signal(String var1)
    

    通过kill -l可以查看信号的名称

    SignalHandler接口的核心是处理函数handle(Signal var1)

    public interface SignalHandler {
        SignalHandler SIG_DFL = new NativeSignalHandler(0L);
        SignalHandler SIG_IGN = new NativeSignalHandler(1L);
    
        void handle(Signal var1);
    }
    

    静态方法Signal.handle(Signal, SignalHandler)注册Signal对应的处理器

    org.apache.spark.util.SignalUtils类中就添加了对"TERM", "HUP", "INT"三个信号的处理

      def registerLogger(log: Logger): Unit = synchronized {
        if (!loggerRegistered) {
          Seq("TERM", "HUP", "INT").foreach { sig =>
            SignalUtils.register(sig) {
              log.error("RECEIVED SIGNAL " + sig)
              false
            }
          }
          loggerRegistered = true
        }
      }
    

    内部创建对应的信号Signal,以及对应的ActionHandler,其实现了SignalHandler接口的handle方法,内部会执行注册的方法,这里就是进行log输出

    相关文章

      网友评论

        本文标题:Spark ShutdownHook

        本文链接:https://www.haomeiwen.com/subject/yokudqtx.html