1.aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用于聚合 rdd 中的元素,首先使用sepOP 将rdd 中的元素 T 聚合成 U ,然后使用 comOP 将之前聚合后的U 聚合成U
**特别注意: sepOP 和comOP 都会使用初值 zeroValue, 且类型为U
def setOP(a:Int ,b:Int): Int ={
println("seqOP: "+ a +"\t"+b)
math.min(a,b)
}
def comOP(a:Int ,b:Int): Int ={
println("comOP: "+ b+ "\t"+a)
a*b
}
def testAggregate(): Unit ={
val z =sc.parallelize( List(1,2,3,4,5,6),2)
println(z.aggregate(3)(setOP ,comOP))
}
结果分析
- 输出结果 7
- 首先 list 分为两个分区 123, 456
- 然后进行 setOP 操作
- 第一个分区: 输出 3,1 1,2 1,3
- 第二个分区: 输出 3,4 3,5 3,6
- 最后进行 comOP 操作
- 输出 3 1
4 3
- 最后的值为: 3+ 1+ 3 =7
2.fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
def testFold(): Unit ={
val z= sc.parallelize(List(1,2,3,4),2)
println(z.fold(1)(comOP))
}
3.Lookup
用于(K,V) 类型的RDD,制定K ,返回所有对应的V
def testLookup(): Unit ={
val z=sc.makeRDD(Array(("a",1),("a",2),("c",3)))
println(z.lookup("a"))
}
}
网友评论