美文网首页
大数据之二次排序

大数据之二次排序

作者: 机器不能学习 | 来源:发表于2018-11-08 19:25 被阅读0次

    二次排序指的是第一次段(key键)排序完后,对第一字段相同的条件下再进行对value的排序,且不影响原来key的顺序。

    对于归约器排序至少有两种方案。两种都在hadoop/rm和spark中都可以解决。

    • 第一种方案是让归约器读取和缓存的定键的所有值。比如把这些值存入一个数据结构中。然后在这个数据结构中进行排序。这是一种不具有可伸缩性的方法,因为这个归约器要接受一个给定键的所有值。这种方法可能导致归约器耗尽内存。如果数量很少可以用。
    • 第二种方案是用框架中的排序进行。
    这里介绍spark框架的排序。

    spark的SortBy和SortByKey具有内部调用的是被排序函数的compare方法。如果我们自定义一个数据结构并继承Ordered(提供compare方法)和Serializable(这个很重要,所有被引用到的类都必须是可序列化的,不然会报错的)。
    二次排序不过是定义了一个比较方法,在第一个值排序的条件下,进行二个值的排序。以此类推,那么三次排序,N次排序都很容易进行。

    class SecondarySort(val two :Int,val one : Int) extends Ordered[SecondarySort] with Serializable {
      override def compare(that: SecondarySort): Int = {
       
        if(that.two-this.two != 0){
    
          this.two-that.two
        }else{
          that.one-this.one
        }
      }
    }
    

    这里最关键是是顺序问题,因为scala的返回值是最后一个输出值。如果没有else,那么输出值永远是最后一个。
    有了这个compare函数,我们不仅可以比较int值,可以自定义所有类型的比较方法。

    import org.apache.spark.{SparkConf, SparkContext}
    
    object TwoSort {
      def main(args: Array[String]): Unit = {
    
        val conf=new SparkConf().setAppName("sort2")
        val sc=new SparkContext(conf)
    
        val source=sc.textFile("/user/join1.txt",1).distinct()
    
        val rdd= source.map(x=>{
          val xy=x.split(" ")
    
          (new SecondarySort((xy(0)).toInt,xy(1).size.toInt),x)
        })
        val  rdd2=rdd.sortByKey()
        rdd2.foreach(x=>println("-----------------"+x._2))
    
      }
    }
    

    二次排序多个分区运行。最终多个文件合并后才是完整的排序。
    如果在console中打印,不保证有序。
    如果我们想要将它最终以一个文件打印出来,可以用repartitionAndSortWithinPartitions。该函数是repartition和sortby的结合,并做了一些优化,因为它可以将排序过程推送到 shuffle 操作的机器上进行。

    相关文章

      网友评论

          本文标题:大数据之二次排序

          本文链接:https://www.haomeiwen.com/subject/rexgxqtx.html