查看源码发现spark 2.4.0 目前只有SortShuffleManager一个实现类了,该类中已经实现了Tungsten-sort优化
在Spark 2.0 Hash Based Shuffle退出历史舞台
Tungsten-sort优化点主要在三个方面:
- 引入了堆外内存,直接在serialized binary data上sort而不是java objects,减少了memory的开销和GC的overhead。
- 提供cache-efficient sorter,使用一个8bytes的指针,把排序转化成了一个指针数组的排序,
在flink的内存池中,使用的memorySegment存储结构一直,point+key,对象的二进制数据 - spill的merge过程也无需反序列化即可完成
根据以下代可以知道spark会自动判断进行shuffle优化,选择最佳合适的shuffleHandle
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
BypassMergeSortShuffle
以前的版本中,HashShuffle会输出和reduce数量相等的bucket,从而形成大量的小文件,老版本中开启了map端的combine,在map端小文的合并,在 spark 2.4.0中只要spark.shuffle.sort.bypassMergeThreshold <= 200
会未每个reduce产生一个文件,Sort Shuffle只是产生一个按照reducer id排序可索引的文件(该文件中只是按照partitionid进行排序,同一个partition中的数据是无序的),这样,只需获取有关文件中的相关数据块的位置信息,这样优化的好处是对于reduce较少的情况下,Hash Shuffle明显要比Sort Shuffle快
BypassMergeSortShuffleWrite map端输出不会对record进行排序
1、map端输出无序,shuffle中只根据分区id进行排序,意味着用合并在reduce端预先排序的数据的优化和TimSort利用预排序数据的优势将会不复存在
2、每个溢写的数据块首先将描述指针数组排序然后输出一个索引的分区文件,然后将这些分区合并到一个索引的输出文件中。
SerializedShuffleHandle
Serialized sorting: used when all three of the following conditions hold: (Tungsten-sort)
- The shuffle dependency specifies no aggregation or output ordering,aggregation意味着需要将数据反序列化存储然后才能将新来的value合并。这种情况下就无法利用这种shuffle直接操作序列化数据的优势
- The shuffle serializer supports relocation of serialized values (this is currently
supported by KryoSerializer and Spark SQL's custom serializers).shuffle序列化器支持序列化value的重定位 - The shuffle produces fewer than 16777216 output partitions.
-
序列化时,单条记录不能大于 128 MB (PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,该值就被定义成了128M。)
image.png
BaseShuffleHandle
1、特点是map端排序,但是在reduce端不合并排序结果—-除非需要对数据排序才会对数据进行重排序,
当发生溢写的时候,会在存储在当前的ApppendOnlyMap中的数据上调用排序器对其进行TimSort(归并排序的改进版本)形成一个溢写文件,然后数据被写到磁盘上,每个溢写的文件都独立被写入磁盘,只有在数据被reducer请求并且要实时合并时才会进行合并操作
2、spark reduce端,也是采用TimSort(归并排序的改进版本)
image.png
参考
shuffle writer spark reader ExternalShuffleService
spark executor动态分配:spark.dynamicAllocation.enabled
https://zhmin.github.io/2019/08/05/spark-external-shuffle-service/
http://spark.coolplayer.net/?p=2700
https://spark.apache.org/docs/latest/configuration.html
https://zhmin.github.io/2019/01/26/spark-shuffle-writer/
https://zhmin.github.io/2019/07/31/spark-shuffle-reader/
网友评论