Implementing Sorting in Spark

Author: yeathMe | Published 2018-04-01

    Method 1: sort an RDD of a custom class that extends Ordered

    package cn.edu360.day5
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort1 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val userRDD: RDD[User] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          //(name, age, fv)
          new User(name, age, fv)
        })
    
        //Sorting a tuple RDD by a single field would not satisfy the two-level rule:
        //tpRDD.sortBy(tp => tp._3, false)
    
        //Sort the RDD of User objects; each User is its own sort key since it extends Ordered
        val sorted: RDD[User] = userRDD.sortBy(u => u)
    
        val r = sorted.collect()
    
        println(r.toBuffer)
    
        sc.stop()
    
      }
    
    }
    
    
    class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable {
    
      override def compare(that: User): Int = {
        if(this.fv == that.fv) {
          this.age - that.age
        } else {
          -(this.fv - that.fv)
        }
      }
    
      override def toString: String = s"name: $name, age: $age, fv: $fv"
    }
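
    Since User extends Ordered[User], Scala's standard collections can sort it as well. A minimal local sanity check of the comparison logic (a sketch, not part of the original code):

    val us = Seq(new User("laoduan", 30, 99), new User("laozhao", 29, 9999),
      new User("laozhang", 28, 98), new User("laoyang", 28, 99))
    //Ordered[User] provides an implicit Ordering, so .sorted works directly
    println(us.sorted.mkString(" | "))
    //Expected: laozhao (fv 9999) first, then laoyang and laoduan (both fv 99, ages 28 and 30), then laozhang (fv 98)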
    
    
    

    Method 2: keep the data as tuples and use a helper class only as the sort key

    package cn.edu360.day5
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort2 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort2").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          (name, age, fv)
        })
    
        //Sort (the key function only supplies a sort rule; it does not change the data format, only the order)
        val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => new Boy(tp._2, tp._3))
    
        println(sorted.collect().toBuffer)
    
        sc.stop()
    
      }
    
    }
    
    
    class Boy(val age: Int, val fv: Int) extends Ordered[Boy] with Serializable {
    
      override def compare(that: Boy): Int = {
        if(this.fv == that.fv) {
          this.age - that.age
        } else {
          -(this.fv - that.fv)
        }
      }
    }
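
    Boy is marked Serializable because sortBy keys are shipped between executors during the shuffle. One caveat with the subtraction idiom: this.age - that.age can overflow for extreme Int values. An overflow-safe variant (a sketch; SafeBoy is a hypothetical name, not in the original):

    class SafeBoy(val age: Int, val fv: Int) extends Ordered[SafeBoy] with Serializable {

      override def compare(that: SafeBoy): Int = {
        //fv descending first; Integer.compare cannot overflow
        val byFv = Integer.compare(that.fv, this.fv)
        //then age ascending
        if (byFv != 0) byFv else Integer.compare(this.age, that.age)
      }
    }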
    
    
    

    Method 3: a case class as the sort key

    package cn.edu360.day5
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort3 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort3").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          (name, age, fv)
        })
    
        //Sort (the key function only supplies a sort rule; it does not change the data format, only the order)
        val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => Man(tp._2, tp._3))
    
        println(sorted.collect().toBuffer)
    
        sc.stop()
    
      }
    
    }
    
    
    case class Man(age: Int, fv: Int) extends Ordered[Man] {
    
      override def compare(that: Man): Int = {
        if(this.fv == that.fv) {
          this.age - that.age
        } else {
          -(this.fv - that.fv)
        }
      }
    }
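
    Because Man is a case class, no `new` is needed and serializability comes for free. The key function can equivalently be written with tuple pattern matching (a sketch):

    val sortedAlt: RDD[(String, Int, Int)] = tpRDD.sortBy { case (_, age, fv) => Man(age, fv) }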
    
    
    

    Method 4: a plain case class plus an implicit Ordering

    package cn.edu360.day5
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort4 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort4").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          (name, age, fv)
        })
    
        //Sort (the key function only supplies a sort rule; it does not change the data format, only the order)
        //Bring the implicit Ordering[XianRou] into scope; SortRules is sketched below
        import SortRules.OrderingXiaoRou
        val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => XianRou(tp._2, tp._3))
    
        println(sorted.collect().toBuffer)
    
        sc.stop()
    
      }
    
    }
    
    
    case class XianRou(age: Int, fv: Int)
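
    The SortRules object imported above was not included in the original post. A minimal sketch of what it presumably contains, inferred from the import name OrderingXiaoRou and the stated sort rule (an assumption, not the author's verbatim code):

    object SortRules {

      //Assumed implicit Ordering for XianRou: fv descending, then age ascending
      implicit val OrderingXiaoRou: Ordering[XianRou] = new Ordering[XianRou] {
        override def compare(x: XianRou, y: XianRou): Int = {
          if (x.fv == y.fv) x.age - y.age
          else y.fv - x.fv
        }
      }
    }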
    
    
    

    Method 5: sort by a tuple key

    
    package cn.edu360.day5

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort5 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort5").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          (name, age, fv)
        })
    
        //Exploit the tuple comparison rule: compare the first element first; if equal, compare the second
        val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
    
        println(sorted.collect().toBuffer)
    
        sc.stop()
    
      }
    
    }
    
    The point to note in this approach is that tuples are directly sortable: the Scala standard library provides implicit Orderings for tuples, comparing element by element.
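
    A quick local illustration of the key function outside Spark (a sketch; negating fv makes an ascending sort yield descending fv, though note that -x overflows at Int.MinValue):

    val tps = List(("laoduan", 30, 99), ("laozhao", 29, 9999), ("laozhang", 28, 98), ("laoyang", 28, 99))
    println(tps.sortBy(t => (-t._3, t._2)))
    //List((laozhao,29,9999), (laoyang,28,99), (laoduan,30,99), (laozhang,28,98))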
    

    Method 6: an implicit Ordering built with Ordering.on

    
    package cn.edu360.day5

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by zx on 2017/10/10.
      */
    object CustomSort6 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("CustomSort6").setMaster("local[*]")
    
        val sc = new SparkContext(conf)
    
        //Sort rule: first by fv (face value) in descending order; if fv is equal, then by age in ascending order
        val users = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    
        //Parallelize the driver-side data into an RDD
        val lines: RDD[String] = sc.parallelize(users)
    
        //Split and structure the data
        val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
          val fields = line.split(" ")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toInt
          (name, age, fv)
        })
    
        //Exploit the tuple comparison rule: compare the first element first; if equal, compare the second
        //Ordering[(Int, Int)] is the type of the final comparison key
        //on[(String, Int, Int)] is the type of the data before comparison
        //(t => (-t._3, t._2)) converts each record into the comparison key
        implicit val rules: Ordering[(String, Int, Int)] = Ordering[(Int, Int)].on[(String, Int, Int)](t => (-t._3, t._2))
        //The tuple itself is the key; sortBy picks up the implicit rules as its Ordering
        val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)
    
        println(sorted.collect().toBuffer)
    
        sc.stop()
    
      }
    
    }
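
    The same implicit can be built with Ordering.by, which fuses the key extraction and the ordering in one call (a sketch of an equivalent; not a second implicit to keep in scope alongside the first):

    //Equivalent rule via Ordering.by: map each tuple to (-fv, age) and reuse the tuple Ordering
    implicit val rules2: Ordering[(String, Int, Int)] =
      Ordering.by[(String, Int, Int), (Int, Int)](t => (-t._3, t._2))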
    
    
    
