
Spark Shuffle FetchFailedException

Author: JaxMa | Published 2020-06-17 22:40

One day I hit a data-skewed SQL job. My first idea was to increase the number of partitions and see whether the data would land more evenly after hashing, so I raised spark.sql.shuffle.partitions from 500 to 2700.
Instead, the job failed with the following error:

    FetchFailed(BlockManagerId(516, nfjd-hadoop02-node352.jpushoa.com, 7337, None), shuffleId=3, mapId=59, reduceId=917, message=
    org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:941)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:811)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:326)
    Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
        ... 1 more
    
    

So the out-of-memory happens while fetching data during the shuffle phase.
When I first saw this error I was quite confused: as I understood it, increasing the shuffle partitions should reduce the amount of data per task (and the Spark web UI showed exactly that), so why would the smaller partitions fail to fetch during Shuffle Read?
That question led to the following dig into how things work under the hood:


How Spark Shuffle Read works

Shuffle Read happens in the reduce task stage. The executor running the ReduceTask first asks the driver for the shuffle block metadata (including each block's size and which executor holds it) and then fetches the blocks. There is one mechanism worth noting:
when the block to fetch is large, it is streamed to local disk instead of being loaded into memory in one go, which prevents OOM; but when the block size is below a certain threshold, the whole block is read into memory for sorting and other operations. What is that threshold?
It is spark.maxRemoteBlockSizeFetchToMem, whose default is Int.MaxValue - 512, i.e. roughly 2 GB. The relevant source:

    
    // When the BlockManager is initialized, the value of spark.maxRemoteBlockSizeFetchToMem
    // is read from the configuration into a member field:
    // private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

    /**
     * Get block from remote block managers as serialized bytes.
     */
    def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      logDebug(s"Getting remote block $blockId")
      require(blockId != null, "BlockId is null")
      var runningFailureCount = 0
      var totalFailureCount = 0

      // Ask the driver's BlockManagerMaster for the block's locations and status.
      val locationsAndStatus = master.getLocationsAndStatus(blockId)

      val blockSize = locationsAndStatus.map { b =>
        b.status.diskSize.max(b.status.memSize)
      }.getOrElse(0L)
      val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

      // Compare the block size against spark.maxRemoteBlockSizeFetchToMem to decide
      // whether the block has to be streamed (spilled) to disk or can be read
      // straight into memory.
      val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
        remoteBlockTempFileManager
      } else {
        null
      }
      // ...


So it looked like this threshold was simply too large: if several of my blocks are over 1 GB each, pulling a few of them into memory at once is bound to overflow. The next step was therefore to set spark.maxRemoteBlockSizeFetchToMem to 10m, forcing every remote block to be streamed to disk, which should avoid the error. Well... unfortunately it did not work: the job failed again with exactly the same error. I was completely stuck (for a whole day).
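
For reference, a minimal sketch (not from the original post) of where such a setting would go; only the two .config(...) values are grounded in the post, the application name and surrounding code are made up for illustration:

    import org.apache.spark.sql.SparkSession

    object FetchToMemConfigSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("fetch-to-mem-sketch") // made-up name
          // Raise the shuffle parallelism as described above.
          .config("spark.sql.shuffle.partitions", "2700")
          // Stream remote blocks larger than ~10 MB to disk instead of
          // fetching them into (direct) memory.
          .config("spark.maxRemoteBlockSizeFetchToMem", "10m")
          .getOrCreate()

        // ... run the skewed SQL here ...
        spark.stop()
      }
    }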

I kept thinking about it. The puzzling part was: why did it still fail even when reading via the streaming path? After a lot more digging I narrowed the problem down to the block size, because that was the only place where something could differ. So I went to the code that writes shuffle data and records the block sizes for the sort-merge join in my execution plan, SortShuffleWriter.scala:

    override def write(records: Iterator[Product2[K, V]]): Unit = {
        sorter = if (dep.mapSideCombine) {
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
          // care whether the keys get sorted in each partition; that will be done on the reduce side
          // if the operation being run is sortByKey.
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
        sorter.insertAll(records)
    
        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try {
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    
          "这里需要去创建一个mapStatus 去返回当前map完的数据输出地址,以及大小等信息"
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    

Jumping from the MapStatus(...) call above into MapStatus.apply:

      private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
        .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
        .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
    
      def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
        if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
          HighlyCompressedMapStatus(loc, uncompressedSizes)
        } else {
          new CompressedMapStatus(loc, uncompressedSizes)
        }
      }
    
    

Here the parameter SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS (config spark.shuffle.minNumPartitionsToHighlyCompress; this is it! a threshold of 2000) decides, based on the number of shuffle partitions, whether to use HighlyCompressedMapStatus or CompressedMapStatus.

• HighlyCompressedMapStatus: as the name says, a highly compressed MapStatus, used when there are more than 2000 shuffle partitions. It only records an average block size, plus the exact sizes of blocks above a threshold set by spark.shuffle.accurateBlockThreshold (see the sketch after this list).
• CompressedMapStatus: records the size of every block.
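
To make the difference concrete, here is a simplified, self-contained sketch (not Spark's actual classes) of how the two flavours report the size of a skewed block; the skew pattern and the helper names are invented for illustration:

    // Simplified sketch only: real Spark uses CompressedMapStatus /
    // HighlyCompressedMapStatus; this just mimics their size reporting.
    object MapStatusSizeSketch {
      // CompressedMapStatus-like behaviour: every block size is kept exactly.
      def exactSize(sizes: Array[Long], reduceId: Int): Long = sizes(reduceId)

      // HighlyCompressedMapStatus-like behaviour: blocks above the accurate
      // threshold (spark.shuffle.accurateBlockThreshold, 100 MB by default)
      // keep their real size; all other non-empty blocks report an average.
      def highlyCompressedSize(sizes: Array[Long],
                               reduceId: Int,
                               accurateThreshold: Long = 100L * 1024 * 1024): Long = {
        val huge = sizes.zipWithIndex.collect {
          case (s, i) if s > accurateThreshold => i -> s
        }.toMap
        val small = sizes.filter(s => s > 0 && s <= accurateThreshold)
        val avg = if (small.nonEmpty) small.sum / small.length else 0L
        if (sizes(reduceId) == 0) 0L else huge.getOrElse(reduceId, avg)
      }

      def main(args: Array[String]): Unit = {
        val mb = 1024L * 1024
        // A skewed shuffle: 2700 blocks of ~1 MB, one block of ~90 MB.
        val sizes = Array.fill(2700)(1 * mb)
        sizes(917) = 90 * mb

        println(s"actual size of block 917:   ${exactSize(sizes, 917) / mb} MB")
        println(s"reported size of block 917: ${highlyCompressedSize(sizes, 917) / mb} MB")
        // With > 2000 partitions only the reported (average) size reaches the
        // reduce side, so the 90 MB block looks small enough to pull into memory.
      }
    }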

Mystery solved: what I had never expected was that crossing 2000 shuffle partitions changes the behaviour in this way. Since the reduce side decides whether to stream a block or load it into memory based on the reported size, and HighlyCompressedMapStatus reports only an average for most blocks, a skewed block can look much smaller than it really is and still be pulled straight into (direct) memory.

Conclusions:

• When a shuffle has more than 2000 partitions, Spark, as an optimization, no longer records the size of every block produced in the map stage and switches to HighlyCompressedMapStatus instead. This is controlled by spark.shuffle.minNumPartitionsToHighlyCompress (default 2000).
• HighlyCompressedMapStatus still records the exact sizes of blocks above a threshold, configured by spark.shuffle.accurateBlockThreshold (default 100 * 1024 * 1024 bytes).
• During shuffle read, the reported block size determines whether a block is streamed to disk or loaded into memory in one go, controlled by spark.maxRemoteBlockSizeFetchToMem (default Int.MaxValue - 512). A configuration sketch tying these parameters together follows.
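
As a hedged example only (the parameter names come from the post, but the chosen values and the app name are assumptions, not recommendations), this is how the three settings above might be combined for a skewed job; note that keeping exact per-block sizes for very many partitions costs extra driver memory:

    import org.apache.spark.sql.SparkSession

    object SkewedShuffleConfigSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("skewed-shuffle-config-sketch") // made-up name
          .config("spark.sql.shuffle.partitions", "2700")
          // Keep exact per-block sizes even beyond 2000 partitions, so the
          // stream-vs-memory decision is made on real numbers.
          .config("spark.shuffle.minNumPartitionsToHighlyCompress", "3000")
          // Alternatively, record accurate sizes for smaller blocks as well.
          .config("spark.shuffle.accurateBlockThreshold", "10m")
          // And stream large remote blocks to disk instead of direct memory.
          .config("spark.maxRemoteBlockSizeFetchToMem", "200m")
          .getOrCreate()

        // ... run the skewed SQL here ...
        spark.stop()
      }
    }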
