美文网首页初见
Spark实战 - 2 - RDD实战:Average Frie

Spark实战 - 2 - RDD实战:Average Frie

作者: 钢铁大郭 | 来源:发表于2020-05-22 20:01 被阅读0次

这篇要有一点map reduce基础哦
用到RDD的key value

Key & Value

和Hadoop一样……

比如我们有给一个Friend数据集,里面有有一个column是age,一个column是name
那么key就是age,value就是该年龄下name的总数

RDD可以hold住 Key/Value pairs:

// 
totalsByAge = rdd.map(age => (age, 1))

(33, 1)
(33, 1)
(12, 1)
(23, 1)

这些k/v对,是一个2个元素的tuple。

看到这里,其实你意识到了,scala/spark也没啥新鲜的。只是把k/v对映射成一个tuple了

bingo,现在我们就有了一个k/v组成的RDD了,撒花。

常用的RDD操作api

reduceByKey() 加和相同key的value。

rdd.reduceByKey((x, y) = >x + y) //此处的x、y并非k/v对。其实是value而已。

groupByKey() 将相同key的RDD group起来
sortByKey() 用key来排序RDD
keys(), values() 创建一个只要keys或values的RDD
还有一些join(), rightOuterJoin(), leftOuterJoin(), cogroup() 等等,可以看看文档。

RDD是不是有点像个NoSQL 的数据集?

如果不需要key的话

mapValues() & flatMapValues()这两个api是很好的选择,性能比较好。

开始战斗!

我们的数据集长这样

id, name, age, numFriends
0, jean, 33, 2
1, hugh, 55, 221
2, will, 33, 385

我们要做的,是找出各个年龄的friendNum的平均值。

第一步 Mapping

  1. 定义方法解析数据,读入文件
// 解析数据的方法,是不是和Hadoop的很像!
def parseLine(line: String)  = {
  val fields = line.split(",")
  val age = fields(2).toInt
  val numFriends = fields(3).toInt
  // (age, numFriends)
}

val lines = sc.textFile("path/to/the/file.csv")
val rdd = lines.map(parseLine) // 将parseLine传入作为mapping的方法
  1. Map阶段
    我们看一个比较吓人的代码
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))

亲娘嘞这是啥??!!

先不急哈,一步一步来。
我们先看rdd.mapValues(x => (x, 1)),它return了啥?

(33, 385)=> (33, (385, 1))
(33, 2)=> (33, (2, 1))
(55, 221)=> (33, (55, 221))

这个方法把刚才那个parseLine的tuple们转化成了如上的模式。mapValues只是用了tuple的value。是不是好神奇?

然后我们来看.reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
熟悉jQuery的小伙伴一定不陌生,这个方法的参数就是接着上一个方法的返回的rdd。
但是既然是reduceByKey,那么Key就不参与计算了,直接传入的(x, y)其实是(385, 1)以及(2, 1)
弄个清楚点的

(33, (385, 1)) => (385, 1) // 只取这个内嵌的tuple, 这个作为参数 x
|
V
(33, (2, 1)) => (2, 1) // 按照RDD顺序,下一个也是取内嵌的tuple,这个作为y
|
V
这一轮的返回值作为x传入,然后y取下一个的内嵌的tuple,递归着跑。(实在不明白,留言咱们讨论……)

这样就成了我们可以计算的RDD了!

  1. Reduce阶段
    这里就好简答啦。
    val averagesByAge = totalsByAge.mapValues(x=> x._1 / x._2)
(33, (387, 2)) => (33, 193.5)
...
  1. 获取结果
    此时虽然得到了结果,但是还无法显示,因为结果是个RDD,所有要多一步来显示它。
val results = averagesByAge.collect()
result.sorted.foreach(println)

是不是有种超神的感觉呢,嘿嘿嘿

运行起来走一个

如果不熟悉环境搭建的,请看我Spark实战 - 1 - 配置和运行 超级详细了
数据集:FakeFriend.csv 提取码:z3db

完整代码:

package com.ephraim.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

object FriendsByAge {
  
  def parseLine(line: String) = {
      val fields = line.split(",")

      val age = fields(2).toInt
      val numFriends = fields(3).toInt

      (age, numFriends)
  }
  
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    val sc = new SparkContext("local[*]", "FriendsByAge")
  
    val lines = sc.textFile("./fakefriends/fakefriends.csv")

    val rdd = lines.map(parseLine)

    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))

    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    val results = averagesByAge.collect()

    results.sorted.foreach(println)
  }
    
}
(18,343)
(19,213)
(20,165)
(21,350)
(22,206)
(23,246)
(24,233)
(25,197)
(26,242)
(27,228)
(28,209)
(29,215)
(30,235)
(31,267)
(32,207)
(33,325)
(34,245)
(35,211)
(36,246)
(37,249)
(38,193)
(39,169)
(40,250)
(41,268)
(42,303)
(43,230)
(44,282)
(45,309)
(46,223)
(47,233)
(48,281)
(49,184)
(50,254)
(51,302)
(52,340)
(53,222)
(54,278)
(55,295)
(56,306)
(57,258)
(58,116)
(59,220)
(60,202)
(61,256)
(62,220)
(63,384)
(64,281)
(65,298)
(66,276)
(67,214)
(68,269)
(69,235)

跟我一样优秀的你,一定能得到一样的结果呢!

欢迎大家点赞,评论,打赏哈!

相关文章

网友评论

    本文标题:Spark实战 - 2 - RDD实战:Average Frie

    本文链接:https://www.haomeiwen.com/subject/znjcahtx.html