Spark是一种MapReduce框架的实现,与Hadoop一样,具有Shuffle阶段
Shuffle是连接map和reduce之间的桥梁,它将map(ShuffleMapTask)的输出对应到reduce(ShuffleMapTask或ResultTask)输入中,这期间涉及到序列化反序列化、网络数据传输以及磁盘读写,可能还会有压缩解压,加密解密等,是非常耗时的操作
Shuffle有三个模块组成
- ShuffleManager:shuffle管理器,所有的shuffle过程都需要先向shuffleManger注册,决定采取何种具体的shuffle方式
- Shuffle Write:map端进行shuffle的写操作,每个shuffleMapTask将计算结果通过shuffleWriter写出到磁盘中
- Shuffle Read:reduce端进行shuffle的读操作,即下游的Task通过shuffleReader获取上游写出的数据
Spark Shuffle演进
- Spark 1.0及以前只有Hash Shuffle
- Spark 1.1+,引入Sort Shuffle
- Spark 1.2+,默认使用Sort Shuffle
- Spark 1.4+,引入Tungsten Sort或者叫做Unsafe Shuffle
- Spark 2.0+,统一使用Sort Shuffle
Tungsten Project 钨丝计划,是对Spark进行优化的一个项目,旨在提升CPU和内存的效率,
tungsten-sort
中涉及的内存分配,底层可以通过sun.misc.Unsafe
类直接获取内存实现
Hash Shuffle
在map阶段(ShuffleMapTask),每个ShuffleMapTask都会为下游stage的每个partition写一个临时文件,它的问题在于创建的文件数目过多,现在已经不再使用
如果有M个ShuffleMapTask,下游stage有R个ShuffleMapTask或ResultTask,理论上会生成M * R
个数据文件
涉及多个小文件的随机读取,硬盘的性能容易称为瓶颈,为了解决文件过多的问题,后来加入了Consolidate Files机制(文件合并机制),运行在同一个core的ShuffleMapTask,第一个会创建R个文件,后续task只是追加写入到已经创建的文件中,当每个task分配1个core时,文件总数为total-executor-cores * R
Sort Shuffle
ShuffleMapTask不再为每个Reducer生成一个单独的文件,而是将所有的结果写到一个文件里,同时产生一个索引index文件,减少了文件的数量,文件中的记录首先是按照分区的partition id顺序排列,index文件记录的各个分区对应文件内的偏移,文件总数是2 * M
,Reduce阶段可以通过索引,获取相关数据,这样降低了随机磁盘IO与缓存的开销
Sort Based Shuffle的缺点是必须要进行排序,至少是要按照分区排序,如果指定了keyOrdering
才需要在分区内部根据数据的key再次排序,相较于hash shuffle这是额外的性能消耗
它对应于SortShuffleManager
,也是目前唯一的shuffle管理器
SortShuffleManager
SortShuffleManager
在SparkContext
初始化过程中的SparkEnv
类初始化,创建ShuffleManager
时初始化,当然也可以通过spark.shuffle.manager
指定一种自定义的ShuffleManager
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
在RDD创建过程中,如果对应为宽依赖,即ShuffleDependency
,该依赖初始化过程中会通过SparkContext获取一个ShuffleId,然后通过ShuffleManager
注册Shuffle,根据不同的情况返回不同的ShuffleHandle
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// reduce端分区数量少于spark.shuffle.sort.bypassMergeThreshold,没有map端的聚合操作
// 不再进行归并排序,直接写numPartitions个文件,最后连接到一起,避免了序列化和反序列化,但是缓存需要较高
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// 以序列化的形式缓存输出
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// 以反序列化的形式缓存
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
ShuffleMapTask
中的runTask(context: TaskContext)
函数会通过shuffleManager
获取ShuffleWriter
,对RDD元素进行写出
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
}
SortShuffleManager
根据ShuffleHandle的不同类型,初始化对应的Shuffle Writer
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
//记录shuffle id => 该shuffle产生输出的mapper数目
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
在ShuffledRDD
的compute
方法中获取shuffle结果,此外还有CoGroupedRDD
等RDD也会请求获取Reader
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
SortShuffleManager
中getReader
实现
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
Shuffle Writer
共有三种类型的ShuffleWriter
- BypassMergeSortShuffleWriter:带hash风格的基于sort的shuffle机制,在不进行map端聚合,且reduce端的分区数目不大于阈值(默认200)时优先使用,每个
ShuffleMapTask
内部同时创建打开Partitioner
指定的分区数目个文件,每条record根据分区器,分别写入到不同的文件内,然后创建一个新的文件,默认使用NIO将每个分区文件的数据顺序合并到该文件后删除,最终再同时产生一个记录分区数据偏移的index文件 - UnsafeShuffleWriter:没有map端聚合,reduce端的分区数目小于等于16777216,序列化器支持重定位的情况下使用,依靠
ShuffleExternalSorter
对串行化的数据依据partitionId排序,文件内数据按照分区ID顺序排列,如果之前内存不足,会写出多个数据文件,这些文件最终会合并为一个文件,同时产生一个index文件,详细说明 - SortShuffleWriter:不满足以上两中情况就使用的最基本的写出方式,将数据写入到
ExternalSorter
,可以进行聚合操作,内存不足时可以先写出到磁盘,最终进行按照分区顺序,不同文件同个分区的数据以及内存中的数据再合并/排序,写出到一个文件中,并产生一个index文件详细说明
Shuffle Reader
只有一种ShuffleReader:BlockStoreShuffleReader
从MapOutputTracker
中获取上游shuffle数据,通过网络获取远程数据块,本地数据直接读取,如果有需要在对数据进行聚合和排序,最终返回数据的一个迭代器详细说明
Hadoop shuffle
本质上与Spark shuffle的原理是一致的,但是在具体实现上有差别
MapReduce Shuffle | Spark Shuffle | |
---|---|---|
map端写出 | 在内存中构造一个缓冲区(默认100MB),超过则溢写,最终合并写出到本地 | BypassMergeSortShuffleWriter直接写出,另外两种类型会存储在内存中,直到内存不足(这里内存相对较大)再溢写,最终同样合并写出到本地 |
map端数据的顺序 | 分区内部的数据同样是有序的 | 除非需要进行map端合并,分区内部不进行排序 |
copy | reduce在每个map完成后立即开始复制它们的输出,即reduce端的任务不是等map端结束才启动的 | map,reduce分别归属不同的Stage,Stage有明确的先后顺序,必须等到map task全部完成,才会启动reduce人进行拉取 |
内存使用 | 一般内存都不大,如果内存不足会进行写出,最后进行合并 | 通常内存较大,有专门的内存管理器,还可以通过配置允许使用堆外内存,数据尽可能的存在内存中 |
Spark中shuffle数据直接写出,每个文件对应的缓冲器大小默认为32KB,通过spark.shuffle.file.buffer
设定
spill merge过程是否是两次IO
拉取的文件是直接位于内存还是写入到磁盘
Hadoop MapReduce Shuffle map过程的数据在写磁盘时,task首先将数据写出到环形缓冲区(默认大小100MB,mapreduce.task.io.sort.mb
),达到阈值就启动后台线程将数据溢出写到磁盘,此时map的输出会继续写入缓存,如果缓存被填满,map会被阻塞直到溢写过程完成
写出到磁盘之前,后台线程首先要划分分区,在每个分区内部,后台线程按照键值进行内存中排序,如果存在combiner
函数,就在排序后的数据上进行结合
每次内存缓冲区达到溢出阈值,会新建一个溢出文件,最后所有的溢出文件被合并成一个已分区且已排序的输出文件,如果至少存在3个溢出文件,那么combiner
就会在输出文件写到磁盘之前再次运行
reduce端因为map task可能在不同时间完成,因此reduce task在每个map完成后立即开始复制它们的输出,这称为reduce任务的复制阶段copy phase
,reduce task具有少量的复制线程(默认是5,mapreduce.reduce.shuffle.parallelcopies
),可以并行获取map输出,获取的数据优先放置在内存中,否则写出到磁盘,随着拷贝的增多,后台线程将他们合并为更大的有序文件,方便之后的merger
当所有的map输出都复制过来以后,reduce task进入sort phase
或者叫做merger phase
,将map的输出合并,并保证顺序,这里可能涉及到多轮次合并(合并因子默认为10,mapreduce.task.io.sort.factor
),在此过程中会省略最后一轮次合并结果写入磁盘的过程,直接将数据输入reduce函数,即reduce phase
网友评论