美文网首页
combineByKey[C]

combineByKey[C]

作者: yayooo | 来源:发表于2019-07-31 00:06 被阅读0次

一、
combineByKey(_, _, _)需要3个参数
1.将第一个key出现的v转换结构计算规则
2.第二个参数表示分区内计算规则
3.第三个参数表示分区间计算规则

二、作用:对相同K,把V合并成一个集合。

三、需求:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
以val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)为例。
f分区1:("a", 88), ("b", 96), ("a", 91)
分区2: ("b", 93), ("a", 95), ("b", 98)
1)分区内第一次碰到key的时候,将数据V进行结构的转变:v => (v,1):
分区1: ("a", 88) -》 (88,1), ("b", 96) -》 (96,1)
分区2: ("b", 93) -》 (93,1), ("a", 95) -》 (95,1)

2)分区内计算规则
(t:(Int, Int), num:Int) => {
(t._1 + num, t._2 + 1)
}

  1. ((88,1),91) => (179,2),, (96,1)
  2. ((93,1),98) => (191,2),, (95,1)

3)分区间计算规则
(t1:(Int, Int), t2:(Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
1.((179,2),(95,1)) => (274,3)
2.((191,2),(96,1)) => (287,3)

combineByKey源码

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

//计算每种key的平均值
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)


  //需求:创建一个pairRDD,根据key计算每种key的均值。
    // (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)

    val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey[(Int, Int)](
      (num: Int) => (num, 1),
      (t: (Int, Int), num: Int) => {
        (t._1 + num, t._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
   rdd1.collect().foreach(println)
    println("*********")

    val resultRDD: RDD[(String, Int)] = rdd1.map {
      case (key, t) => {
        (key, t._1 / t._2)
      }
    }
    resultRDD.collect().foreach(println)

    sc.stop()
  }
}


(b,(287,3))
(a,(274,3))
*********
(b,95)
(a,91)

相关文章

网友评论

      本文标题:combineByKey[C]

      本文链接:https://www.haomeiwen.com/subject/xvinrctx.html