一、
combineByKey(_, _, _)需要3个参数
1.将第一个key出现的v转换结构计算规则
2.第二个参数表示分区内计算规则
3.第三个参数表示分区间计算规则
二、作用:对相同K,把V合并成一个集合。
三、需求:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
以val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)为例。
f分区1:("a", 88), ("b", 96), ("a", 91)
分区2: ("b", 93), ("a", 95), ("b", 98)
1)分区内第一次碰到key的时候,将数据V进行结构的转变:v => (v,1):
分区1: ("a", 88) -》 (88,1), ("b", 96) -》 (96,1)
分区2: ("b", 93) -》 (93,1), ("a", 95) -》 (95,1)
2)分区内计算规则
(t:(Int, Int), num:Int) => {
(t._1 + num, t._2 + 1)
}
- ((88,1),91) => (179,2),, (96,1)
- ((93,1),98) => (191,2),, (95,1)
3)分区间计算规则
(t1:(Int, Int), t2:(Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
1.((179,2),(95,1)) => (274,3)
2.((191,2),(96,1)) => (287,3)
combineByKey源码
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
//计算每种key的平均值
package com.atguigu
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object Trans {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
//需求:创建一个pairRDD,根据key计算每种key的均值。
// (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey[(Int, Int)](
(num: Int) => (num, 1),
(t: (Int, Int), num: Int) => {
(t._1 + num, t._2 + 1)
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
rdd1.collect().foreach(println)
println("*********")
val resultRDD: RDD[(String, Int)] = rdd1.map {
case (key, t) => {
(key, t._1 / t._2)
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}
(b,(287,3))
(a,(274,3))
*********
(b,95)
(a,91)
网友评论