美文网首页spark
spark二次排序案例

spark二次排序案例

作者: Frank_8942 | 来源:发表于2018-12-26 13:20 被阅读15次
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    
    object Spark03 {
    
      //二次排序
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("03").getOrCreate()
        import spark.implicits._
        Logger.getRootLogger.setLevel(Level.WARN)
    
    
        //sparkcore 计算方式
        fun1(spark.sparkContext)
        //sparksql 计算方式
        fun2(spark)
    
        spark.stop()
      }
    
    
      //使用sparkcore实现
      /*
      sparkcore实现思路:
      第一步: 自定义 key 实现scala.math.Ordered接口和Serializeable接口
      第二步:将要进行二次排序的数据加载, 将数据映射为 <key,value> 格式的RDD
      第三步:使用sortByKey 基于自定义的key进行二次排序
      第四步:去掉排序的key,只保留排序的结果
       */
      def fun1( sc:SparkContext ):Unit ={
        val data = List("hadoop  spark scala spark", "scala  mr", "spark  scala  mr  java" )
    
        sc.parallelize(data, 3)
          // 正则表达式  \\s 表示空白字符
          .flatMap( _.split("\\s+")  )
          .map( (_,1) )
          .reduceByKey( _+_ )
          .map( bean =>{
            ( MySortKey(bean._1, bean._2), bean )
          })
          .sortByKey(false,1)
          .map(_._2)
          .foreach( println )
      }
    
    
      //使用sparksql实现
      /**
        使用sql完成二次排序
        select  *   from   tablename   order  by  field1, field2 desc
        */
      def fun2( spark:SparkSession ):Unit ={
        import spark.implicits._
        val data = List("hadoop  spark scala spark", "scala  mr", "spark  scala  mr  java" )
    
        spark.sparkContext.parallelize(data,3)
          .flatMap( _.split("\\s+")  )
          .map( (_,1) )
          .reduceByKey( _+_ )
          .toDF("word","count")
          .createOrReplaceTempView("temptable")
    
        val sql = s"select word,count from temptable order by count desc,word "
        spark.sql(sql).show(false)
        
      }
    
    
      //Ordered[自定义数据类型]      用来自定义排序规则
      case class MySortKey(first:String, second:Int) extends Ordered[MySortKey]{
        //重写比较方法
        // 首先 按照second字段 数字大小降序, 之后按照 first字段 字典升序
        override def compare(that: MySortKey): Int = {
          val temp = this.second - that.second
          if ( temp != 0 ){
            temp
          }else{
            - this.first.compareTo(that.first)
          }
        }
      }
    
    }
    
    
    

    相关文章

      网友评论

        本文标题:spark二次排序案例

        本文链接:https://www.haomeiwen.com/subject/pqkhlqtx.html