1. 概述
shuffleManager
是一个trait类型, 定义了几个基本的接口用于实现.主要目的是实现shuffle相关信息从executor到driver的注册和汇总等等
我们看到一个方法
registerShuffle
, 前面在DAG计算时. 我们记得有两种依赖, 一种是narrowDependency
, 一种是shuffleDependency
, 这里注册的就是Shuffle Dependency, 源码在之前的章节也出现过.
trait ShuffleManager {
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V]
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
def unregisterShuffle(shuffleId: Int): Boolean
def shuffleBlockResolver: ShuffleBlockResolver
def stop(): Unit
}
它在spark1.6里面有两个基本实现, 一个是HashShuffleManager
, 另外一个是SortShuffleManager
2. ShuffleManager的实现和具体使用.
很多人都详细写了HashShuffle, SortShuffle的实现, 以及它们是如何压缩中间文件的数量以便更快的实现数据从一个Node 到另外一个Node的转移, 从而做到了让Spark的MR模型比Hadoop2.0的MR模型要快.
下面的内容和介绍大部分来自Cloudera团队开发SortShuffle时的说明, 熟悉的忽略即可
2.1 什么是shuffle
Shuffle在分布式计算中, 不得不让数据在node之间转移, 等同于把数据从一批机器洗到另外一批机器. reduceByKey
的时候, 必须让同一个pairRDD中同样key的数据进入同一个机器, 这就需要shuffle. Spark中RDD如果到前一层RDD的依赖是shuffleDependency
, 就说明需要执行shuffle操作, 划分stage了.
Shuffle过程也是Stage和Stage之间的分割线
2.2 Basic Shuffle
传统的shuffle过程, 每个Map进程, 为每个Reduce进程维护一个文件, 把结果写进去. 这样假设说我们有M个Map进程, R个Reduce进程, 又恰好数据非常分散, 每个Map进程都包含了后续每个Reduce进程可能用到的数据. 那么整个集群需要需要维护M*R个文件. 对于存储系统来说, 它带来了小文件的问题. 这意味着更多的磁盘IO操作, 非连续写, 内存分页碎片问题, 这些都是小文件高负载时存储系统面对的问题. 传统文件系统的解决方案也很简单, 就是类似EXT文件系统的日志写, 让小文件表现成一个连续的日志即可.
BasicShuffle
https://issues.apache.org/jira/browse/SPARK-2503
2.3 Hash Shuffle
同一台机器上的Map在写shuffle文件的时候, 把数据append到reduce需要的key的bucket中. 这样M就被去掉了, 每个机器维护R个文件即可. 在这个地方我们解决了小文件的问题, 但是不同进程append同一个文件涉及到一些同步过程和很多优化过程.
Hash Shufflehttps://issues.apache.org/jira/browse/SPARK-751
2.4 Sort Shuffle
更进一步的, 非常类似日志文件系统的策略
- 每个Map过程会产生一个shuffle结果文件, 和一个index文件, index文件指出了结果文件中, start-end放的是哪个Reduce需要的key. 这样每台计算节点为了M个文件对好存信息, 最后可以把这M个排序好的文件Merge到一起成为一个大的排序好的带索引的文件.
- 在真正执行中就变成了, 如果R比较小, spark.shuffle.sort.bypassMergeThreshold = 200, 就走Hash Sort路线. 如果R比较大了, 就走Sort Shuffle路线, 每个MapTask产生一个结果数据, 最后再Merge到一起.
这个merge过程并不是在写的时候发生的, 而是在有机器过来拉数据的时候触发的!
Sort Shuffle
2.5 Tungsten Sort Shuffle
NODE: 根据更新日志, 这个方案到2.x.x还是在序列化上存在一定的BUG
实现的思路
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
钨丝计划并不是单单对Shuffle进行优化, 实际上这个持续到今天的计划是对整个资源调度进行优化, 让更多的序列化的字节码在机器间流动, 让CPU和内存都能够被框架直接分配. 对我来说, 这意味着Spark越来越接近MPP的方案.也许有一天, 会把Spark变成了一个java的MPP + Cgroup, 大量的CPU分片和内存分片都是直接在内核里做
网友评论