美文网首页
spark 4种 shuffle机制与mapreduce shu

spark 4种 shuffle机制与mapreduce shu

作者: loukey_j | 来源:发表于2019-01-21 14:19 被阅读0次

    纵观整个mapreduce过程会发现存在许多的排序和文件合并操作。

    为什么要排序,主要原因有:

    1、key的存在combiner操作,排序之后相同的key放到一块显然方便做合并操作。

    2、reduce task是按key去处理数据的。 如果没有排序那必须从所有数据中把当前相同key的所有value数据拿出来,然后进行reduce逻辑处理。显然每个key到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key对于的value集合。

    3、reduce task按key去处理数据时,如果key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key那么久标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key处理完了,哪些key没有处理完。

    为什么要文件合并,主要原因有:

    1、因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。

    2、任务去读取文件的数增多,打开的文件句柄数也会增多

    3、mapreduce是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。

    虽有千万种理由需要这么做,但是很耗资源,并且像排序其实我们有些业务并不需要排序。在hadoop 2.x 排序就变为可选了。

    spark的shuffle是在mapreduce shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在spark中shuffle write相当于mapreduce 的map,shuffle reade相当于mapreduce 的reduce.

    spark shuffle分4种

    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager

    1、未经优化的HashShuffleManager,其原理见下图

    从图中可以看到,相比mapreduce,排序不见了,文件合并不见了。上游task写文件的时候只是将数据按分区追加到文件中,并没有像mapreduce 那样先内存溢写成文件,然后再文件与文件之间进行合并,虽然节省了排序、合并的开销。但有一个弊端就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。如上图 下游有3个shuffle reade task ,那每个上游shuffle write就会形成3个文件。 形成的文件数是 shuffle reade个数 × shuffle write个数。

    2、优化的HashShuffleManager,其原理见下图

    相比第一种机制。就是在一个excutor中的task是可以共用一个buffer内存。在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,而是允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。此时的文件个数是 CPU core的数量  × 下一个stage的task数量。

    为了开启优化后的HashShuffleManager,我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

    在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager

    3、SortShuffleManager,其原理见下图

    这种机制和mapreduce差不多,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。

    4、bypass运行机制

    相比第3中少了排序,task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

    该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    bypass运行机制的触发条件如下:

    1、shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

    2、不是聚合类的shuffle算子(比如reduceByKey)。因为不像第3种机制那样会对聚合类算子以map的数据结构存储,在写的过程中会先进行局部聚合。

    spark shuffle 优于mapreduce shuffle的原因1、减少了磁盘io

    2、可选的shuffle和排序

    相关文章

      网友评论

          本文标题:spark 4种 shuffle机制与mapreduce shu

          本文链接:https://www.haomeiwen.com/subject/akjcjqtx.html