美文网首页Spark源码精读分析计划
Spark Core源码精读计划#17:上下文清理器Contex

Spark Core源码精读计划#17:上下文清理器Contex

作者: LittleMagic | 来源:发表于2019-06-06 20:10 被阅读138次

    目录

    前言

    话休絮烦,本文讲解SparkContext初始化的最后一个组件——ContextCleaner,即上下文清理器。顾名思义,它扮演着Spark Core中垃圾收集器的角色,因此虽然我们在平时编码时甚少见到它,但它算是一个幕后英雄了。如果看官对Java GC的相关知识有所了解的话,本篇讲的内容应该容易理解。

    初始化与类定义

    SparkContext中的初始化逻辑

    代码#17.1 - SparkContext构造方法中初始化ContextCleaner

        _cleaner =
          if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
            Some(new ContextCleaner(this))
          } else {
            None
          }
        _cleaner.foreach(_.start())
    

    ContextCleaner的初始化非常简单,只需要依赖于SparkContext本身,由spark.cleaner.referenceTracking配置项控制是否启用,默认为true。

    ContextCleaner类的属性成员

    代码#17.2 - ContextCleaner类的属性成员

      private val referenceBuffer =
        Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
    
      private val referenceQueue = new ReferenceQueue[AnyRef]
    
      private val listeners = new ConcurrentLinkedQueue[CleanerListener]()
    
      private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
    
      private val periodicGCService: ScheduledExecutorService =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
    
      private val periodicGCInterval =
        sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
    
      private val blockOnCleanupTasks = sc.conf.getBoolean(
        "spark.cleaner.referenceTracking.blocking", true)
    
      private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
        "spark.cleaner.referenceTracking.blocking.shuffle", false)
    
      @volatile private var stopped = false
    
      private def blockManagerMaster = sc.env.blockManager.master
      private def broadcastManager = sc.env.broadcastManager
      private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
    
    • referenceBuffer:缓存CleanupTaskWeakReference的集合。CleanupTaskWeakReference是Java自带WeakReference类的简单封装,其中保存有需要清理的Spark组件实例的弱引用。
    • referenceQueue:缓存弱引用实例的引用队列(java.lang.ref.ReferenceQueue类型)。对弱引用和软引用实例,当其被GC之后就会存入引用队列中,用户程序通过从队列中取得这些引用信息,就可以执行自定义的清理操作。
    • listeners:ContextCleaner的监听器队列,目前只是在测试代码中用到,没有实际用途。
    • cleaningThread:执行具体清理工作的线程,具体是调用了keepCleaning()方法。后面会讲到该方法的实现。
    • periodicGCService:一个单线程的调度线程池,用来周期性地执行GC操作。
    • periodicGCInterval:periodicGCService执行GC的周期长度,由配置项spark.cleaner.periodicGC.interval控制,默认为30分钟。
    • blockOnCleanupTasks:执行清理任务的时候是否阻塞(不包含Shuffle数据的清理任务),由配置项spark.cleaner.referenceTracking.blocking控制,默认值true。
    • blockOnShuffleCleanupTasks:执行清理Shuffle数据的任务时是否阻塞,由配置项spark.cleaner.referenceTracking.blocking.shuffle控制,默认值false。
    • stopped:该ContextCleaner是否停止的标记。

    剩余的三个则分别是从SparkEnv中获取的BlockManagerMaster、BroadcastManager与MapOutputTrackerMaster的对应实例,它们会在之后的清理步骤中用到。

    清理任务及弱引用的封装

    ContextCleaner中共有5种清理任务,分别对应RDD、Shuffle、广播变量、累加器和检查点,都继承自CleanupTask这个空的特征。它们的定义极其简单,如下。

    代码#17.3 - o.a.s.CleanupTask及其子类

    private sealed trait CleanupTask
    private case class CleanRDD(rddId: Int) extends CleanupTask
    private case class CleanShuffle(shuffleId: Int) extends CleanupTask
    private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
    private case class CleanAccum(accId: Long) extends CleanupTask
    private case class CleanCheckpoint(rddId: Int) extends CleanupTask
    

    在上一节讲到的CleanupTaskWeakReference定义如下。当其中的referent对象可达性变为弱可达(weakly reachable)时,对应的CleanupTaskWeakReference实例就会被加入ReferenceQueue中,用于执行清理任务。

    代码#17.4 - o.a.s.CleanupTaskWeakReference类

    private class CleanupTaskWeakReference(
        val task: CleanupTask,
        referent: AnyRef,
        referenceQueue: ReferenceQueue[AnyRef])
      extends WeakReference(referent, referenceQueue)
    

    ContextCleaner的执行流程

    启动

    在代码#17.1中已经调用了ContextCleaner.start()方法。该方法将清理线程cleaningThread设为守护线程并启动之,然后按照periodicGCInterval的间隔来调度执行System.gc()方法,进而可能触发一次GC。因此,在Spark Application中指定Driver或Executor的JVM参数时,一定不要加上-XX:-DisableExplicitGC,该参数会使System.gc()的调用无效化。

    代码#17.5 - o.a.s.ContextCleaner.start()方法

      def start(): Unit = {
        cleaningThread.setDaemon(true)
        cleaningThread.setName("Spark Context Cleaner")
        cleaningThread.start()
        periodicGCService.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = System.gc()
        }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
      }
    

    清理逻辑

    ContextCleaner提供了registerForCleanup()方法,用来将CleanupTask及其对应要清理的对象加入referenceBuffer集合中。下面来看代码#17.2中提到的keepCleaning()方法。

    代码#17.6 - o.a.s.ContextCleaner.keepCleaning()方法

      private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
        while (!stopped) {
          try {
            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
              .map(_.asInstanceOf[CleanupTaskWeakReference])
            synchronized {
              reference.foreach { ref =>
                logDebug("Got cleaning task " + ref.task)
                referenceBuffer.remove(ref)
                ref.task match {
                  case CleanRDD(rddId) =>
                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                  case CleanShuffle(shuffleId) =>
                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                  case CleanBroadcast(broadcastId) =>
                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                  case CleanAccum(accId) =>
                    doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                  case CleanCheckpoint(rddId) =>
                    doCleanCheckpoint(rddId)
                }
              }
            }
          } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
          }
        }
      }
    

    该方法从ReferenceQueue中取出CleanupTaskWeakReference,然后将其包含的CleanupTask进行模式匹配,并对五种情况分别调用不同的方法。以清理RDD和Shuffle数据的方法为例来看一看。

    代码#17.7 - o.a.s.ContextCleaner.doCleanupRDD()/doCleanupShuffle()方法

      def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
        try {
          logDebug("Cleaning RDD " + rddId)
          sc.unpersistRDD(rddId, blocking)
          listeners.asScala.foreach(_.rddCleaned(rddId))
          logInfo("Cleaned RDD " + rddId)
        } catch {
          case e: Exception => logError("Error cleaning RDD " + rddId, e)
        }
      }
    
      def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
        try {
          logDebug("Cleaning shuffle " + shuffleId)
          mapOutputTrackerMaster.unregisterShuffle(shuffleId)
          blockManagerMaster.removeShuffle(shuffleId, blocking)
          listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
          logInfo("Cleaned shuffle " + shuffleId)
        } catch {
          case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
        }
      }
    

    可见,清理RDD是调用了SparkContext.unpersistRDD()方法来反持久化一个RDD。清理Shuffle则需要同时从MapOutputTracker与BlockManager中反注册Shuffle。清理完毕后再调用各个监听器的监听方法进行记录。

    总结

    本文简要介绍了ContextCleaner的初始化、启动和清理的具体流程。

    在讲完ContextCleaner之后,围绕SparkContext展开的这部分体系也进入了尾声。我们会检查一下前面是否还有漏掉的重要内容,如果没有的话,大概是时候进入Spark Core的核心之一——RDD了。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#17:上下文清理器Contex

        本文链接:https://www.haomeiwen.com/subject/kjxpxctx.html