美文网首页
简书spark的两种核心shuffle(HashShuffle与

简书spark的两种核心shuffle(HashShuffle与

作者: scott_alpha | 来源:发表于2019-09-30 07:17 被阅读0次

    在spark 1.2以前默认用的ShuffleManager是HashShuffleManager,但是它有一个很大的弊端,就是会产生很多的中间文件,进而有大量的磁盘IO影响性能。
    因此在spark 1.2开始,默认改为用SortShuffleManager,它的优点是会把临时文件合并成一个磁盘文件,因此每个task就只有一个磁盘文件。
    1.HashShuffleManager包括普通运行的HashShuffleManager和合并运行的HashShuffleManager
    普通运行的HashShuffleManager:
    每一个task创建与下一个stage的task数量相等的文件。比如下一个stage有100个task,那么当前的每个task都要创建100个磁盘文件,假设一个executor只有一个CPU core,有10个executor,每个executor执行5个task。如果当前stage有50个task,那么总共就创建5000个磁盘文件,因此在shuffle read的时候就会出现大量的磁盘IO操作,影响性能。shuffle read那边的buffer默认大小32k。
    合并运行的HashShuffleManager:
    这里的优化是,当前stage里的task之间会复用创建的磁盘文件。故最后会生成10*100=1000个磁盘文件,文件数量大大降低。
    开启方式为spark.shuffle.consolidateFiles=true
    2.SortShuffleManager包括普通的SortShuffleManager和bypass的SortShuffleManager
    普通的SortShuffleManager:
    下一个stage的task数量大于bypassMergeThreshold参数的值。
    数据会先写入一个内存数据结构中(默认5M),如reduceByKey就会选用map数据结构,一边通过map聚合,一边写入内存;如果是join这种普通的shuffle算子,就用选用array数据结构,直接写入内存,内存数据达到阈值进行溢写。
    在溢写到磁盘之前,会先根据key对内存数据结构中已有的数据进行排序。
    排序完成后,会分批溢写到磁盘,默认batch数量是10000条。
    溢写完成后,会对所有文件进行一个merge操作,写入最终的磁盘文件中,由于一个task只对应一个磁盘文件,因此还会单独写一份索引文件,标识了下游stage各个task的数据在文件中的start Offset和end Offset。
    比如第一个stage有50个task,总共有10个executor,每个executor执行5个task,而第二个stage有100个task,由于每个task最终只有一个磁盘文件,则最终有50个磁盘文件。
    bypass的SortShuffleManager:
    触发条件是当shuffle read task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200),或者不是聚合类的shuffle算子(比如reduceByKey),就会启用bypass模式。
    从内存缓冲溢写磁盘时,是根据key的hash值写入磁盘文件,全部溢写完成后合并成一个磁盘文件和一个索引文件。
    而该机制与普通SortShuffleManager运行机制的不同在于:
    1.磁盘写机制不同
    2.不会进行排序,也就节省掉这部分的性能开销
    [https://www.cnblogs.com/itboys/p/9226479.html]

    总结:
    Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
    shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle,
    HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。
    SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能
    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    相关文章

      网友评论

          本文标题:简书spark的两种核心shuffle(HashShuffle与

          本文链接:https://www.haomeiwen.com/subject/exzqpctx.html