本章节主要是剖析Shuffle原理,spark中的Shuffle是非常重要的,shuffle不管在Hadoop中还是Spark中都是重重之重,特别是在Spark shuffle优化的时间。更是非常的重要。
那么在什么情况下,会发生Shuffle操作呢,一般在reduceByKey,groupByKey,sortBykey,countBykey,join,cogroup等函数的下都会发生Shuffle.
这里我举个例子来说明什么是Shuffle,它的原理是什么。
假设我们的一个节点A上,运行了4个ShuffleMapTask,但节点上只有两个cpu cores.
那么是不是这4个ShuffleMapTask是并发操作,其中两个是并行操作。这是基础知识就不用说了。
假设在另一个节点B上,运行了4个ResultTask(关于什么是shuffleMapTask,resultTask,
我们前面都已经详细说过,这里就不说了)
现在在节点B上,等着去获取节点A上ShuffleMapTask的输出数据,来完成一个reduceBykey操作。
我们知道,每个ShuffleMapTask都会为每一个ResultTask创建一份bucket缓冲,以及对应的ShuffleBlockFile文件到磁盘上。
那么这样,我们想一想,我们这个例子,是不是每一个ShuffleMapTask都会生成4个bucket,4个blockFile文件。
每个bucket,是一个缓冲区,当数据慢慢达到一定值时,会自动刷新到ShuffleBockFile文件里到磁盘。
ShuffleMapTask的输出会做为MapStatus对象,发送到DAGScheduler的MapoutATrackerMaster中。
那么ResultTask就会用BlockstoreShuffleFetcher从MapoutATrackerMaster中读取信息,ShuffleMapTask的输出会做为MapStatus包含了每个ResultTask要拉取的数据的大小 ,位置等,就是元数据。
那么每个resultTask是不是根据这些信息底层通过BlockManager去每个ShuffleMapTask上的ShuffleBlockFile上拉取属于自己的那一份数据。把数据拉取过来。
1:在map端的RDD,我们可以看作是shuffle的第一个RDD,就是mapPartitionRDD
2: 每个ResultTaskr拉取过来的数据,其实就会组成一个内部RDD,
这个RDD就是叫ShuffleRDD,这个RDD优先放入内存,内存不足时,就 写入磁盘。
3: 然后每个ShuffleRDD进行本地聚会,
最后成生一个MapPartitionRDD,这个RDD就是我们执行reduceByKey后希望得到的rdd.
看到上面的三点我们是不是很熟悉了,前面我们在讲过stage划分时,也讲过Shuffle分了两步,
一步是MapPartitionRDD-->ShuffleRDD
另一步就是ShuffleRDD-->MapPartitionRDD
上面就我们普通的Shuffle的内部原理。我们假设,现在有100个map task,100个result task,那么本地磁盘是不是会有100*100个文件,这样造成了磁盘IO之多,极大的影响了性能。
下一节我们就这些问题,来剖析新版本中的优化。
网友评论