
Implementing Sorting in Spark in Multiple Ways

Author: 不愿透露姓名的李某某 | Published 2019-07-22 08:48

Approach 1: store a custom class in the RDD and have it extend Ordered

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust01 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value (yan) descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[User] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          new User(name, age, yan)
        })
        // Sort the RDD of User objects (sortBy uses User's own compare)
        val lsort: RDD[User] = lmap.sortBy(u => u)
        val res = lsort.collect()
        println(res.toBuffer)
        sc.stop()
      }
    }

    // the third constructor field, nian, holds the face value parsed into `yan` above
    class User(val name: String, val age: Int, val nian: Int) extends Ordered[User] with Serializable {
      override def compare(that: User): Int = {
        if (this.nian == that.nian) {
          this.age - that.age // age ascending on ties
        } else {
          -(this.nian - that.nian) // face value descending
        }
      }
      override def toString: String = s"name: $name, age: $age, nian: $nian"
    }
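The same rule can be sanity-checked locally without Spark. A minimal plain-Scala sketch, hard-coding the same sample data as above:

    // sort by face value descending, then age ascending, on a local List
    val sample = List(("laoduan", 30, 99), ("laozhao", 88, 99), ("laoyang", 12, 454))
    val sorted = sample.sortBy { case (_, age, yan) => (-yan, age) }
    // expected order: laoyang (454), then laoduan (99, 30), then laozhao (99, 88)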

Approach 2: keep tuples in the RDD and use a custom class only as the sort key

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust02 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[(String, Int, Int)] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          (name, age, yan)
        })
        // Sort with a key class: only the order changes, not the data format
        val lsort = lmap.sortBy(tp => new Boy(tp._2, tp._3))
        println(lsort.collect().toBuffer)
        sc.stop()
      }
    }

    class Boy(val age: Int, val nian: Int) extends Ordered[Boy] with Serializable {
      override def compare(that: Boy): Int = {
        if (this.nian == that.nian) {
          this.age - that.age // age ascending on ties
        } else {
          -(this.nian - that.nian) // face value descending
        }
      }
    }
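With the sample data, the collected result should come back in the new order but still as tuples; Boy instances exist only while sort keys are compared:

    // expected output (given the sample data above):
    // ArrayBuffer((laoyang,12,454), (laoduan,30,99), (laozhao,88,99))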

Approach 3: the same key-class idea, but with a case class

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust03 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[(String, Int, Int)] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          (name, age, yan)
        })
        // Sort with a key class: only the order changes, not the data format
        val lsort = lmap.sortBy(tp => new Man(tp._2, tp._3))
        println(lsort.collect().toBuffer)
        sc.stop()
      }
    }

    // a case class is Serializable by default, so no explicit `with Serializable`
    case class Man(age: Int, nian: Int) extends Ordered[Man] {
      override def compare(that: Man): Int = {
        if (this.nian == that.nian) {
          this.age - that.age // age ascending on ties
        } else {
          -(this.nian - that.nian) // face value descending
        }
      }
    }
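Because Man is a case class, its auto-generated companion apply also makes `new` optional. The sortBy line above could equally be written:

    // equivalent: case classes provide a companion apply
    val lsort = lmap.sortBy(tp => Man(tp._2, tp._3))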

Approach 4: the key class carries no comparison logic; an implicit Ordering supplies the rule

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust04 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[(String, Int, Int)] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          (name, age, yan)
        })
        // bring the implicit Ordering[Xianr] into scope for sortBy
        import SortRules.Orderingxianrou
        // Sort with a key class: only the order changes, not the data format
        val lsort = lmap.sortBy(tp => new Xianr(tp._2, tp._3))
        println(lsort.collect().toBuffer)
        sc.stop()
      }
    }

    case class Xianr(age: Int, nian: Int)

The ordering rule lives in a separate object (its own source file, same package):

    package Day05

    object SortRules {
      implicit object Orderingxianrou extends Ordering[Xianr] {
        override def compare(x: Xianr, y: Xianr): Int = {
          if (x.nian == y.nian) {
            x.age - y.age // age ascending on ties
          } else {
            y.nian - x.nian // face value descending
          }
        }
      }
    }
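Why the import works: sortBy resolves its ordering through an implicit parameter. For reference, RDD.sortBy's signature is roughly the following (paraphrased from the Spark API docs):

    def sortBy[K](f: T => K,
                  ascending: Boolean = true,
                  numPartitions: Int = this.partitions.length)
                 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]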

Approach 5: no custom class at all; rely on the built-in tuple Ordering

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust05 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[(String, Int, Int)] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          (name, age, yan)
        })
        // Exploit tuple comparison: the first element is compared first, then
        // the second on ties; negating yan turns ascending into descending
        val lsort = lmap.sortBy(tp => (-tp._3, tp._2))
        println(lsort.collect().toBuffer)
        sc.stop()
      }
    }
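The tuple trick is plain Scala and can be tried without Spark; a quick sketch:

    // tuples compare field by field; negation flips the sort direction
    val keys = List((-99, 30), (-454, 12), (-99, 88))
    println(keys.sorted) // List((-454,12), (-99,30), (-99,88))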

Approach 6: sort the tuples directly, adapting a tuple Ordering with `on`

    package Day05

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Sortcust06 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        // Sort by face value descending; on ties, by age ascending
        val users = Array("laoduan 30 99", "laozhao 88 99", "laoyang 12 454")
        // Parallelize the driver-side collection into an RDD
        val lines = sc.parallelize(users)
        val lmap: RDD[(String, Int, Int)] = lines.map(t => {
          val sp = t.split(" ")
          val name = sp(0)
          val age = sp(1).toInt
          val yan = sp(2).toInt
          (name, age, yan)
        })
        // Ordering[(Int, Int)]: the shape of the final comparison key
        // on[(String, Int, Int)]: the shape of the data before conversion
        // t => (-t._3, t._2): how each record is turned into that key
        implicit val Rules = Ordering[(Int, Int)].on[(String, Int, Int)](t => (-t._3, t._2))
        val lsort = lmap.sortBy(tp => tp)
        println(lsort.collect().toBuffer)
        sc.stop()
      }
    }
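Ordering.on can also be exercised in isolation; a small sketch of what the implicit Rules above computes:

    // adapt an Ordering[(Int, Int)] to compare (String, Int, Int) records
    val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t => (-t._3, t._2))
    println(rules.compare(("laoduan", 30, 99), ("laoyang", 12, 454))) // positive: 454 sorts first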
