美文网首页spark||flink||scala
Spark推测执行解决SparkStreaming任务task卡

Spark推测执行解决SparkStreaming任务task卡

作者: invincine | 来源:发表于2018-11-09 17:13 被阅读0次

    背景:测试环境运行一个SparkStreaming任务,yarn-cluster模式,duration为5分钟一个批次,每个批次平均2000w条records,并行度为60

    资源配置为:

    ${SPARK_HOME}/bin/spark-submit --name ${jobname} --driver-cores 3 --driver-memory 6g --num-executors 24 --executor-memory 6g --executor-cores 2
    

    问题:
    观察了前几个批次的任务,运行正常,效率也能赶得上批次,平均每个批次的task都在4分钟以内处理完了,结果数据也没问题,心想应该没问题了,就下班了。结果第二天上班后再观察,发现数据延迟了几个小时,任务卡在了某个批次,再看这个批次的详细情况,发现除了运行在某台服务器上的task还在running,其他的task都已经完成了,而这台服务器上的task已经running了几个小时了...

    尝试解决问题:
    首先登陆这台服务器查看yarn的container日志,并没有发现ERROR,再看了看这台服务器的资源,cpu和内存使用情况,也是正常的!顿时有点摸不着头脑,没有报错,任务就卡在那儿了。

    重启大法:
    重启大法好,秉承着重启解决一切问题的思想,我把SparkStreaming任务重启了,万一是偶然现象呢。
    结果没过两个小时就被打脸了,同样的问题又出现了。

    查看日志:
    没有ERROR日志,那就看看INFO日志,任务在哪个环节卡住的,结果发现了以下日志内容

    INFO executor.Executor: Finished task 51.0 in stage 148.0 (TID 8931). 903 bytes result sent to driver
    INFO storage.BlockManager: Dropping broadcast blocks older than 1541581868061
    INFO util.MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
    INFO storage.BlockManager: Dropping non broadcast blocks older than 1541581868066
    INFO util.MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
    INFO storage.BlockManager: Dropping broadcast blocks older than 1541582048061
    INFO util.MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
    INFO storage.BlockManager: Dropping non broadcast blocks older than 1541582048066
    INFO util.MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
    INFO storage.BlockManager: Dropping broadcast blocks older than 1541582228061
    ...
    

    在卡死的服务器的yarn stderr日志中,卡死批次的时间点找到以上日志内容,每三分钟出现一次,正常的日志内容应该是如下所示:

    INFO executor.Executor: Finished task 28.0 in stage 145.0 (TID 8728). 1852 bytes result sent to driver
    INFO storage.BlockManager: Dropping broadcast blocks older than 1541581328061
    INFO storage.BlockManager: Dropped block broadcast_133_piece0
    INFO storage.BlockManager: Dropping non broadcast blocks older than 1541581328066
    INFO storage.BlockManager: Dropped block broadcast_133
    INFO util.MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
    INFO util.MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
    INFO output.FileOutputCommitter: Saved output of task 'attempt_201811071730_0218_r_000037_0' to hdfs://xxxxxxx/xxxxxxx/log/_temporary/0/task_201811071730_0218_r_000037
    INFO executor.Executor: Finished task 37.0 in stage 145.0 (TID 8737). 1852 bytes result sent to driver
    INFO output.FileOutputCommitter: Saved output of task 'attempt_201811071730_0218_r_000058_0' to hdfs://xxxxxxx/xxxxxxx/log/_temporary/0/task_201811071730_0218_r_000058
    INFO executor.Executor: Finished task 58.0 in stage 145.0 (TID 8758). 1852 bytes result sent to driver
    INFO storage.BlockManager: Removing RDD 213
    

    MetadataCleaner用于定期清理persist的RDD缓存和stage task中产生的元数据,实质上是一个TimerTask实现的定时器,其中BLOCK_MANAGER和BROADCAST_VARS属于MetadataCleanerType枚举类的其中两个元数据类别,代表blockmanager中非broadcast的元数据部分和broadcast的元数据部分。

    但是搞不懂为什么会一直重复的在清理,导致task卡死,查阅了网上各种文档也没有类似的问题解决方法。

    于是在想有没有办法能够跳过这个task或者让这个task重启,最后想到了Spark的推测执行。
    Spark推测执行就是适用于个别task比其他的task慢的情况,当某些个task特别慢的时候(满足条件),Spark就重启一个task处理同样的一份数据,谁先处理好就用谁的数据,把另外一个task杀掉,正好可以解决我的问题。

    配置推测执行:
    sparkConf.set("spark.speculation", "true")
    sparkConf.set("spark.speculation.interval", "300s")
    sparkConf.set("spark.speculation.quantile","0.9")

    spark.speculation设置为true表示打开推测执行功能
    spark.speculation.interval表示检测周期,spark会开启一个线程来检测是否需要推测执行
    spark.speculation.quantile表示阈值,设为0.9表示该批次所有的task有90%执行完成即对剩余的task执行推测执行
    spark.speculation.multiplier默认值为1.5,表示慢的task执行时间比完成的task平均时间多耗费1.5倍开启推测执行

    这里把检测周期设为5分钟是为了防止资源浪费,过了批次时间之后再检测是否有卡死的task,毕竟task卡死是偶尔出现,当卡死情况出现时,是个别几个task出现问题,不会超过60个task总数的10%,所以把阈值设为0.9,这样设置不会在正常批次开启推测执行,从而节约了资源。

    总结:
    通过巧妙的设置推测执行,这个问题暂时被我绕开了,等有时间一定会研究MetadataCleaner的源码,尝试正面解决问题,立个flag。

    相关文章

      网友评论

        本文标题:Spark推测执行解决SparkStreaming任务task卡

        本文链接:https://www.haomeiwen.com/subject/pghoxqtx.html