美文网首页玩转大数据大数据Spark On K8S
Spark on k8s: OkHttp WebSocket 非

Spark on k8s: OkHttp WebSocket 非

作者: Kent_Yao | 来源:发表于2019-09-25 19:26 被阅读0次

    问题描述

    基于Spark 3.0-SNAPSHOT(unreleased),做Spark-Terasort相关测试,任务正常的话分如下图所示两个stage,

    spark terasort job

    第一个,stage 0,读取hdfs input目录数据,并进行shuffle write
    第二个,stage 1,进行shuffle read,并向hdfs output目录的输出

    其中一次测试由于hdfs存储的配额不足,导致stage 1失败,fail 整个spark job,如下图所示。

    spark terasort job error

    此时按照Spark on k8s正常的逻辑,会执行到SparkContext.stop, 各类线程该停停该关关,各executor进程应该收到exit的命令,然后做完这些,主线程退出,留给JVM收尾最后Driver 进程停止。当然Driver pod会留给k8s去进行垃圾回收。

    然而在实际的情况下,却发现整个Spark作业依然占着k8s集群的资源,Driver pod状态一直处于running的状态。


    image.png

    在client侧自然也无法获得该作业的“实际状态”


    image.png

    在这种情况下,Spark on k8s作业就无法像类似Spark on yarn的作业,不依靠一些额外的监控手段才能感知app的运行状态。

    测试的过程中,模拟了各种失败的场景,stage/job级别的异常基本上,都让app卡死了.

    原因分析

    分析Spark on k8s作业的异常,和其他调度器(yarn等)作业也基本一致,各进程的日志信息,jstack信息,gc信息等,还可以通过Spark UI 获取一些作业相关的信息。区别点在于Spark on k8s可能还需要看下各个Pod的状态等相关信息

    1. 首先查看Driver Pod状态,未见异常


      driver pod status
    2. 查看Driver/executor进程jstack


      driver jstack
    executor jstack

    果不其然,driver 进程中 DestoryJavaJVM被一个OkHttp WebSocket...非daemon线程给拦住了去路...

    根据线程的名字可以猜到这个driver端启动的k8s api server 通信的client有关

    翻翻Spark 源码

    1. Driver侧的 k8s client有定义关闭自己的逻辑
    2. Spark 在初始化OkHttpClient的时候把ping interval设置为0,
    val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
          .withApiVersion("v1")
          .withMasterUrl(master)
          .withWebsocketPingInterval(0)
          .withRequestTimeout(clientType.requestTimeout(sparkConf))
          .withConnectionTimeout(clientType.connectionTimeout(sparkConf))
          .withOption(oauthTokenValue) {
            (token, configBuilder) => configBuilder.withOauthToken(token)
          }.withOption(oauthTokenFile) {
            (file, configBuilder) =>
                configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
          }.withOption(caCertFile) {
            (file, configBuilder) => configBuilder.withCaCertFile(file)
          }.withOption(clientKeyFile) {
            (file, configBuilder) => configBuilder.withClientKeyFile(file)
          }.withOption(clientCertFile) {
            (file, configBuilder) => configBuilder.withClientCertFile(file)
          }.withOption(namespace) {
            (ns, configBuilder) => configBuilder.withNamespace(ns)
          }.build()
    
    

    而OkHttpClient 中 pingIntervalMillis 为0时,这个线程并不会被调度。。

    public void initReaderAndWriter(
          String name, long pingIntervalMillis, Streams streams) throws IOException {
        synchronized (this) {
          this.streams = streams;
          this.writer = new WebSocketWriter(streams.client, streams.sink, random);
          this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
          if (pingIntervalMillis != 0) {
            executor.scheduleAtFixedRate(
                new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
          }
          if (!messageAndCloseQueue.isEmpty()) {
            runWriter(); // Send messages that were enqueued before we were connected.
          }
        }
    
        reader = new WebSocketReader(streams.client, streams.source, this);
      }
    

    开始凌乱了。。。

    既然定义了自己关闭的逻辑,没有正常的关闭,想必是有没法捕获的系统异常发生,比如OOM

    但是第2点就没法解释了,jar包冲突,不兼容?尝试把okhttp的构件升级到kubernetes-client的依赖版本(3.12.0)依然没有用。。

    解决办法

    https://issues.apache.org/jira/browse/SPARK-27927
    https://issues.apache.org/jira/browse/SPARK-27812
    目前社区对这个问题有两个类似的issue跟踪,其中也不乏一些尝试,不乏一些原因的猜测,但貌似都没找到根源。

    可以尝试的方法,

    1. 回退kubernete-client到3.0版本
    2. 加上 -XX:OnOutOfMemoryError="kill -9 %p"来跑,可以规避一些driver oom导致的问题
    3. 使用UncaughtExceptionHandler
    4. 调用System.exit

    相关文章

      网友评论

        本文标题:Spark on k8s: OkHttp WebSocket 非

        本文链接:https://www.haomeiwen.com/subject/sljiyctx.html