突然听同事提起,就随便写一下的。
reduceByKey==groupByKey().map()
做一个word count小例子,
val counts = pairs.groupByKey(count=>(count._1,count._2.sum))
groupByKey的过程 MapPartitionsRDD=>ShuffledRDD=>MapPartitionsRDD=>MapPartitionsRDD
也就是说,它是原封不动的,把ShuffleMapTask的输出,来去到ResultTask的内存中,所以导致所有数据都进行了网络传输
而如果是reduceByKey,看下shuffleMapTask的write的实现,判断了是否有mapSideCombine,如果有,就先本地聚合,再写磁盘,再传输。
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
......
}
这到底是在干什么
谈之前要有个共识,分布式系统,网络传输是占时间比重高也非常影响效率的部分。
说些比较飘浮的内容,这其实是mapreduce比较经典的map端combine,也就是说因为是分布式系统啊,首先把数据分散到各个节点并行计算,算完了再把数据传到其他节点去做最终结果计算。那么在第一次计算之前,如果能先做一些对最终结果计算有帮助的计算,再去传输,就能节省一点网络传输时间。
说些更飘浮的内容啊,mr这种计算是为了算结果,也就是把数据的抽象程度变高了,那么,能越早的接近最终结果,越能节约时间。
适用场景
如果有hadoop基础就知道,map端combine和reduce端combine逻辑一致才能得到最终结果。
如果不是,那就是如果需要对单key的所有value放在一起才能计算的逻辑不合适做这种优化。
网友评论