在Spark官方文档中对aggregate函数定义如下
def aggregate[S](zeroValue: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S
其中seqOp是聚合各分区中的元素,将元素类型从T转变为S,操作的初始值是zeroValue
combop是将聚合各分区中元素的结果再次进行聚合,操作的初始值也是zeroValue
举个列子:
对List(1,2,3,4,5,6,7,8,9)取平均值
/**
* @author td
* @date 2017/11/23
*/
object Demo {
/**
* 对各分区的元素进行聚合,聚合的初始值为zeroValue
* @param tuple1
* @param num
*/
def sqlOp(tuple1: (Int, Int), num: Int): (Int, Int) = {
(tuple1._1+num,tuple1._2+1)
}
/**
* 对各分区元素聚合的结果再次进行聚合,聚合的初始值为zeroValue
* @param tuple1
* @return
*/
def combOp(tuple1: (Int,Int),tuple2: (Int,Int)): (Int,Int) = {
(tuple1._1+tuple2._1,tuple1._2+tuple2._2);
}
def main(args: Array[String]): Unit = {
val rdd = List(1,2,3,4,5,6,7,8,9)
val resutlt = rdd.par.aggregate((0,0))(sqlOp,combOp);
println(resutlt._1)
println(resutlt._2)
val avg = resutlt._1/resutlt._2;
println("平均值是"+avg)
}
}
其中sqlOp操作的过程是样子:
第一步:将zeroValue作为初始值进行运算(0,0)
def sqlOp((0,0),1): (Int, Int) = {
(0+1,0+1)
}
第二步:将第一步的结果tuple作为参数传入进去(1,1)
def sqlOp((1,1),2): (Int, Int) = {
(1+2,1+1)
}
以此类推sqlOp每一步的过程:
3+3, 2+1
6+4, 3+1
10+5, 4+1
15+6, 5+1
21+7, 6+1
28+8, 7+1
36+9, 8+1
上面说的spark单线程运行的情况,Spark在实际运行过程中是以分区多线程的形式运行
比如分为3个分区List(1,2,3,4)、List(5,6,7,8)、List(9)。然后combOp函数将sqlOp每个分区算出来的结果再次进行聚合。(0+1+2+3+4,4)、(0+5+6+7+8,4)、(0+9,1)
聚合后的结果是(0+1+2+3+4+0+5+6+7+8+0+9,4+4+1) 即(45,9)。在求平均值就简单了。
网友评论