对UnsafeShuffleWriter可优化配置主要在最终多个spill合并时,input和output缓存配置(mergeSpills方法),有transferTo(跳过Ring0,Ring 3层直接copy,主要在大文件场景,小文件或者有些内存bug场景不合适)和普通FileStream操作
TransferTo分支配置:
1.spark.shuffle.unsafe.fastMergeEnabled:True
2.spark.shuffle.compress为False ,或者为True时,spark.io.compression.codec:Snappy,LZF,LZ4,ZStd,默认LZ4,这点比Hadoop要好,Hadoop默认为zip,导致为不能blocks并发处理,也就失去并发计算框架能力。
普通FileStream优化参数,input/output buffer
spark.shuffle.file.buffer:32K
spark.shuffle.unsafe.file.output.buffer:32K
内存array:LongArray初始大小spark.shuffle.sort.initialBufferSize:4K
默认使用spark.shuffle.sort.useRadixSort:True 排序算法,比Tim Sort排序算法要快。缺点是在数组中排序,内存要增加0.5倍左右。
BypassMergeSortShuffleWriter条件分支:
1.没有order.
2.没有map combine.
3.partitions(Reduce数)小于spark.shuffle.sort.bypassMergeThreshold
BypassMergeSortShuffleWriter,同时打开partitions个临时文件写入,没有内存缓存,没有sort和merge,只是在最后阶段把NumberOf(partitions)临时文件合并到一个大文件,缺点是如果map*reduce足够多,走了hashsort老路。
SortShuffleWriter,数据写入内存(map combine:PartitionedAppendOnlyMap,否则PartitionedPairBuffer,都是array(2*len):k1v1k2v2...knvn,区别,前者返回在前面buckets,需要移动位置),内存不够时sort并spill产生临时文件,最终多个spill文件与内存数据排序与merge。
Shuffle落地磁盘文件,以map为单位落地数据文件和索引文件
localdirs[x]/subDirsPerLocalDir[y]/shuffle_[shuffle_id]_[map_id]_0_data
localdirs[x]/subDirsPerLocalDir[y]/shuffle_[shuffle_id]_[map_id]_0_index
网友评论