美文网首页Spark源码精读分析计划
Spark Core源码精读计划 番外篇A:AppStatusS

Spark Core源码精读计划 番外篇A:AppStatusS

作者: LittleMagic | 来源:发表于2019-05-28 23:03 被阅读5次

    目录

    前言

    AppStatusStore这个东西是在Spark 2.3.0版本才加入的,在Spark大家庭中是真正的新面孔。顾名思义,它用来存储Application的状态数据,Spark Web UI及REST API需要的数据都取自它。之前在写度量系统时,我曾经说Web UI的数据全部来自度量系统,这句话是错误的,深表歉意。

    在本系列之前的文章中已经不止一次地提到了AppStatusStore,但都是几笔带过,从未认真分析,网上也没有前人发表过高见。本文是这个系列的第一篇番外,作为查漏补缺,今天就来稍微探索一下AppStatusStore的底层细节吧。

    App状态数据的键值对存储

    由前文的分析,我们已经知道AppStatusStore的构造依赖于两个要素:一为键值对存储KVStore,二为App状态监听器AppStatusListener。本节先来看KVStore,它位于o.a.s.util.kvstore包中,是一个Java接口,作为Spark内App状态数据键值对存储的基类。

    KVStore

    代码#A.1 - o.a.s.util.kvstore.KVStore接口

    @Private
    public interface KVStore extends Closeable {
      <T> T getMetadata(Class<T> klass) throws Exception;
    
      void setMetadata(Object value) throws Exception;
    
      <T> T read(Class<T> klass, Object naturalKey) throws Exception;
    
      void write(Object value) throws Exception;
    
      void delete(Class<?> type, Object naturalKey) throws Exception;
    
      <T> KVStoreView<T> view(Class<T> type) throws Exception;
    
      long count(Class<?> type) throws Exception;
    
      long count(Class<?> type, String index, Object indexedValue) throws Exception;
    }
    

    顾名思义,该接口定义的大多数方法都是非常常见的,比如read()、write()、delete()、count()等,也符合它作为键值对存储的定位。另外,它还会提供基本的元数据存取(get/setMetadata()方法)及视图操作(view()方法)。

    根据其注释中的描述,KVStore的子类可以(不是必须)支持以下两个特性:

    • 序列化:如果数据需要序列化及压缩存储,可以利用定义好的KVStoreSerializer序列化器来做。
    • Key自动管理与:KVStore可以自动利用各个类型的名字(也就是代码#A.1中的klass或者type)来作为键,与自然键(naturalKey)配合即可唯一确定一条记录。如果为写入KVStore的数据结构的字段加上@KVIndex注解的话,就会为这些字段创建索引,方便排序。

    下图示出以KVStore接口为中心的类结构。

    图#A.1 - KVStore的继承体系

    其中,InMemoryStore是在内存中维护的键值对存储;LevelDB则是借助Google开源的KV数据库来实现,可以持久化到磁盘。ElementTrackingStore额外加上了跟踪元素个数的功能,可以根据元素个数阈值触发特定的操作,但它更多地是个包装类,需要依赖于InMemoryStore或者LevelDB。

    AppStatusStore的实现依赖于InMemoryStore和ElementTrackingStore,下面分别来看。

    InMemoryStore与InMemoryView

    由于不需要持久化,该类只是用一个ConcurrentHashMap保存对象类型与对象列表的映射,即:

    private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();

    对象列表由静态内部类InstanceList来实现。看官如果有兴趣,可以自行查看InstanceList的源码,其中涉及到较多Java反射方面的知识。以下则是InMemoryStore实现的read()、write()和view()方法。

    代码#A.2 - o.a.s.util.kvstore.InMemoryStore.read()/write()/count()/view()方法

      @Override
      public <T> T read(Class<T> klass, Object naturalKey) {
        InstanceList list = data.get(klass);
        Object value = list != null ? list.get(naturalKey) : null;
        if (value == null) {
          throw new NoSuchElementException();
        }
        return klass.cast(value);
      }
    
      @Override
      public void write(Object value) throws Exception {
        InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
          try {
            return new InstanceList(key);
          } catch (Exception e) {
            throw Throwables.propagate(e);
          }
        });
        list.put(value);
      }
    
      @Override
      public <T> KVStoreView<T> view(Class<T> type){
        InstanceList list = data.get(type);
        return list != null ? list.view(type)
          : new InMemoryView<>(type, Collections.<T>emptyList(), null);
      }
    

    read()和write()方法自不必多说,view()方法则是通过调用InstanceList.view()方法生成当前KVStore的某一个类型对应的视图InMemoryView。InMemoryView继承自抽象类KVStoreView,并且只能通过内部自定义的迭代器InMemoryIterator来访问。下面是获取InMemoryIterator的方法。

    代码#A.3 - o.a.s.util.kvstore.InMemoryStore.InMemoryView.iterator()方法

        @Override
        public Iterator<T> iterator() {
          if (elements.isEmpty()) {
            return new InMemoryIterator<>(elements.iterator());
          }
          try {
            KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
            int modifier = ascending ? 1 : -1;
            final List<T> sorted = copyElements();
            Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
            Stream<T> stream = sorted.stream();
    
            if (first != null) {
              stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
            }
            if (last != null) {
              stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
            }
            if (skip > 0) {
              stream = stream.skip(skip);
            }
            if (max < sorted.size()) {
              stream = stream.limit((int) max);
            }
            return new InMemoryIterator<>(stream.iterator());
          } catch (Exception e) {
            throw Throwables.propagate(e);
          }
        }
    

    由上可知,视图内的数据首先会被排序,然后利用Java Stream API做一些过滤的操作,最后返回代表有序数据的InMemoryIterator。在AppStatusStore中,从KVStore取数据经常会用到视图,这也就解释了为什么我们在Spark UI中看到的很多信息(比如Task、Stage等)都是已经排好序的。

    ElementTrackingStore

    ElementTrackingStore的初始化依赖于InMemoryStore,因此它的多数方法都是直接代理了InMemoryStore的方法。为了实现跟踪元素数并触发操作的功能,其内部维护了一个类型与触发器(通过内部样例类Trigger定义)的映射关系,添加触发器的方法如下。

    代码#A.4 - o.a.s.status.ElementTrackingStore.addTrigger()方法

      def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
        val existing = triggers.getOrElse(klass, Seq())
        triggers(klass) = existing :+ Trigger(threshold, action)
      }
    

    其中,threshold是一个整形值,表示对应类型元素个数的阈值。action是一个偏函数,表示到达阈值之后需要执行的操作。在其重载的write()方法中,我们能清楚地看到这个逻辑。

    代码A.5 - o.a.s.status.ElementTrackingStore.write()方法

      def write(value: Any, checkTriggers: Boolean): Unit = {
        write(value)
        if (checkTriggers && !stopped) {
          triggers.get(value.getClass()).foreach { list =>
            doAsync {
              val count = store.count(value.getClass())
              list.foreach { t =>
                if (count > t.threshold) {
                  t.action(count)
                }
              }
            }
          }
        }
      }
    

    需要注意的是,可以通过checkTriggers参数来控制是否触发。另外,可以通过配置项spark.appStateStore.asyncTracking.enable设置是否异步触发操作(代码中的doAsync()方法),默认值为true。

    通过以上分析,我们对App状态数据的键值对存储有了大致的了解。接下来轮到AppStatusListener了。

    App状态监听器

    AppStatusListener类继承自SparkListener类,因此实现了很多SparkListener中定义的监听事件处理方法。我们不急着研究这些方法,而先来看一些前置的东西。

    LiveEntity

    所谓LiveEntity,可以直译为“活动实体”,指的是那些在Application运行过程中状态在不断变化的Spark内部组件,比如Job、Stage、Task、Executor、RDD,它们会向ElementTrackingStore更新自己的状态数据。LiveEntity是一个抽象类,其定义如下,比较简单,不过多解释。

    代码#A.6 - o.a.s.status.LiveEntity抽象类

    private[spark] abstract class LiveEntity {
      var lastWriteTime = -1L
    
      def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
        store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
        lastWriteTime = now
      }
    
      protected def doUpdate(): Any
    }
    

    所有LiveEntity的实现类(比如LiveJob、LiveTask等)都包含了大量的监控信息和度量信息,监控信息来自AppStatusListener,度量信息来自MetricsSystem。并且它们都需要实现doUpdate()方法,该方法负责将LiveEntity最新的状况反映给ElementTrackingStore。在AppStatusListener类中,也预先定义了各个LiveEntity的缓存。

    代码#A.7 - o.a.s.status.AppStatusListener中LiveEntity的缓存

      private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
      private val liveJobs = new HashMap[Int, LiveJob]()
      private val liveExecutors = new HashMap[String, LiveExecutor]()
      private val liveTasks = new HashMap[Long, LiveTask]()
      private val liveRDDs = new HashMap[Int, LiveRDD]()
    

    添加清理触发器

    在AppStatusListener的构造方法中,首先就调用了ElementTrackingStore.addTrigger()方法添加触发器,代码如下。

    代码#A.8 - o.a.s.status.AppStatusListener添加触发器

      kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
        { count => cleanupExecutors(count) }
    
      kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
        cleanupJobs(count)
      }
    
      kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
        cleanupStages(count)
      }
    

    这三个触发器分别用于在Job、Stage和Executor信息的数量超过SparkConf内规定的数值(名称均为spark.ui.retained***)时,将那些较旧的信息删除掉,Task信息则会随着Stage清除。以清理Job的cleanupJobs()方法为例,代码如下。

    代码#A.9 - o.a.s.status.AppStatusListener.cleanupJobs()方法

      private def cleanupJobs(count: Long): Unit = {
        val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
        if (countToDelete <= 0L) {
          return
        }
        val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
        val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
          j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
        }
        toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
      }
    

    其具体逻辑是:先计算出需要删除的记录数目,然后从KVStore中获取Job信息包装类JobDataWrapper对应的视图,并按照Job完成时间的索引排序。接下来取出要删除的Job记录(Job不能是在执行,也不能是未知状态),再调用KVStore.delete()方法删除之。

    监听事件处理方法

    由于AppStatusListener监听的事件甚多,所以我们只选取其中一个有代表性的onJobStart()方法来看一看。

    代码#A.10 - o.a.s.status.AppStatusListener.onJobStart()方法

      override def onJobStart(event: SparkListenerJobStart): Unit = {
        val now = System.nanoTime()
        val numTasks = {
          val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
          missingStages.map(_.numTasks).sum
        }
    
        val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
        val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
        val jobGroup = Option(event.properties)
          .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
    
        val job = new LiveJob(
          event.jobId,
          lastStageName,
          if (event.time > 0) Some(new Date(event.time)) else None,
          event.stageIds,
          jobGroup,
          numTasks)
        liveJobs.put(event.jobId, job)
        liveUpdate(job, now)
    
        event.stageInfos.foreach { stageInfo =>
          val stage = getOrCreateStage(stageInfo)
          stage.jobs :+= job
          stage.jobIds += event.jobId
          liveUpdate(stage, now)
        }
    
        event.stageInfos.foreach { stage =>
          val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
          val uigraph = new RDDOperationGraphWrapper(
            stage.stageId,
            graph.edges,
            graph.outgoingEdges,
            graph.incomingEdges,
            newRDDOperationCluster(graph.rootCluster))
          kvstore.write(uigraph)
        }
      }
    

    在接收到表示Job启动的SparkListenerJobStart事件后,该方法的大致流程如下:

    • 根据该Job的Stage信息,估算出一个大致的Task数目,获取其最后一个Stage的名称;
    • 封装一个LiveJob实例,将其放入缓存,并调用liveUpdate()方法向KVStore更新状态。liveUpdate()方法最终调用的就是LiveEntity.write()方法;
    • 调用getOrCreateStage()方法生成LiveStage实例,同样向KVStore更新状态;
    • 生成RDD的DAG表示,并写入KVStore中。

    AppStatusListener监听的每个事件都会采用类似上面的逻辑来处理,将数据写入KVStore之后,就可以通过AppStatusStore将它们取出并且展示了。

    基于KVStore和监听器包装AppStatusStore

    有了存储数据的ElementTrackingStore和监听并写入数据的AppStatusListener,AppStatusStore的实现就会非常简单,只需要调用read()或view()从ElementTrackingStore中以一定的规则取出数据进行包装即可。

    例如,在讲解SparkUI的文章#14中已经提到,构造EnvironmentPage时(参见代码清单#14.8),就会调用AppStatusStore.environmentInfo()方法。

    代码#A.11 - o.a.s.status.AppStatusStore.environmentInfo()方法

      def environmentInfo(): v1.ApplicationEnvironmentInfo = {
        val klass = classOf[ApplicationEnvironmentInfoWrapper]
        store.read(klass, klass.getName()).info
      }
    

    再举一例,如果要展示Stage相关的信息,就会调用stageData()方法,实现起来同样简单方便。

    代码#A.12 - o.a.s.status.AppStatusStore.stageData()方法

      def stageData(stageId: Int, details: Boolean = false): Seq[v1.StageData] = {
        store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId)
          .asScala.map { s =>
            if (details) stageWithDetails(s.info) else s.info
          }.toSeq
      }
    

    总结

    本文首先了解了Spark内的键值对存储KVStore的部分细节,探究了与AppStatusStore相关的具体实现类InMemoryStore与ElementTrackingStore。然后通过阅读监听器AppStatusListener的部分代码,明确了AppStatusStore内状态数据的来源。最后将两者结合在一起,简述了AppStatusStore是如何将数据反馈到前端的。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划 番外篇A:AppStatusS

        本文链接:https://www.haomeiwen.com/subject/ciagzqtx.html