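One day I ran into a SQL query with data skew. My first idea was to increase the number of partitions and see whether the data would spread more evenly after hashing, so I raised spark.sql.shuffle.partitions from 500 to 2700.
Instead, the job failed with the following error: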
FetchFailed(BlockManagerId(516, nfjd-hadoop02-node352.jpushoa.com, 7337, None), shuffleId=3, mapId=59, reduceId=917, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:941)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:811)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:326)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
... 1 more
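This shows that the out-of-memory error was caused by fetching data during the shuffle stage.
When I first saw this error I was baffled. As I understood it, increasing the shuffle partitions should reduce the amount of data each task handles (and the Spark web UI showed exactly that). So why would a smaller amount of data per task cause a fetch failure in the Shuffle Read stage?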
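To see why more partitions lowers the per-task average without necessarily removing the skew itself, here is a minimal sketch. It assumes the RDD HashPartitioner rule (non-negative key.hashCode modulo the partition count); Spark SQL's shuffle exchange hashes with Murmur3 instead, so real partition assignments differ. The object HashSkewSketch and the data set (one hypothetical hot key with 100,000 rows plus 100,000 distinct keys) are made up for illustration.

```scala
// Models RDD HashPartitioner-style assignment: non-negative hashCode mod n.
// Spark SQL's shuffle uses Murmur3 hashing, so real assignments differ.
object HashSkewSketch {
  def partitionOf(key: Any, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Number of records that land in each partition.
  def partitionCounts(keys: Seq[Any], numPartitions: Int): Array[Long] = {
    val counts = new Array[Long](numPartitions)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts
  }
}

// Hypothetical skewed input: one hot key with 100,000 rows plus 100,000 distinct keys.
val keys: Seq[Any] = Seq.fill(100000)("hot_key") ++ (1 to 100000).map(i => s"key_$i")

for (n <- Seq(500, 2700)) {
  val counts = HashSkewSketch.partitionCounts(keys, n)
  println(s"partitions=$n avg=${counts.sum / n} max=${counts.max}")
}
```

The average per partition drops from 400 to 74 records when going from 500 to 2700 partitions, which matches what the web UI showed, while the partition holding the hot key keeps at least 100,000 records either way.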
That led to the following investigation into how things work under the hood:
How Spark Shuffle Read works
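Shuffle read happens in the reduce tasks. The executor running a reduce task asks the driver where the shuffle data is (which executors hold it) and how large it is, and then fetches it. One mechanism matters here:
When the block to fetch is large, it is streamed to local disk instead of being loaded into memory all at once, which prevents out-of-memory errors. When the block is below a certain threshold, however, the whole block is read into memory for sorting and other processing. So what is that threshold?
It is spark.maxRemoteBlockSizeFetchToMem, whose default is Int.MaxValue - 512 bytes, roughly 2 GB. The relevant source code: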
// When BlockManager is initialized, the configured spark.maxRemoteBlockSizeFetchToMem is stored in this field
private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

/**
 * Get block from remote block managers as serialized bytes.
 */
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0
  // Call the driver-side BlockManager (via master) to get the block's locations and status.
  val locationsAndStatus = master.getLocationsAndStatus(blockId)
  val blockSize = locationsAndStatus.map { b =>
    b.status.diskSize.max(b.status.memSize)
  }.getOrElse(0L)
  val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
  // Compare the block size with spark.maxRemoteBlockSizeFetchToMem: larger blocks are
  // streamed to a temp file on disk, smaller ones are read straight into memory.
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }
  // ... (remainder of the method omitted)
}
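The decision in the excerpt above can be modeled as a small, self-contained sketch: a reduce task looks up the size each map task reported for its partition, then applies the same blockSize > maxRemoteBlockToMem comparison. FetchDecisionSketch, plan and the in-memory arrays standing in for the driver's answer are hypothetical. In the shuffle reader itself (ShuffleBlockFetcherIterator) the comparison may be made per fetch request, which can group several blocks, so treat this as a simplification.

```scala
// Where a remote block is fetched to, mirroring the quoted check
// `blockSize > maxRemoteBlockToMem` in getRemoteBytes.
sealed trait FetchDestination
case object StreamToDisk extends FetchDestination
case object LoadIntoMemory extends FetchDestination

object FetchDecisionSketch {
  // Spark 2.x default of spark.maxRemoteBlockSizeFetchToMem quoted in the text.
  val DefaultMaxRemoteBlockToMem: Long = Int.MaxValue - 512L

  def destination(reportedSize: Long, maxRemoteBlockToMem: Long): FetchDestination =
    if (reportedSize > maxRemoteBlockToMem) StreamToDisk else LoadIntoMemory

  // Hypothetical reducer-side view: reportedSizesByMap(mapId)(reduceId) is the size
  // a map task reported for this reduce partition; empty blocks are skipped.
  def plan(reportedSizesByMap: Seq[Array[Long]],
           reduceId: Int,
           maxRemoteBlockToMem: Long): Seq[(Int, Long, FetchDestination)] =
    reportedSizesByMap.zipWithIndex.collect {
      case (sizes, mapId) if sizes(reduceId) > 0 =>
        (mapId, sizes(reduceId), destination(sizes(reduceId), maxRemoteBlockToMem))
    }
}
```

Note that the decision only ever sees the reported size: lowering the threshold to 10m helps only if that reported size is close to the real one.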
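So it seemed this value was simply set too high: if several of my blocks were over 1 GB each, just a few of them would be more than memory could handle. Next I set spark.maxRemoteBlockSizeFetchToMem to 10m so that everything would be streamed to disk and the error would go away. Unfortunately, no luck: it still failed with exactly the same error. I was completely stumped (and stuck on it for a whole day).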
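I kept thinking about it. This was puzzling: why would it still fail when reading via a stream? After digging through the code again and again, I narrowed the problem down to the block size, since that was the only thing that could differ. So I read the code in SortShuffleWriter.scala (used by the sort-merge join in the current execution plan) that writes the data and records the block sizes: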
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    // Create a MapStatus that reports where this map task's output lives and the size of
    // each reduce partition's block (partitionLengths has one entry per reduce partition)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
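Because partitionLengths holds one entry per reduce partition, its length is exactly the number of shuffle partitions (2700 in this case), and that length is what MapStatus.apply inspects next. The sketch below models the bookkeeping, assuming each record arrives as a hypothetical (partitionId, serializedBytes) pair: per-partition lengths are summed, and the index file's offsets are their running totals starting at 0 (numPartitions + 1 values), which is how a reducer locates its slice of the single data file. ShuffleOutputSketch is an illustrative name, not a Spark class.

```scala
object ShuffleOutputSketch {
  // Sum each record's serialized size into its reduce partition; the result has one
  // entry per reduce partition, like partitionLengths above.
  def partitionLengths(records: Seq[(Int, Long)], numPartitions: Int): Array[Long] = {
    val lengths = new Array[Long](numPartitions)
    for ((partitionId, bytes) <- records) lengths(partitionId) += bytes
    lengths
  }

  // Index file layout: numPartitions + 1 offsets, starting at 0, each the running
  // total of the lengths before it; partition i occupies [offsets(i), offsets(i + 1)).
  def indexOffsets(lengths: Array[Long]): Array[Long] = lengths.scanLeft(0L)(_ + _)
}
```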
Following the MapStatus call above into its companion object:
private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
  .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
  .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}
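Here a parameter, SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS (config: spark.shuffle.minNumPartitionsToHighlyCompress; this is the culprit, with a threshold of 2000), is compared against the number of shuffle partitions (the length of uncompressedSizes) to decide between HighlyCompressedMapStatus and CompressedMapStatus. More than 2000 partitions selects HighlyCompressedMapStatus.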
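- HighlyCompressedMapStatus: as the name suggests, a highly compressed MapStatus, used when there are more than 2000 shuffle partitions. It records only the average size of the non-empty blocks below a threshold, plus the individual sizes of blocks at or above that threshold (config: spark.shuffle.accurateBlockThreshold).
- CompressedMapStatus: records the size of every block.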
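To make the difference concrete, here is a simplified model of the sizes a reducer would see from each variant. It is an assumption-laden sketch rather than Spark's classes: MapStatusSketch and its methods are hypothetical, empty blocks are handled with plain checks instead of Spark's RoaringBitmap, and sizes are stored as one byte on a log-1.1 scale, modeled on MapStatus.compressSize/decompressSize in Spark 2.x, so even the "accurate" sizes are approximate (rounded up by less than 10%).

```scala
// Simplified model of the two MapStatus variants (not Spark's actual classes).
object MapStatusSketch {
  val MinPartitionsToHighlyCompress = 2000              // spark.shuffle.minNumPartitionsToHighlyCompress default
  val AccurateBlockThreshold: Long = 100L * 1024 * 1024 // spark.shuffle.accurateBlockThreshold default
  private val LogBase = 1.1

  // One byte per size on a log-1.1 scale, modeled on MapStatus.compressSize.
  def compressSize(size: Long): Int =
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt)

  def decompressSize(c: Int): Long =
    if (c == 0) 0L else math.pow(LogBase, c.toDouble).toLong

  // The size each reduce partition's block will be reported as.
  def reportedSizes(sizes: Array[Long]): Array[Long] =
    if (sizes.length > MinPartitionsToHighlyCompress) highlyCompressed(sizes)
    else sizes.map(s => decompressSize(compressSize(s))) // CompressedMapStatus: every block, approximately

  // HighlyCompressedMapStatus: empty blocks report 0, blocks at or above the threshold
  // keep their own (approximate) size, every other non-empty block reports the
  // average of the small non-empty blocks.
  def highlyCompressed(sizes: Array[Long]): Array[Long] = {
    val small = sizes.filter(s => s > 0 && s < AccurateBlockThreshold)
    val avg = if (small.nonEmpty) small.sum / small.length else 0L
    sizes.map { s =>
      if (s == 0) 0L
      else if (s >= AccurateBlockThreshold) decompressSize(compressSize(s))
      else avg
    }
  }
}
```

For example, with block sizes of 0, 1 MB, 1 MB, 50 MB and 200 MB, highlyCompressed reports the 50 MB block as about 17.3 MB (the average of the three small non-empty blocks), while the 200 MB block keeps an approximate size of its own.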
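Mystery solved: what I had not expected was that crossing 2000 shuffle partitions changes how block sizes are recorded. For skewed blocks below spark.shuffle.accurateBlockThreshold, the size the reducer sees is the average, which can be far smaller than the real size, so those blocks are still read straight into memory even with spark.maxRemoteBlockSizeFetchToMem set to 10m.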
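Conclusion:
- When there are more than 2000 shuffle partitions, Spark, as an optimization, does not record the size of every block produced in the map stage and uses HighlyCompressedMapStatus instead. This is controlled by spark.shuffle.minNumPartitionsToHighlyCompress (default 2000).
- HighlyCompressedMapStatus still records the individual sizes of blocks above a threshold, configured by spark.shuffle.accurateBlockThreshold (default 100 * 1024 * 1024 bytes, i.e. 100 MB).
- During shuffle read, the block size determines whether a block is streamed to disk or loaded into memory all at once. This is controlled by spark.maxRemoteBlockSizeFetchToMem (default Int.MaxValue - 512).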
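Putting the three parameters together, here is a worked example with hypothetical numbers (not measurements from this job): one map task writes 2700 reduce blocks, 2699 of 100 KB and one skewed block of 80 MB, and spark.maxRemoteBlockSizeFetchToMem is 10m as in the attempt above. It assumes the per-block fetch check quoted earlier; SkewUnderReportSketch is an illustrative name.

```scala
object SkewUnderReportSketch {
  val KB = 1024L
  val MB = 1024L * KB
  val accurateBlockThreshold: Long = 100 * MB // spark.shuffle.accurateBlockThreshold default
  val fetchToMemThreshold: Long = 10 * MB     // the 10m value tried above

  // Hypothetical map output: 2700 reduce partitions, one skewed 80 MB block.
  val sizes: Array[Long] = Array.fill(2699)(100 * KB) :+ (80 * MB)

  // 2700 > 2000 selects HighlyCompressedMapStatus; the 80 MB block is below
  // accurateBlockThreshold, so it is folded into the average of the small blocks.
  val small: Array[Long] = sizes.filter(s => s > 0 && s < accurateBlockThreshold)
  val reportedSkewedSize: Long = small.sum / small.length

  // Same comparison as the fetch path: not larger than the threshold -> memory.
  val goesToMemory: Boolean = reportedSkewedSize <= fetchToMemThreshold
}
```

Under this model the 80 MB block is reported as 133,430 bytes (about 130 KB), well under 10 MB, so it is still read into memory. With 500 partitions, CompressedMapStatus would report it as roughly 80 MB and it would be streamed to disk.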