GroupBy
假定一个RDD 存在十个分区, 当对这个RDD进行groupby之后得到一个新的RDD,相同字段的数据是否 处于同一个分区?
我测试结果查看是相同分组字段的数据被划分到了相同分区, 并且同一个分区可以存在其他字段的数据。
延伸问题:
(1)如果说相同字段的数据处于同一分区那么 groupBy之后得到的groupByRDD.mapValues 获取到这个字段对应的所有的values数据,当数据量大的时候
groupByRdd.mapValues(_.tolist().sortby )就会产生内存溢出。
这个理解对吗
//((bigdata,zhang),1) ((bigdata,li),1)
val sbTeaOne: RDD[((String, String), Int)] = subjectAndTeacher.map((_,1))
//根据key聚合
val subTeacherReduced: RDD[((String, String), Int)] = sbTeaOne.reduceByKey(+)
// 按照学科进行分组,在这种情况下一个分区内可以含有多个学科,当我们groupBy 之后按照学科进行分组,那么每一个组的键值就是学科,value就是数据,相同学科的数据处于同一个分区中;但是允许一个分区内存在多个学科。
val grouped: RDD[(String, Iterable[((String, String), Int)])] = subTeacherReduced.groupBy(_._1._1)
//查看学科名称和分区 List((0,bigdata), (1,ios), (3,JavaEE), (3,python), (3,php)) 0分区是bigdata,1 分区是ios 3分区中含有javaEE python
val keyPartitonIndexRdd: RDD[(String, String)] = grouped.mapPartitionsWithIndex((index: Int, iter: Iterator[(String, Iterable[((String, String), Int)])]) => {
var list: List[Tuple2[String, String]] = ListTuple2[String, String]
while (iter.hasNext) {
val tuple: (String, Iterable[((String, String), Int)]) = iter.next()
list = (index + "", tuple._1) :: list
}
list.iterator
})
print( keyPartitonIndexRdd.collect().toList)
//这个地方是按照数量进行排序
val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(.toList.sortBy(._2).reverse.take(1))//这种情况下是将学科的数据转为list,如果学科对应的数据太多则会的导致list的内存溢出
网友评论