val rdd=sc.makeRDD(Array(1,8,6,4,9,3,76,4))
val sorted = rdd.sortBy(identity).zipWithIndex().map {
case (v, idx) => (idx, v)
}
val count = sorted.count()
val median: Double = if (count % 2 == 0) {
val l = count / 2 - 1
val r = l + 1
(sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble
println(median)
首先根据元素排序(sortBy),得到有序RDD
RDD[Int]=(1,3,4,4,6,8,9,76)
1
WithIndex后与index调换顺序.
(1,1),(2,3),(3,4),(4,4),(5,6),(6,8),(7,9),(8,76)
然后如果有偶数个的话,中位数就是最中间两个数的平均数
lookup
lookup是根据map中的键来取出相应的值的,
如上面的sorted.lookup(1) ,得到的结果是一个序列Seq[Int]
如果sorted是
RDD((1,(1,"a",1.0)),(1,(2,"b",9.1))
sorted.lookup(1)得到的就是Seq[(Int,String,Double)],是键为1的所有值的序列,
println(sorted.lookup(1))
//是
WrappedArray((1,a,1.0),(2,b,9.0))
sorted.lookup(1).head是第一项,即(1,a,1.0),
sorted.lookup(1).tail是第二项,即Seq[(Int,String,Double)]
网友评论