美文网首页
combineByKey[C]

combineByKey[C]

作者: yayooo | 来源:发表于2019-07-31 00:06 被阅读0次

    一、
    combineByKey(_, _, _)需要3个参数
    1.将第一个key出现的v转换结构计算规则
    2.第二个参数表示分区内计算规则
    3.第三个参数表示分区间计算规则

    二、作用:对相同K,把V合并成一个集合。

    三、需求:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
    以val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)为例。
    f分区1:("a", 88), ("b", 96), ("a", 91)
    分区2: ("b", 93), ("a", 95), ("b", 98)
    1)分区内第一次碰到key的时候,将数据V进行结构的转变:v => (v,1):
    分区1: ("a", 88) -》 (88,1), ("b", 96) -》 (96,1)
    分区2: ("b", 93) -》 (93,1), ("a", 95) -》 (95,1)

    2)分区内计算规则
    (t:(Int, Int), num:Int) => {
    (t._1 + num, t._2 + 1)
    }

    1. ((88,1),91) => (179,2),, (96,1)
    2. ((93,1),98) => (191,2),, (95,1)

    3)分区间计算规则
    (t1:(Int, Int), t2:(Int, Int)) => {
    (t1._1 + t2._1, t1._2 + t2._2)
    }
    1.((179,2),(95,1)) => (274,3)
    2.((191,2),(96,1)) => (287,3)

    combineByKey源码

      def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
      }
    
    
    //计算每种key的平均值
    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    object Trans {
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
    
    
      //需求:创建一个pairRDD,根据key计算每种key的均值。
        // (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
    
        val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey[(Int, Int)](
          (num: Int) => (num, 1),
          (t: (Int, Int), num: Int) => {
            (t._1 + num, t._2 + 1)
          },
          (t1: (Int, Int), t2: (Int, Int)) => {
            (t1._1 + t2._1, t1._2 + t2._2)
          }
        )
       rdd1.collect().foreach(println)
        println("*********")
    
        val resultRDD: RDD[(String, Int)] = rdd1.map {
          case (key, t) => {
            (key, t._1 / t._2)
          }
        }
        resultRDD.collect().foreach(println)
    
        sc.stop()
      }
    }
    
    
    

    (b,(287,3))
    (a,(274,3))
    *********
    (b,95)
    (a,91)

    相关文章

      网友评论

          本文标题:combineByKey[C]

          本文链接:https://www.haomeiwen.com/subject/xvinrctx.html