
Notes on Hitting SPARK-21444: DAGSchedulerEventProcessLoop Failure

Author: 旺旺鸽不鸽 | Published 2019-05-12 08:13


A while ago we hit SPARK-21444 at work; this post records what happened and how we fixed it.

1 Problem Description

One of our Spark applications, after running normally for several months, started to hang frequently on the driver side. Running yarn container -list showed that only the driver's container was still alive; all executors had exited. The driver log then showed that the DAGSchedulerEventProcessLoop had failed and the SparkContext had been shut down, with the following stack trace:

    19/03/27 13:39:37 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
    org.apache.spark.SparkException: Exception thrown in awaitResult:
            at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
            at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:152)
            at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:306)
            at org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:197)
            at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111)
            at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
            at org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:180)
            at org.apache.spark.ShuffleStatus$$anonfun$removeOutputsOnExecutor$1.apply$mcVI$sp(MapOutputTracker.scala:118)
            at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
            at org.apache.spark.ShuffleStatus.removeOutputsOnExecutor(MapOutputTracker.scala:114)
            at org.apache.spark.MapOutputTrackerMaster$$anonfun$removeOutputsOnExecutor$2.apply(MapOutputTracker.scala:424)
            at org.apache.spark.MapOutputTrackerMaster$$anonfun$removeOutputsOnExecutor$2.apply(MapOutputTracker.scala:424)
            at scala.collection.Iterator$class.foreach(Iterator.scala:893)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
            at org.apache.spark.MapOutputTrackerMaster.removeOutputsOnExecutor(MapOutputTracker.scala:424)
            at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1471)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1428)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1787)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1745)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1734)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:192)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
            at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
            at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            at java.lang.Thread.run(Thread.java:748)
    19/03/27 13:39:37 INFO scheduler.DAGScheduler: Executor lost: 388 (epoch 5955)

2 Source Code Analysis

2.1 Why handleExecutorLost Is Called

The stack trace shows that the DAGScheduler called handleExecutorLost while processing a CompletionEvent (inside handleTaskCompletion). Here is the relevant part of handleTaskCompletion:

    /**
     * Responds to a task finishing. This is called inside the event loop so it assumes that it can
     * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
     */
    private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
      // irrelevant code omitted (indicated below by "// ...")
      event.reason match {
        // ...
        case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
          // ...
          // TODO: mark the executor as failed only if there were lots of fetch failures on it
          if (bmAddress != null) {
            handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
          }
        // ...
      }
    }

As the code shows, when a task completes with reason FetchFailed and the BlockManager address carried in the FetchFailed is not null, the DAGScheduler treats the corresponding executor as lost and calls handleExecutorLost to handle it.

Note the comment: // TODO: mark the executor as failed only if there were lots of fetch failures on it

In the version we were running (Spark 2.2.3), however, a single fetch failure on an executor is enough to trigger handleExecutorLost.

2.2 Why FetchFailed Occurs

Why was a FetchFailed raised in the first place? In the driver log we found:

    19/03/26 00:42:32 ERROR cluster.YarnClusterScheduler: Lost executor 674 on hostA: Container container_e131_1552474836332_972818_01_000882 on host: hostA was preempted.

We also found:

    org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostA/xxxxxx:xxx

So the FetchFailed was caused by the YARN container of executor 674 on hostA being preempted. As more users joined our cluster, the load on the Hadoop cluster kept growing and resource preemption started to happen.

Let's now look at what handleExecutorLost does.

2.3 What handleExecutorLost Does

    private[scheduler] def handleExecutorLost(
        execId: String,
        filesLost: Boolean,
        maybeEpoch: Option[Long] = None) {
      val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
      if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
        failedEpoch(execId) = currentEpoch
        logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
        blockManagerMaster.removeExecutor(execId)
        if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
          logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
          mapOutputTracker.removeOutputsOnExecutor(execId)
          clearCacheLocs()
        }
      } else {
        logDebug("Additional executor lost message for " + execId +
          "(epoch " + currentEpoch + ")")
      }
    }

Besides calling blockManagerMaster.removeExecutor to remove the lost executor, handleExecutorLost also calls mapOutputTracker.removeOutputsOnExecutor (for background on MapOutputTracker, see [Spark MapOutputTracker浅析](https://www.jianshu.com/p/1409dbc78a15)):

    def removeOutputsOnExecutor(execId: String): Unit = {
      shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnExecutor(execId) }
      incrementEpoch()
    }

shuffleStatuses holds the ShuffleStatus object of every shuffle. For each ShuffleStatus, removeOutputsOnExecutor removes the MapStatus of every shuffle map task that ran on the given executor:

    def removeOutputsOnExecutor(execId: String): Unit = synchronized {
      for (mapId <- 0 until mapStatuses.length) {
        if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) {
          _numAvailableOutputs -= 1
          mapStatuses(mapId) = null
          invalidateSerializedMapOutputStatusCache()
        }
      }
    }

2.4 What removeOutputsOnExecutor Does

ShuffleStatus.removeOutputsOnExecutor ends up calling invalidateSerializedMapOutputStatusCache, which clears the cached MapStatus. Part of clearing that cache is destroying the broadcast variable cachedSerializedBroadcast, which holds the serialized output status (the MapStatus objects) of all shuffle map tasks:

    /**
     * Clears the cached serialized map output statuses.
     */
    def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
      if (cachedSerializedBroadcast != null) {
        cachedSerializedBroadcast.destroy()
        cachedSerializedBroadcast = null
      }
      cachedSerializedMapStatus = null
    }

Before going further, a quick look at how a broadcast variable is destroyed is useful.

Destroying a broadcast variable essentially means removing the corresponding broadcast blocks on every node. The rough flow is (a code sketch follows the list):

1. A RemoveBroadcast message is sent to the blockManagerMasterEndpoint.

2. Upon receiving the RemoveBroadcast message, the blockManagerMasterEndpoint forwards it to the blockManagerSlaveEndpoint on every executor and on the driver.

3. Upon receiving the RemoveBroadcast message, each blockManagerSlaveEndpoint calls the local blockManager's removeBroadcast method to remove all blocks of the specified broadcast.
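
To connect this flow back to the error stack: step 1 runs through BlockManagerMaster.removeBroadcast, and when the destroy is blocking (the parameterless destroy() used above is blocking) it waits for all slave endpoints to respond. The following is only a paraphrased, abridged sketch of what that method looks like in Spark 2.2.x; method names such as askSync and the exact log message may differ slightly from the real source:

    /** Remove all blocks belonging to the given broadcast (abridged sketch, Spark 2.2.x). */
    def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
      // Ask the blockManagerMasterEndpoint on the driver to fan the RemoveBroadcast
      // message out to every registered blockManagerSlaveEndpoint.
      val future = driverEndpoint.askSync[Future[Seq[Int]]](
        RemoveBroadcast(broadcastId, removeFromMaster))
      future.onFailure {
        case e: Exception =>
          logWarning(s"Failed to remove broadcast $broadcastId - ${e.getMessage}", e)
      }(ThreadUtils.sameThread)
      if (blocking) {
        // This is the RpcTimeout.awaitResult frame at the top of the error stack: if any
        // slave endpoint is unreachable, the failed future surfaces here as a SparkException.
        timeout.awaitResult(future)
      }
    }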

2.5 How DAGSchedulerEventProcessLoop Fails

From the analysis above, we know:

1. Preemption causes the FetchFailed exception;

2. The FetchFailed exception causes dagScheduler.handleExecutorLost to be called;

3. handleExecutorLost calls shuffleStatus.invalidateSerializedMapOutputStatusCache to clear the cached map output status;

4. invalidateSerializedMapOutputStatusCache calls broadcast.destroy to destroy the broadcast variable that caches the serialized map output status;

5. destroy ultimately sends a RemoveBroadcast message through the blockManagerMasterEndpoint to every blockManagerSlaveEndpoint and waits for the result.

With that, the reason DAGSchedulerEventProcessLoop fails becomes clear:

Because the FetchFailed was produced by preemption, there must be an executor that died when its container was preempted. The RemoveBroadcast message, which is sent to every blockManagerSlaveEndpoint, is therefore also sent to that dead executor, which produces a connection exception. This exception is not caught in invalidateSerializedMapOutputStatusCache or in any of the methods above it on the call path, so it ends up in DAGSchedulerEventProcessLoop and causes it to fail.
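
Why does an exception escaping handleTaskCompletion take down the whole application? The event-loop thread catches it and hands it to onError, which is exactly what produces the first ERROR line in the log above. The following is a paraphrased, abridged sketch of Spark 2.2.x's EventLoop thread and of DAGSchedulerEventProcessLoop.onError; details may differ slightly from the real source:

    // EventLoop's event thread (abridged): non-fatal exceptions thrown by
    // onReceive are passed to onError instead of silently killing the thread.
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)              // DAGSchedulerEventProcessLoop.doOnReceive(event)
        } catch {
          case NonFatal(e) => onError(e)
        }
      }
    }

    // DAGSchedulerEventProcessLoop.onError (abridged): logs
    // "DAGSchedulerEventProcessLoop failed; shutting down SparkContext",
    // cancels all jobs and stops the SparkContext.
    override def onError(e: Throwable): Unit = {
      logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
      try {
        dagScheduler.doCancelAllJobs()
      } catch {
        case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
      }
      dagScheduler.sc.stopInNewThread()
    }

Once the SparkContext is stopped, the executors exit, while the driver container may stay around, which matches the hang we observed.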

3 The Fix

This bug was fixed in Spark 2.3.0. To avoid pulling in new bugs we did not upgrade straight to 2.3; instead we applied the 2.3.0 fix on top of our 2.2.3 build. The fix is small: it only changes invalidateSerializedMapOutputStatusCache:

    /**
     * Clears the cached serialized map output statuses.
     */
    def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
      if (cachedSerializedBroadcast != null) {
        // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
        Utils.tryLogNonFatalError {
          // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
          // RPCs to dead executors.
          cachedSerializedBroadcast.destroy(blocking = false)
        }
        cachedSerializedBroadcast = null
      }
      cachedSerializedMapStatus = null
    }
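
The fix has two parts: Utils.tryLogNonFatalError catches and logs any non-fatal exception instead of letting it escape into the DAGScheduler event loop, and blocking = false means the destroy no longer waits for cleanup RPCs that a dead executor will never answer. For reference, Utils.tryLogNonFatalError is roughly the following (a paraphrased sketch, not the exact source):

    // org.apache.spark.util.Utils (paraphrased): execute a block, log any
    // non-fatal exception, and never rethrow it to the caller.
    def tryLogNonFatalError(block: => Unit): Unit = {
      try {
        block
      } catch {
        case NonFatal(t) =>
          logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }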

4 Summary

Starting from the error stack, this post walked through the source code step by step to find the root cause of DAGSchedulerEventProcessLoop failing when an executor is preempted, and showed the official Spark fix.

5 Notes

Spark source version referenced in this post: 2.2.3
