美文网首页大数据相关
Spark--spark的二次排序(面试必问)

Spark--spark的二次排序(面试必问)

作者: 李小李的路 | 来源:发表于2019-03-29 17:11 被阅读98次

    什么是二次排序

    • 二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
    [root@iteblog.com /tmp]# vim data.txt 
    2015,1,24
    2015,3,56
    2015,1,3
    2015,2,-43
    2015,4,5
    2015,3,46
    2014,2,64
    2015,1,4
    2015,1,21
    2015,2,35
    2015,2,0
    我们期望的输出结果是
    2014-2  64
    2015-1  3,4,21,24
    2015-2  -43,0,35
    2015-3  46,56
    2015-4  5
    

    spark 二次排序解决方案

    我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部弄到一起,然后对同一个Key的所有Value进行排序即可。
    scala 版实现过程,分为遍历输出和转成df格式,可进行下一步执行,重点理解groupByKey()算子和scala函数编程的思想。

    package cn.ted.secondarySort
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    
    /*
      * Author:  LiYahui
      * Date:  Created in  2019/3/1 11:21
      * Description: TODO spark实现二次排序,key有序,value内部的数据同样有序
      * Version: V1.0         
      */
    
    object SecondarySort {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .appName(s"${this.getClass.getSimpleName}")
            .master("local[2]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.parquet.compression.codec", "gzip")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        val inputPath = "F:\\LocalFileForTest\\secondarySort"
        //将结果进行打印
        sc.textFile(inputPath)
            .map(line => {
              val arr: Array[String] = line.split(",")
              //对年份和月份进行拼接
              val key: String = arr(0) + "-" + arr(1)
              val value = arr(2)
              //拼接成kv类型
              (key, value)
            })
            .groupByKey()
            .map(line => (line._1, line._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) //value内部进行升序排列
            .sortByKey(true) //key升序排列
            .collect()
            .foreach(println)
        import spark.implicits._
    
        //转换成df格式进行计算
        sc.textFile(inputPath)
            .map(line => {
              val arr: Array[String] = line.split(",")
              //对年份和月份进行拼接
              val key: String = arr(0) + "-" + arr(1)
              val value = arr(2)
              //拼接成kv类型
              (key, value)
            })
            .groupByKey()
            .map(line => (line._1, line._2.toList.sortWith(_.toInt < _.toInt).mkString(","))) //value内部进行升序排列
            .sortByKey(true) //key升序
            .toDF("key", "value")
        spark.stop()
        sc.stop()
      }
    
      /**
        * 数据源和期望结果
        * [root@iteblog.com /tmp]# vim data.txt
        * 2015,1,24
        * 2015,3,56
        * 2015,1,3
        * 2015,2,-43
        * 2015,4,5
        * 2015,3,46
        * 2014,2,64
        * 2015,1,4
        * 2015,1,21
        * 2015,2,35
        * 2015,2,0
        * 我们期望的输出结果是
        *
        * 2014-2  64
        * 2015-1  3,4,21,24
        * 2015-2  -43,0,35
        * 2015-3  46,56
        * 2015-4  5
        *
        */
    
    }
    

    相关文章

      网友评论

        本文标题:Spark--spark的二次排序(面试必问)

        本文链接:https://www.haomeiwen.com/subject/trhrbqtx.html