二次排序指的是第一次段(key键)排序完后,对第一字段相同的条件下再进行对value的排序,且不影响原来key的顺序。
对于归约器排序至少有两种方案。两种都在hadoop/rm和spark中都可以解决。
- 第一种方案是让归约器读取和缓存的定键的所有值。比如把这些值存入一个数据结构中。然后在这个数据结构中进行排序。这是一种不具有可伸缩性的方法,因为这个归约器要接受一个给定键的所有值。这种方法可能导致归约器耗尽内存。如果数量很少可以用。
- 第二种方案是用框架中的排序进行。
这里介绍spark框架的排序。
spark的SortBy和SortByKey具有内部调用的是被排序函数的compare方法。如果我们自定义一个数据结构并继承Ordered(提供compare方法)和Serializable(这个很重要,所有被引用到的类都必须是可序列化的,不然会报错的)。
二次排序不过是定义了一个比较方法,在第一个值排序的条件下,进行二个值的排序。以此类推,那么三次排序,N次排序都很容易进行。
class SecondarySort(val two :Int,val one : Int) extends Ordered[SecondarySort] with Serializable {
override def compare(that: SecondarySort): Int = {
if(that.two-this.two != 0){
this.two-that.two
}else{
that.one-this.one
}
}
}
这里最关键是是顺序问题,因为scala的返回值是最后一个输出值。如果没有else,那么输出值永远是最后一个。
有了这个compare函数,我们不仅可以比较int值,可以自定义所有类型的比较方法。
import org.apache.spark.{SparkConf, SparkContext}
object TwoSort {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("sort2")
val sc=new SparkContext(conf)
val source=sc.textFile("/user/join1.txt",1).distinct()
val rdd= source.map(x=>{
val xy=x.split(" ")
(new SecondarySort((xy(0)).toInt,xy(1).size.toInt),x)
})
val rdd2=rdd.sortByKey()
rdd2.foreach(x=>println("-----------------"+x._2))
}
}
二次排序多个分区运行。最终多个文件合并后才是完整的排序。
如果在console中打印,不保证有序。
如果我们想要将它最终以一个文件打印出来,可以用repartitionAndSortWithinPartitions。该函数是repartition和sortby的结合,并做了一些优化,因为它可以将排序过程推送到 shuffle 操作的机器上进行。
网友评论