美文网首页
解读Spark的aggregate函数用法

解读Spark的aggregate函数用法

作者: shuaidong | 来源:发表于2018-03-26 17:39 被阅读0次

在Spark官方文档中对aggregate函数定义如下

def aggregate[S](zeroValue: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S
其中seqOp是聚合各分区中的元素,将元素类型从T转变为S,操作的初始值是zeroValue
combop是将聚合各分区中元素的结果再次进行聚合,操作的初始值也是zeroValue

举个列子:

对List(1,2,3,4,5,6,7,8,9)取平均值

/**
  * @author td
  * @date 2017/11/23
  */
object Demo {

  /**
    * 对各分区的元素进行聚合,聚合的初始值为zeroValue
    * @param tuple1
    * @param num
    */
  def sqlOp(tuple1: (Int, Int), num: Int): (Int, Int) = {
    (tuple1._1+num,tuple1._2+1)
  }

  /**
    * 对各分区元素聚合的结果再次进行聚合,聚合的初始值为zeroValue
    * @param tuple1
    * @return
    */
  def combOp(tuple1: (Int,Int),tuple2: (Int,Int)): (Int,Int) = {
    (tuple1._1+tuple2._1,tuple1._2+tuple2._2);
  }
  def main(args: Array[String]): Unit = {
    val rdd = List(1,2,3,4,5,6,7,8,9)

    val resutlt = rdd.par.aggregate((0,0))(sqlOp,combOp);

    println(resutlt._1)
    println(resutlt._2)
    val avg = resutlt._1/resutlt._2;
    println("平均值是"+avg)

  }
}

其中sqlOp操作的过程是样子:
第一步:将zeroValue作为初始值进行运算(0,0)
def sqlOp((0,0),1): (Int, Int) = {
(0+1,0+1)
}
第二步:将第一步的结果tuple作为参数传入进去(1,1)
def sqlOp((1,1),2): (Int, Int) = {
(1+2,1+1)
}
以此类推sqlOp每一步的过程:
3+3, 2+1
6+4, 3+1
10+5, 4+1
15+6, 5+1
21+7, 6+1
28+8, 7+1
36+9, 8+1
上面说的spark单线程运行的情况,Spark在实际运行过程中是以分区多线程的形式运行
比如分为3个分区List(1,2,3,4)、List(5,6,7,8)、List(9)。然后combOp函数将sqlOp每个分区算出来的结果再次进行聚合。(0+1+2+3+4,4)、(0+5+6+7+8,4)、(0+9,1)
聚合后的结果是(0+1+2+3+4+0+5+6+7+8+0+9,4+4+1) 即(45,9)。在求平均值就简单了。

相关文章

网友评论

      本文标题:解读Spark的aggregate函数用法

      本文链接:https://www.haomeiwen.com/subject/dngrcftx.html