目录
前言
AppStatusStore这个东西是在Spark 2.3.0版本才加入的,在Spark大家庭中是真正的新面孔。顾名思义,它用来存储Application的状态数据,Spark Web UI及REST API需要的数据都取自它。之前在写度量系统时,我曾经说Web UI的数据全部来自度量系统,这句话是错误的,深表歉意。
在本系列之前的文章中已经不止一次地提到了AppStatusStore,但都是几笔带过,从未认真分析,网上也没有前人发表过高见。本文是这个系列的第一篇番外,作为查漏补缺,今天就来稍微探索一下AppStatusStore的底层细节吧。
App状态数据的键值对存储
由前文的分析,我们已经知道AppStatusStore的构造依赖于两个要素:一为键值对存储KVStore,二为App状态监听器AppStatusListener。本节先来看KVStore,它位于o.a.s.util.kvstore包中,是一个Java接口,作为Spark内App状态数据键值对存储的基类。
KVStore
代码#A.1 - o.a.s.util.kvstore.KVStore接口
@Private
public interface KVStore extends Closeable {
<T> T getMetadata(Class<T> klass) throws Exception;
void setMetadata(Object value) throws Exception;
<T> T read(Class<T> klass, Object naturalKey) throws Exception;
void write(Object value) throws Exception;
void delete(Class<?> type, Object naturalKey) throws Exception;
<T> KVStoreView<T> view(Class<T> type) throws Exception;
long count(Class<?> type) throws Exception;
long count(Class<?> type, String index, Object indexedValue) throws Exception;
}
顾名思义,该接口定义的大多数方法都是非常常见的,比如read()、write()、delete()、count()等,也符合它作为键值对存储的定位。另外,它还会提供基本的元数据存取(get/setMetadata()方法)及视图操作(view()方法)。
根据其注释中的描述,KVStore的子类可以(不是必须)支持以下两个特性:
- 序列化:如果数据需要序列化及压缩存储,可以利用定义好的KVStoreSerializer序列化器来做。
- Key自动管理与:KVStore可以自动利用各个类型的名字(也就是代码#A.1中的klass或者type)来作为键,与自然键(naturalKey)配合即可唯一确定一条记录。如果为写入KVStore的数据结构的字段加上@KVIndex注解的话,就会为这些字段创建索引,方便排序。
下图示出以KVStore接口为中心的类结构。
图#A.1 - KVStore的继承体系其中,InMemoryStore是在内存中维护的键值对存储;LevelDB则是借助Google开源的KV数据库来实现,可以持久化到磁盘。ElementTrackingStore额外加上了跟踪元素个数的功能,可以根据元素个数阈值触发特定的操作,但它更多地是个包装类,需要依赖于InMemoryStore或者LevelDB。
AppStatusStore的实现依赖于InMemoryStore和ElementTrackingStore,下面分别来看。
InMemoryStore与InMemoryView
由于不需要持久化,该类只是用一个ConcurrentHashMap保存对象类型与对象列表的映射,即:
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
对象列表由静态内部类InstanceList来实现。看官如果有兴趣,可以自行查看InstanceList的源码,其中涉及到较多Java反射方面的知识。以下则是InMemoryStore实现的read()、write()和view()方法。
代码#A.2 - o.a.s.util.kvstore.InMemoryStore.read()/write()/count()/view()方法
@Override
public <T> T read(Class<T> klass, Object naturalKey) {
InstanceList list = data.get(klass);
Object value = list != null ? list.get(naturalKey) : null;
if (value == null) {
throw new NoSuchElementException();
}
return klass.cast(value);
}
@Override
public void write(Object value) throws Exception {
InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
try {
return new InstanceList(key);
} catch (Exception e) {
throw Throwables.propagate(e);
}
});
list.put(value);
}
@Override
public <T> KVStoreView<T> view(Class<T> type){
InstanceList list = data.get(type);
return list != null ? list.view(type)
: new InMemoryView<>(type, Collections.<T>emptyList(), null);
}
read()和write()方法自不必多说,view()方法则是通过调用InstanceList.view()方法生成当前KVStore的某一个类型对应的视图InMemoryView。InMemoryView继承自抽象类KVStoreView,并且只能通过内部自定义的迭代器InMemoryIterator来访问。下面是获取InMemoryIterator的方法。
代码#A.3 - o.a.s.util.kvstore.InMemoryStore.InMemoryView.iterator()方法
@Override
public Iterator<T> iterator() {
if (elements.isEmpty()) {
return new InMemoryIterator<>(elements.iterator());
}
try {
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
int modifier = ascending ? 1 : -1;
final List<T> sorted = copyElements();
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();
if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}
if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}
if (skip > 0) {
stream = stream.skip(skip);
}
if (max < sorted.size()) {
stream = stream.limit((int) max);
}
return new InMemoryIterator<>(stream.iterator());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
由上可知,视图内的数据首先会被排序,然后利用Java Stream API做一些过滤的操作,最后返回代表有序数据的InMemoryIterator。在AppStatusStore中,从KVStore取数据经常会用到视图,这也就解释了为什么我们在Spark UI中看到的很多信息(比如Task、Stage等)都是已经排好序的。
ElementTrackingStore
ElementTrackingStore的初始化依赖于InMemoryStore,因此它的多数方法都是直接代理了InMemoryStore的方法。为了实现跟踪元素数并触发操作的功能,其内部维护了一个类型与触发器(通过内部样例类Trigger定义)的映射关系,添加触发器的方法如下。
代码#A.4 - o.a.s.status.ElementTrackingStore.addTrigger()方法
def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
val existing = triggers.getOrElse(klass, Seq())
triggers(klass) = existing :+ Trigger(threshold, action)
}
其中,threshold是一个整形值,表示对应类型元素个数的阈值。action是一个偏函数,表示到达阈值之后需要执行的操作。在其重载的write()方法中,我们能清楚地看到这个逻辑。
代码A.5 - o.a.s.status.ElementTrackingStore.write()方法
def write(value: Any, checkTriggers: Boolean): Unit = {
write(value)
if (checkTriggers && !stopped) {
triggers.get(value.getClass()).foreach { list =>
doAsync {
val count = store.count(value.getClass())
list.foreach { t =>
if (count > t.threshold) {
t.action(count)
}
}
}
}
}
}
需要注意的是,可以通过checkTriggers参数来控制是否触发。另外,可以通过配置项spark.appStateStore.asyncTracking.enable设置是否异步触发操作(代码中的doAsync()方法),默认值为true。
通过以上分析,我们对App状态数据的键值对存储有了大致的了解。接下来轮到AppStatusListener了。
App状态监听器
AppStatusListener类继承自SparkListener类,因此实现了很多SparkListener中定义的监听事件处理方法。我们不急着研究这些方法,而先来看一些前置的东西。
LiveEntity
所谓LiveEntity,可以直译为“活动实体”,指的是那些在Application运行过程中状态在不断变化的Spark内部组件,比如Job、Stage、Task、Executor、RDD,它们会向ElementTrackingStore更新自己的状态数据。LiveEntity是一个抽象类,其定义如下,比较简单,不过多解释。
代码#A.6 - o.a.s.status.LiveEntity抽象类
private[spark] abstract class LiveEntity {
var lastWriteTime = -1L
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
protected def doUpdate(): Any
}
所有LiveEntity的实现类(比如LiveJob、LiveTask等)都包含了大量的监控信息和度量信息,监控信息来自AppStatusListener,度量信息来自MetricsSystem。并且它们都需要实现doUpdate()方法,该方法负责将LiveEntity最新的状况反映给ElementTrackingStore。在AppStatusListener类中,也预先定义了各个LiveEntity的缓存。
代码#A.7 - o.a.s.status.AppStatusListener中LiveEntity的缓存
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private val liveExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
添加清理触发器
在AppStatusListener的构造方法中,首先就调用了ElementTrackingStore.addTrigger()方法添加触发器,代码如下。
代码#A.8 - o.a.s.status.AppStatusListener添加触发器
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
cleanupJobs(count)
}
kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
cleanupStages(count)
}
这三个触发器分别用于在Job、Stage和Executor信息的数量超过SparkConf内规定的数值(名称均为spark.ui.retained***)时,将那些较旧的信息删除掉,Task信息则会随着Stage清除。以清理Job的cleanupJobs()方法为例,代码如下。
代码#A.9 - o.a.s.status.AppStatusListener.cleanupJobs()方法
private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
if (countToDelete <= 0L) {
return
}
val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}
其具体逻辑是:先计算出需要删除的记录数目,然后从KVStore中获取Job信息包装类JobDataWrapper对应的视图,并按照Job完成时间的索引排序。接下来取出要删除的Job记录(Job不能是在执行,也不能是未知状态),再调用KVStore.delete()方法删除之。
监听事件处理方法
由于AppStatusListener监听的事件甚多,所以我们只选取其中一个有代表性的onJobStart()方法来看一看。
代码#A.10 - o.a.s.status.AppStatusListener.onJobStart()方法
override def onJobStart(event: SparkListenerJobStart): Unit = {
val now = System.nanoTime()
val numTasks = {
val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
missingStages.map(_.numTasks).sum
}
val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val jobGroup = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
val job = new LiveJob(
event.jobId,
lastStageName,
if (event.time > 0) Some(new Date(event.time)) else None,
event.stageIds,
jobGroup,
numTasks)
liveJobs.put(event.jobId, job)
liveUpdate(job, now)
event.stageInfos.foreach { stageInfo =>
val stage = getOrCreateStage(stageInfo)
stage.jobs :+= job
stage.jobIds += event.jobId
liveUpdate(stage, now)
}
event.stageInfos.foreach { stage =>
val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
val uigraph = new RDDOperationGraphWrapper(
stage.stageId,
graph.edges,
graph.outgoingEdges,
graph.incomingEdges,
newRDDOperationCluster(graph.rootCluster))
kvstore.write(uigraph)
}
}
在接收到表示Job启动的SparkListenerJobStart事件后,该方法的大致流程如下:
- 根据该Job的Stage信息,估算出一个大致的Task数目,获取其最后一个Stage的名称;
- 封装一个LiveJob实例,将其放入缓存,并调用liveUpdate()方法向KVStore更新状态。liveUpdate()方法最终调用的就是LiveEntity.write()方法;
- 调用getOrCreateStage()方法生成LiveStage实例,同样向KVStore更新状态;
- 生成RDD的DAG表示,并写入KVStore中。
AppStatusListener监听的每个事件都会采用类似上面的逻辑来处理,将数据写入KVStore之后,就可以通过AppStatusStore将它们取出并且展示了。
基于KVStore和监听器包装AppStatusStore
有了存储数据的ElementTrackingStore和监听并写入数据的AppStatusListener,AppStatusStore的实现就会非常简单,只需要调用read()或view()从ElementTrackingStore中以一定的规则取出数据进行包装即可。
例如,在讲解SparkUI的文章#14中已经提到,构造EnvironmentPage时(参见代码清单#14.8),就会调用AppStatusStore.environmentInfo()方法。
代码#A.11 - o.a.s.status.AppStatusStore.environmentInfo()方法
def environmentInfo(): v1.ApplicationEnvironmentInfo = {
val klass = classOf[ApplicationEnvironmentInfoWrapper]
store.read(klass, klass.getName()).info
}
再举一例,如果要展示Stage相关的信息,就会调用stageData()方法,实现起来同样简单方便。
代码#A.12 - o.a.s.status.AppStatusStore.stageData()方法
def stageData(stageId: Int, details: Boolean = false): Seq[v1.StageData] = {
store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId)
.asScala.map { s =>
if (details) stageWithDetails(s.info) else s.info
}.toSeq
}
总结
本文首先了解了Spark内的键值对存储KVStore的部分细节,探究了与AppStatusStore相关的具体实现类InMemoryStore与ElementTrackingStore。然后通过阅读监听器AppStatusListener的部分代码,明确了AppStatusStore内状态数据的来源。最后将两者结合在一起,简述了AppStatusStore是如何将数据反馈到前端的。
网友评论