美文网首页初见
Spark实战 - 2 - RDD实战:Average Frie

Spark实战 - 2 - RDD实战:Average Frie

作者: 钢铁大郭 | 来源:发表于2020-05-22 20:01 被阅读0次

    这篇要有一点map reduce基础哦
    用到RDD的key value

    Key & Value

    和Hadoop一样……

    比如我们有给一个Friend数据集,里面有有一个column是age,一个column是name
    那么key就是age,value就是该年龄下name的总数

    RDD可以hold住 Key/Value pairs:

    // 
    totalsByAge = rdd.map(age => (age, 1))
    
    (33, 1)
    (33, 1)
    (12, 1)
    (23, 1)
    

    这些k/v对,是一个2个元素的tuple。

    看到这里,其实你意识到了,scala/spark也没啥新鲜的。只是把k/v对映射成一个tuple了

    bingo,现在我们就有了一个k/v组成的RDD了,撒花。

    常用的RDD操作api

    reduceByKey() 加和相同key的value。

    rdd.reduceByKey((x, y) = >x + y) //此处的x、y并非k/v对。其实是value而已。
    

    groupByKey() 将相同key的RDD group起来
    sortByKey() 用key来排序RDD
    keys(), values() 创建一个只要keys或values的RDD
    还有一些join(), rightOuterJoin(), leftOuterJoin(), cogroup() 等等,可以看看文档。

    RDD是不是有点像个NoSQL 的数据集?

    如果不需要key的话

    mapValues() & flatMapValues()这两个api是很好的选择,性能比较好。

    开始战斗!

    我们的数据集长这样

    id, name, age, numFriends
    0, jean, 33, 2
    1, hugh, 55, 221
    2, will, 33, 385
    

    我们要做的,是找出各个年龄的friendNum的平均值。

    第一步 Mapping

    1. 定义方法解析数据,读入文件
    // 解析数据的方法,是不是和Hadoop的很像!
    def parseLine(line: String)  = {
      val fields = line.split(",")
      val age = fields(2).toInt
      val numFriends = fields(3).toInt
      // (age, numFriends)
    }
    
    val lines = sc.textFile("path/to/the/file.csv")
    val rdd = lines.map(parseLine) // 将parseLine传入作为mapping的方法
    
    1. Map阶段
      我们看一个比较吓人的代码
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
    

    亲娘嘞这是啥??!!

    先不急哈,一步一步来。
    我们先看rdd.mapValues(x => (x, 1)),它return了啥?

    (33, 385)=> (33, (385, 1))
    (33, 2)=> (33, (2, 1))
    (55, 221)=> (33, (55, 221))
    

    这个方法把刚才那个parseLine的tuple们转化成了如上的模式。mapValues只是用了tuple的value。是不是好神奇?

    然后我们来看.reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
    熟悉jQuery的小伙伴一定不陌生,这个方法的参数就是接着上一个方法的返回的rdd。
    但是既然是reduceByKey,那么Key就不参与计算了,直接传入的(x, y)其实是(385, 1)以及(2, 1)
    弄个清楚点的

    (33, (385, 1)) => (385, 1) // 只取这个内嵌的tuple, 这个作为参数 x
    |
    V
    (33, (2, 1)) => (2, 1) // 按照RDD顺序,下一个也是取内嵌的tuple,这个作为y
    |
    V
    这一轮的返回值作为x传入,然后y取下一个的内嵌的tuple,递归着跑。(实在不明白,留言咱们讨论……)
    

    这样就成了我们可以计算的RDD了!

    1. Reduce阶段
      这里就好简答啦。
      val averagesByAge = totalsByAge.mapValues(x=> x._1 / x._2)
    (33, (387, 2)) => (33, 193.5)
    ...
    
    1. 获取结果
      此时虽然得到了结果,但是还无法显示,因为结果是个RDD,所有要多一步来显示它。
    val results = averagesByAge.collect()
    result.sorted.foreach(println)
    

    是不是有种超神的感觉呢,嘿嘿嘿

    运行起来走一个

    如果不熟悉环境搭建的,请看我Spark实战 - 1 - 配置和运行 超级详细了
    数据集:FakeFriend.csv 提取码:z3db

    完整代码:

    package com.ephraim.spark
    
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.log4j._
    
    object FriendsByAge {
      
      def parseLine(line: String) = {
          val fields = line.split(",")
    
          val age = fields(2).toInt
          val numFriends = fields(3).toInt
    
          (age, numFriends)
      }
      
      /** Our main function where the action happens */
      def main(args: Array[String]) {
       
        Logger.getLogger("org").setLevel(Level.ERROR)
            
        val sc = new SparkContext("local[*]", "FriendsByAge")
      
        val lines = sc.textFile("./fakefriends/fakefriends.csv")
    
        val rdd = lines.map(parseLine)
    
        val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
    
        val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
    
        val results = averagesByAge.collect()
    
        results.sorted.foreach(println)
      }
        
    }
    
    (18,343)
    (19,213)
    (20,165)
    (21,350)
    (22,206)
    (23,246)
    (24,233)
    (25,197)
    (26,242)
    (27,228)
    (28,209)
    (29,215)
    (30,235)
    (31,267)
    (32,207)
    (33,325)
    (34,245)
    (35,211)
    (36,246)
    (37,249)
    (38,193)
    (39,169)
    (40,250)
    (41,268)
    (42,303)
    (43,230)
    (44,282)
    (45,309)
    (46,223)
    (47,233)
    (48,281)
    (49,184)
    (50,254)
    (51,302)
    (52,340)
    (53,222)
    (54,278)
    (55,295)
    (56,306)
    (57,258)
    (58,116)
    (59,220)
    (60,202)
    (61,256)
    (62,220)
    (63,384)
    (64,281)
    (65,298)
    (66,276)
    (67,214)
    (68,269)
    (69,235)
    

    跟我一样优秀的你,一定能得到一样的结果呢!

    欢迎大家点赞,评论,打赏哈!

    相关文章

      网友评论

        本文标题:Spark实战 - 2 - RDD实战:Average Frie

        本文链接:https://www.haomeiwen.com/subject/znjcahtx.html