问题描述
基于Spark 3.0-SNAPSHOT(unreleased),做Spark-Terasort相关测试,任务正常的话分如下图所示两个stage,
spark terasort job第一个,stage 0,读取hdfs input目录数据,并进行shuffle write
第二个,stage 1,进行shuffle read,并向hdfs output目录的输出
其中一次测试由于hdfs存储的配额不足,导致stage 1失败,fail 整个spark job,如下图所示。
spark terasort job error此时按照Spark on k8s正常的逻辑,会执行到SparkContext.stop, 各类线程该停停该关关,各executor进程应该收到exit的命令,然后做完这些,主线程退出,留给JVM收尾最后Driver 进程停止。当然Driver pod会留给k8s去进行垃圾回收。
然而在实际的情况下,却发现整个Spark作业依然占着k8s集群的资源,Driver pod状态一直处于running的状态。
image.png
在client侧自然也无法获得该作业的“实际状态”
image.png
在这种情况下,Spark on k8s作业就无法像类似Spark on yarn的作业,不依靠一些额外的监控手段才能感知app的运行状态。
测试的过程中,模拟了各种失败的场景,stage/job级别的异常基本上,都让app卡死了.
原因分析
分析Spark on k8s作业的异常,和其他调度器(yarn等)作业也基本一致,各进程的日志信息,jstack信息,gc信息等,还可以通过Spark UI 获取一些作业相关的信息。区别点在于Spark on k8s可能还需要看下各个Pod的状态等相关信息
-
首先查看Driver Pod状态,未见异常
driver pod status -
查看Driver/executor进程jstack
driver jstack
果不其然,driver 进程中 DestoryJavaJVM被一个OkHttp WebSocket...非daemon线程给拦住了去路...
根据线程的名字可以猜到这个driver端启动的k8s api server 通信的client有关
翻翻Spark 源码
- Driver侧的 k8s client有定义关闭自己的逻辑
- Spark 在初始化OkHttpClient的时候把ping interval设置为0,
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
而OkHttpClient 中 pingIntervalMillis 为0时,这个线程并不会被调度。。
public void initReaderAndWriter(
String name, long pingIntervalMillis, Streams streams) throws IOException {
synchronized (this) {
this.streams = streams;
this.writer = new WebSocketWriter(streams.client, streams.sink, random);
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
if (!messageAndCloseQueue.isEmpty()) {
runWriter(); // Send messages that were enqueued before we were connected.
}
}
reader = new WebSocketReader(streams.client, streams.source, this);
}
开始凌乱了。。。
既然定义了自己关闭的逻辑,没有正常的关闭,想必是有没法捕获的系统异常发生,比如OOM
但是第2点就没法解释了,jar包冲突,不兼容?尝试把okhttp的构件升级到kubernetes-client的依赖版本(3.12.0)依然没有用。。
解决办法
https://issues.apache.org/jira/browse/SPARK-27927
https://issues.apache.org/jira/browse/SPARK-27812
目前社区对这个问题有两个类似的issue跟踪,其中也不乏一些尝试,不乏一些原因的猜测,但貌似都没找到根源。
可以尝试的方法,
- 回退kubernete-client到3.0版本
- 加上 -XX:OnOutOfMemoryError="kill -9 %p"来跑,可以规避一些driver oom导致的问题
- 使用UncaughtExceptionHandler
- 调用System.exit
网友评论