美文网首页
2020-11-27-Spark-6(Spark-Core)

2020-11-27-Spark-6(Spark-Core)

作者: 冰菓_ | 来源:发表于2020-12-05 08:02 被阅读0次

    spark练习题
    处理数据上的分组和业务需求上的分组

    1.案例topN(要点使用模式匹配重新分组)

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    //需求:统计出每一个省份广告被点击次数的TOP3
    //数据的示例:
    //1516609143869 4 6 1 11
    //1516609143869 3 6 49 7
    //1516609143869 8 3 4 18
    //1516609143869 8 8 69 14
    //1516609143869 0 6 51 29
    //1516609143869 5 3 59 2
    //1516609143869 8 4 66 25
    
    //需求:时间戳 省份 用户 城市 广告
    object Test7 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test7").setMaster("local[*]"))
        val rdd = sc.textFile("src/main/resources/top.txt")
        //由于shuffle传输的字节越多性能越差,只需要需求的数据
        val maprdd: RDD[((String, String), Int)] = rdd.map(data => {
          val th = data.split(" ")
          //结构为 ((省份,城市),广告数)
          ((th(1), th(3)), th(4).toInt)
        })
        //使用效率高的reducebykey,求取出(省份,城市)分组内的所有广告数
        val reducerdd: RDD[((String, String), Int)] = maprdd.reduceByKey(_ + _)
        //使用case 模式匹配重新分组
        val mapardd = reducerdd.map(data => data match {
          case ((province, city), add) => (province, (city, add))
        })
        val grouprdd: RDD[(String, Iterable[(String, Int)])] = mapardd.groupByKey()
        //获取前三
        val result: RDD[(String, List[(String, Int)])] = grouprdd.mapValues(data => data.toList.sortBy(_._2)(Ordering.Int.reverse).take(3))
        result.collect.foreach(println)
        sc.stop()
      }
    }
    

    2.基础练习题(过滤求和,最值问题,平均值的多解法效率,join)

    12 宋江 25 男 chinese 50
    12 宋江 25 男 math 60
    12 宋江 25 男 english 70
    12 吴用 20 男 chinese 50
    12 吴用 20 男 math 50
    12 吴用 20 男 english 50
    12 杨春 19 女 chinese 70
    12 杨春 19 女 math 70
    12 杨春 19 女 english 70
    13 李逵 25 男 chinese 60
    13 李逵 25 男 math 60
    13 李逵 25 男 english 70
    13 林冲 20 男 chinese 50
    13 林冲 20 男 math 60
    13 林冲 20 男 english 50
    13 王英 19 女 chinese 70
    13 王英 19 女 math 80
    13 王英 19 女 english 70
    

    1.一共有多少个小于20岁的人参加考试?

     val result = rdd.map(data => data.split(" ")).filter(x => x(2).toInt < 20).groupBy(_(1)).count()
    

    2.一共有多少个女生参加考试?

     val result = rdd.map(_.split(" ")).filter(_ (3) == "男").groupBy(_(1)).count()
    

    3.语文科目的平均成绩是多少?

        val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
        val rdd = sc.textFile("src/main/resources/sanguo.txt")
        rdd.map(data => {
          val number = data.split(" ")
          ((number(4), number(5).toInt))
        }).filter(_._1 == "chinese").aggregateByKey((0, 0))(
          (u, v) => ((u._1 + v), u._2 + 1),
          (x, x1) => ((x._1 + x1._1), (x._2 + x1._2))
        ).mapValues(data => data._1 / data._2).collect.foreach(println)
        sc.stop()
    

    4.13班数学最高成绩是多少

    object Test2 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
        val rdd = sc.textFile("src/main/resources/sanguo.txt")
        val strings: Int = rdd.map(_.split(" ")).filter(_ (0) == "13").filter(_ (4) == "math").map(_ (5).toInt).max()
          //.sortBy(_.toInt,false).take(1)
        println(strings)
        sc.stop()
      }
    }
    

    5.总成绩大于150分的12班的女生有几个?

    object Test3 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
        val rdd = sc.textFile("src/main/resources/sanguo.txt")
        val filtrdd: RDD[Array[String]] = rdd.map(_.split(" ")).filter(data => data(0).equals("12") && data(3).equals("女"))
        val rerdd: RDD[(String, Int)] = filtrdd.map(data => (data(1), data(5).toInt)).reduceByKey(_ + _)
        rerdd.collect.foreach(println)
        val result: Long = rerdd.filter(_._2 > 150).count()
        println(result)
        sc.stop()
      }
    }
    

    6.总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    //总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?
    object Test4 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
        val data = sc.textFile("src/main/resources/sanguo.txt")
        //首先求取总成绩大于150的姓名和总成绩
        val mapdd = data.map(txt => {
          val strings = txt.split(" ")
          (strings(1), (strings(4),strings(5).toInt))
        })
        //获取总成绩和科目数
        val aggrdd: RDD[(String, (Int, Int))] = mapdd.aggregateByKey((0, 0))(
          (u, v) => ((u._1 + v._2), u._2 + 1),
          (x, x1) => ((x._1 + x1._1), x._2 + x1._2)
        )
        //得到的是总成绩大于150的姓名和平均值
        val re1: RDD[(String, Int)] = aggrdd.filter(_._2._1 > 150).mapValues(data => data._1 / data._2)
        //(王英,73)
        //(宋江,60)
        //(杨春,70)
        //(李逵,63)
        //(林冲,53)
    
        //且数学大于等于70,且年龄大于等于19岁的学生
        val maprdd: RDD[Array[String]] = data.map(_.split(" "))
        val filtrdd: RDD[Array[String]] = maprdd.filter(data => data(2).toInt >= 19 && data(4).equals("math") && data(5).toInt >= 70)
        val re2: RDD[(String, Int)] = filtrdd.map(data => {
          (data(1), data(5).toInt)
        })
        //join连接
        val result: RDD[(String, Int)] = re1.join(re2).map(data => {
          (data._1, data._2._1)
        })
        result.collect.foreach(println)
        sc.stop()
        //(王英,73)
        //(杨春,70)
      }
    }
    

    相关文章

      网友评论

          本文标题:2020-11-27-Spark-6(Spark-Core)

          本文链接:https://www.haomeiwen.com/subject/btcziktx.html