美文网首页
关于Spark、Scala实现WordCount的8种写法(多种

关于Spark、Scala实现WordCount的8种写法(多种

作者: 每天起床打酱油 | 来源:发表于2019-07-31 09:39 被阅读0次

    1、groupByKey()

    RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))

    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))
    
    // 转换算子 —— groupByKey
    val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    
    val rdd3: RDD[(String, Int)] = rdd2.map {
      case (c, datas) => {
        (c, datas.sum)
      }
    }
    rdd3.collect().foreach(println)
    sc.stop()
    

    2、reduceByKey()

    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
    
    // 转换算子 —— reduceByKey
    //val value: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{x+y})
    val value: RDD[(String, Int)] = rdd.reduceByKey(_+_)
    
    value.collect().foreach(println)
    sc.stop()
    

    3、aggregateByKey

    转换算子 —— aggregateByKey()()使用了函数柯里化
    存在两个参数列表 :
    第一个参数列表表示分区内计算时的初始值(零值)——在初始值的基础上做比较运算
    第二参数列表中需要传递两个参数
    第一个参数表示分区内计算规则
    第二个参数表示分区间计算规则

      val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",2), ("b",3),
          ("a",3), ("b",4), ("a", 5)), 2)
        val value: RDD[(String, Int)] = rdd.aggregateByKey(0)
        ((x,y)=>{Math.max(x,y)},
        (x,y)=>{x+y})
        value.collect().foreach(println)
    

    4、foldByKey()

    foldByKey其实就是aggregateByKey简化版,
    当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了
    rdd.aggregateByKey(0)(+,+) ——> rdd.foldByKey(0)(+)

    // TODO 当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了
    
    //rdd.aggregateByKey(10)((x,y)=>{Math.max(x,y)},(x,y)=>{x+y})
    //  val value2: RDD[(String, Int)] = rdd.aggregateByKey(0)(_+_,_+_)
    val value3: RDD[(String, Int)] = rdd.foldByKey(0)(_+_)
    value3.collect().foreach(println)
    

    5、combineByKey()

    根据key计算每种key的平均值
    combineByKey 需要传递三个参数

    1. 将第一个key出现的v转换结构计算规则
    2. 第二个参数表示分区内计算规则
    3. 第三个参数表示分区间计算规则
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(
      Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),
      2
    )
    //过程
    // ("a", 88), ("b", 95), ("a", 91)
    // ("b", 93), ("a", 95), ("b", 98)
    // (88,1) + 99 => (187,2)
    
    // 分区内第一次碰见key的时候,将数据V进行结构的转变
    // v => (v,1)
    // combineByKey 需要传递三个参数
    val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
       //  1. 将第一个key出现的v转换结构计算规则
      //("a", 88)=>(88,1)
      (num: Int) => (num, 1),
      //  2. 第二个参数表示分区内计算规则
     // (88,1) + 99 => (187,2}
      (t: (Int, Int), num: Int) => {
        (t._1 + num, t._2 + 1) 
      //  3. 第三个参数表示分区间计算规则  
      //tuple(a1,))
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    val resultRDD: RDD[(String, Int)] = rdd1.map {
      case (key, t) => {
        (key, t._1 / t._2)
      }
    }
    resultRDD.collect().foreach(println)
    

    6、基础版 groupBy+双层map()

    val rdd1= rdd.groupBy{case t=>t._1}
    val rdd2= rdd1.map {
      case (words, t) => {
        (words,t.map {
          case (word,num) => num
        })
      }
    }
    val result: RDD[(String, Int)] = rdd2.map {
      case (word, t) => (word, t.sum)
    }
    

    7 、 行动算子 ——countByKey

    println(rdd.countByKey())
    

    8、 行动算子 ——countByValue

    以tuple为单位 ("a",1)

    println(rdd.countByValue())
    

    相关文章

      网友评论

          本文标题:关于Spark、Scala实现WordCount的8种写法(多种

          本文链接:https://www.haomeiwen.com/subject/nuoarctx.html