有三种方法:hash shuffle(后期优化有consolidated shuffle)、sort shuffle和tungsten-sort shuffle。
第一种:hash shuffle适合的场景是小数据的场景,对小规模数据的处理效率会比排序后的shuffle高。
a.shuffle输出的map任务会为每个reduce创建对应的bucket,map产生的结果会根据设置的partitioner得到对应的bucketID,然后填充到相应的bucket中去,每个map的输出结果可能包含所有的reduce所需要的数据,所以每个map会创建R个bucket,M个map总共会创建M*R个bucket。map创建的bucket其实对应磁盘上的一个文件,map的结果写到每个bucket中其实就是写到那个磁盘文件中,这个文件也被称为blockFile,是disk block manager管理器通过文件名的hash值对应到本地目录的子目录中创建的。每个map要在节点上创建R个磁盘文件用于结果输出,map的结果是直接输出到磁盘文件上的。
b.有一种对hash shuffle的优化是consolidation shuffle,将shuffle数据英社称文件中的一段,这样可以减少文件的数量。在consolidation shuffle中每个bucket并非对应一个文件,而是对应文件中的一个segment部分。此时有两种方案:第一种是将内存中的数据按每个partition创建一个临时文件,最后溢出到分区文件中合并;第二种是在内存中排序合并。job的map在某个节点上第一次执行,为每个reduce创建bucket对应的输出文件,把这些文件组织成shufflefilegroup,当这次map执行完之后,这个shufflefilegroup可以释放为下次循环使用。当又有map在这个节点上执行时,不需要创建新的bucket文件,而是在上次的shuffleFileGroup中取的已经创建的文件继续追加写一个segment;当前次map还没执行完,shuffleFileGroup还没有释放,这时如果有新的map在这个节点上执行,无法循环利用这个shuffleFileGroup,而是只能创建新的bucket文件组成新的shuffleFileGroup来写输出。
Reduce去拖map的输出数据,spark提供了两套不同的拉取数据框架:通过socket连接去取数据;使用netty框架去取数据。reduce拖过来的数据会放在一个hashMap中,hashMap中存储的也是<key,value>对,key是map输出的key,map输出对应这个key的所有value组成hashmap的value。spark将shuffle去过来的每一个<key,value>对插入或者更新到hashmap中,来一个一个处理。hashmap全部放在内存中。
shuffle取过来的数据全部存放在内存中,对于数据量比较小或者已经在map端做过合并处理的shuffle数据,占用内存空间不会太大,但是对于比如group by key这样的操作,reduce需要得到key对应的所有value,并将这些value组一个数组放在内存中,这样当数据量较大时,就需要较多内存。
spark意识到在处理数据规模远远大于内存空间时锁带来的不足,引入了一个具有外部排序的方案。shuffle过来的数据先放在内存中,当内存中存储的<key,value>对超过1000并且内存使用超过70%时,判断节点上可用内存,如果还足够,则把内存缓冲区大小翻倍,如果可用内存不再够了,则把内存中<key,value>对排序然后写到磁盘文件中。最后把内存缓冲区中的数据排序后和那些磁盘文件组成一个最小堆,每次从最小堆中读取最小的数据,这个合MapReduce中的merge过程类似。
第二种 sorted based shuffle
https://blog.csdn.net/snail_gesture/article/details/50807129
1)首先每个shuffleMapTask不会为每个Reducer单独生成一个文件,相反,Sort-based shuffle会把Map的每一个shuffleMapTask所有的输出数据Data只写到一个文件中。因为每个shuffleMapTask中的数据会被分类,所以Sort-based shuffle使用了index文件存储具体shuffleMapTask输出数据在同一个Data文件中是如何分类的信息。
2)基于Sort-based shuffle会在mapper中的每一个shuffleMapTask中产生两个文件:Data文件和index文件,其中data文件是存储当前task的shuffle输出的。而index文件中则存储了data文件中的数据通过partitioner的分类信息,此时下一个阶段的stage中的task就是根据这个index文件获取自己所要抓取的上一个stage中的shuffleMapTask产生的数据的,Reducer就是根据index文件来获取属于自己的数据。
externalSorter是spark的sort形式的shuffle实现的关键。Sort-based shuffle使用它把RDD分区中的数据写入文件。这个类用于对一些(K,V)类型的key-value对进行排序,如果需要就进行merge,产生一些(K,C)类型的key-combiner对。combiner就是对同样key的value进行合并的结果。它首先使用一个partitioner来把key分到不同的partition,然后,就把每个partition内部的key按照一个特定的comparator来进行排序,输出到文件,其中不同的partition位于这个文件的不同区域(在字节层面上每个分区是连续的,会有index文件存储这个位置信息),这样就适用于shuffle时对数据的抓取。把输出的partition内部的kv排序,这个功能是hash shuffle不具有的。
默认的Sort-based shuffle的几个缺陷:
1.如果mapper中task的数量过大,依旧会产生很多小文件,shuffle传递数据到Reducer端,reduce会需要同时打开大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃!
2.如果需要在分片内也进行排序,此时需要进行mapper端和reducer端的两次排序。
涉及问题:Sorted-based Shuffle会产生2M(M代表了mapper阶段中并行的partition的总数量,其实就是shuffleMapTask的总数量)个shuffle临时文件。
shuffle产生的临时文件的数量变化依次为:
basic hash shuffle: MR
Consalidate方式的hash shuffle: CR
Sort-based Shuffle:2M
第三种 sort based Tungsten
https://blog.csdn.net/andyshar/article/details/52144141
https://blog.csdn.net/u011007180/article/details/52389268
Tungsten sort是对普通sort的一种优化,排序的不是内容本身,而是内容序列化后字节数组的指针,把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以没有序列化和反序列化的过程。内存消耗大大降低,极大的减少了GC的开销。
要做到这个,JVM的内存管理结构无法完成,引入了一个新的概念page。
page是由block组成的,block里面除了记录page编号外,还有MemoryLocation。数据一旦进来,就使用shuffle write进行序列化,在序列化的二进制基础上进行排序,可以减少内存的GC。
当且仅当下面条件都满足时,才会使用新的shuffle方式:
shuffle dependency不能带有aggregation或者输出需要排序。
shuffle文件的数量不能大于16777216
序列化时,单条记录不能大于128M
可以看到,能使用的条件还是挺苛刻的
在写入的过程中单条记录不能超过128M,因为每个page的最大大小即为128M。
shuffle文件的数量:是partition的限制,数字16777216是来源于partition number使用24bit表示的。
网友评论