Transformation | Meaning |
---|---|
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
Note: func must return an Iterator<U>.
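You can see the Iterator-in, Iterator-out contract without a cluster by using a small plain-Scala model. This is a sketch under stated assumptions. Partitions are represented as a `List[List[T]]`. The names `PartitionModel`, `mapPartitionsLocal` and `mapPartitionsWithIndexLocal` are hypothetical, not Spark APIs. The model ignores distribution, scheduling and fault tolerance, and it only mirrors the function shapes listed in the table.

```scala
object PartitionModel {
  // Hypothetical local stand-in for an RDD: each inner List is one partition.
  // This is NOT how Spark is implemented; it only models the function shapes.
  type Partitions[T] = List[List[T]]

  // Shape of mapPartitions: func sees a whole partition as an Iterator[T]
  // and must return an Iterator[U].
  def mapPartitionsLocal[T, U](parts: Partitions[T])(func: Iterator[T] => Iterator[U]): Partitions[U] =
    parts.map(p => func(p.iterator).toList)

  // Shape of mapPartitionsWithIndex: func also receives the partition index.
  def mapPartitionsWithIndexLocal[T, U](parts: Partitions[T])(func: (Int, Iterator[T]) => Iterator[U]): Partitions[U] =
    parts.zipWithIndex.map { case (p, index) => func(index, p.iterator).toList }

  def main(args: Array[String]): Unit = {
    // Same grouping as the 3-partition RDD of 1 to 10 shown in the examples below
    val parts: Partitions[Int] = List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))

    // One output element per partition (its sum): List(List(6), List(15), List(34))
    println(mapPartitionsLocal(parts)(it => Iterator(it.sum)))

    // Tag each element with its partition index; the first partition becomes List((0,1), (0,2), (0,3))
    println(mapPartitionsWithIndexLocal(parts)((i, it) => it.map(x => (i, x))))
  }
}
```

Because func receives an Iterator, it can emit fewer, more, or differently typed elements than the partition holds. Here a whole partition collapses into a single sum.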
- mapPartitionsWithIndex
The following approach shows what each partition stores:
// Create an RDD with 3 partitions
scala> val testRDD = sc.makeRDD(1 to 10, 3)
testRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27
scala> testRDD.partitions.size
res1: Int = 3
scala> val newRDD = testRDD.mapPartitionsWithIndex {
| (index, partitionIterator) => {
| val partitionsMap = scala.collection.mutable.Map[Int, List[Int]]()
| var partitionList = List[Int]()
|
| while (partitionIterator.hasNext) {
| partitionList = partitionIterator.next() :: partitionList
| }
|
| partitionsMap(index) = partitionList
| partitionsMap.iterator // return value
| }
| }
newRDD: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:29
// Which elements each partition stores
scala> newRDD.collect
res2: Array[(Int, List[Int])] = Array((0,List(3, 2, 1)), (1,List(6, 5, 4)), (2,List(10, 9, 8, 7)))
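The example above can be simplified. func is called once per partition, so the mutable Map only ever holds one entry. Also, prepending with `::` reverses each partition, which is why the lists come out as `List(3, 2, 1)`. Below is a shorter sketch with the same `(Int, Iterator[T]) => Iterator[U]` shape. `PartitionContents` and `partitionContents` are hypothetical names. The function is plain Scala, so you can check it without a SparkContext.

```scala
object PartitionContents {
  // Hypothetical helper with the shape mapPartitionsWithIndex expects:
  // (Int, Iterator[Int]) => Iterator[(Int, List[Int])].
  // It emits exactly one (index, elements) pair per partition.
  def partitionContents(index: Int, partitionIterator: Iterator[Int]): Iterator[(Int, List[Int])] =
    Iterator((index, partitionIterator.toList)) // toList keeps the original element order
}
```

In the REPL you could call it as `testRDD.mapPartitionsWithIndex((index, it) => PartitionContents.partitionContents(index, it)).collect`. That should return the same grouping as res2 with each list in ascending order, for example `(0,List(1, 2, 3))`. If you only want to inspect partitions, `testRDD.glom().collect` is a shorter alternative. `RDD.glom()` turns each partition into an array.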
- mapPartitions
A mapPartitions example:
scala> val newRDD = testRDD.mapPartitions { item => {
| var result = List[String]()
|
| while (item.hasNext) {
| result = (item.next() + 1).toString :: result
| }
|
| result = result ::: List("|")
| result.iterator // return value
| }
| }
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at mapPartitions at <console>:29
// Each partition's output ends with |. The grouping matches the previous example, with every value incremented by 1
scala> newRDD.collect
res15: Array[String] = Array(4, 3, 2, |, 7, 6, 5, |, 11, 10, 9, 8, |)
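The example above builds a complete List for each partition, and because `::` prepends, the elements come out reversed. Here is a minimal alternative sketch. It uses only the `Iterator[T] => Iterator[U]` shape from the table, maps the incoming iterator lazily, and appends the separator at the end. `PlusOneWithSeparator` and `plusOneWithSeparator` are hypothetical names.

```scala
object PlusOneWithSeparator {
  // Hypothetical helper with the shape mapPartitions expects: Iterator[Int] => Iterator[String].
  // Iterator.map is lazy, so elements are transformed one at a time instead of
  // collecting the whole partition into a List first; "|" marks the end of the partition.
  def plusOneWithSeparator(partition: Iterator[Int]): Iterator[String] =
    partition.map(x => (x + 1).toString) ++ Iterator("|")
}
```

In the REPL, `testRDD.mapPartitions(it => PlusOneWithSeparator.plusOneWithSeparator(it)).collect` should produce the same elements as res15. The difference is that each partition keeps its ascending order: 2, 3, 4, |, 5, 6, 7, |, 8, 9, 10, 11, |. Streaming through the iterator also avoids holding an entire partition in memory.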