RDD mapPartitionsWithIndex and mapP

Transformation | Meaning
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
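To make the two function types concrete without a cluster, here is a minimal plain-Scala sketch in which an RDD is stood in for by a sequence of partitions. It is only a model of the semantics described above, not how Spark implements them; the names `PartitionModel`, `Partitions`, `mapPartitionsModel`, and `mapPartitionsWithIndexModel` are hypothetical and exist only for this illustration.

```scala
// Plain-Scala model, no Spark required. All names here are hypothetical.
object PartitionModel {
  // Stand-in for an RDD: each inner Seq plays the role of one partition.
  type Partitions[T] = Seq[Seq[T]]

  // func is called once per partition and sees the whole partition as an Iterator.
  def mapPartitionsModel[T, U](parts: Partitions[T])(
      func: Iterator[T] => Iterator[U]): Partitions[U] =
    parts.map(p => func(p.iterator).toList)

  // Same as above, but func also receives the zero-based partition index.
  def mapPartitionsWithIndexModel[T, U](parts: Partitions[T])(
      func: (Int, Iterator[T]) => Iterator[U]): Partitions[U] =
    parts.zipWithIndex.map { case (p, i) => func(i, p.iterator).toList }

  // Three partitions, mirroring how 1 to 10 is split in the examples of this post.
  val parts: Partitions[Int] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10))

  // One output element per partition: the partition sums 6, 15 and 34.
  val sums: Partitions[Int] =
    mapPartitionsModel[Int, Int](parts)(it => Iterator.single(it.sum))

  // Tag every element with the index of the partition it lives in.
  val tagged: Partitions[(Int, Int)] =
    mapPartitionsWithIndexModel[Int, (Int, Int)](parts)((i, it) => it.map(x => (i, x)))
}
```

The point of the model is that func runs once per partition rather than once per element, so it can return fewer elements than it received (as `sums` does) or the same number (as `tagged` does).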


  • mapPartitionsWithIndex
scala> val testRDD = sc.makeRDD(1 to 10, 3)
testRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27

scala> testRDD.partitions.size
res1: Int = 3

scala> val newRDD = testRDD.mapPartitionsWithIndex {
     |   (index, partitionIterator) => {
     |     val partitionsMap = scala.collection.mutable.Map[Int, List[Int]]()
     |     var partitionList = List[Int]()
     |     while (partitionIterator.hasNext) {
     |       partitionList = partitionIterator.next() :: partitionList
     |     }
     |     partitionsMap(index) = partitionList
     |     partitionsMap.iterator // return value
     |   }
     | }
newRDD: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:29

scala> newRDD.collect
res2: Array[(Int, List[Int])] = Array((0,List(3, 2, 1)), (1,List(6, 5, 4)), (2,List(10, 9, 8, 7)))
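The lists in the result above come out reversed because each element is prepended with `::`. If the original order matters, one possible alternative is sketched below: the partition function is written as a standalone value, so it can be checked on a plain iterator before being handed to Spark. The names `IndexedPartitions`, `collectWithIndex`, and `sample` are hypothetical.

```scala
// A sketch of a more compact partition function. All names here are hypothetical.
object IndexedPartitions {
  // Pairs the partition index with the partition's elements in their original order.
  // toList materializes the whole partition in memory, just as the transcript above does.
  val collectWithIndex: (Int, Iterator[Int]) => Iterator[(Int, List[Int])] =
    (index, partitionIterator) => Iterator.single((index, partitionIterator.toList))

  // Try the function on a plain iterator standing in for partition 2.
  val sample: List[(Int, List[Int])] =
    collectWithIndex(2, Iterator(7, 8, 9, 10)).toList

  // In spark-shell it could then be passed as:
  //   testRDD.mapPartitionsWithIndex(IndexedPartitions.collectWithIndex).collect
}
```

Because the function has the type (Int, Iterator[Int]) => Iterator[(Int, List[Int])], it fits the signature that mapPartitionsWithIndex expects for an RDD[Int].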
  • mapPartitions
scala> val newRDD = testRDD.mapPartitions { item => {
     |   var result = List[String]()
     |   while (item.hasNext) {
     |     result = (item.next() + 1).toString :: result
     |   }
     |   result = result ::: List("|")
     |   result.iterator // return value
     | }
     | }
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at mapPartitions at <console>:29

scala> newRDD.collect
res15: Array[String] = Array(4, 3, 2, |, 7, 6, 5, |, 11, 10, 9, 8, |)
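The mapPartitions example above builds a full List per partition before returning its iterator, and prepending reverses the element order. A lazier variant can be sketched by transforming the incoming iterator directly and appending the marker at the end. This is one possible rewrite rather than the only way to do it; the names `PartitionMarker`, `incrementAndMark`, and `sample` are hypothetical.

```scala
// A sketch of a streaming partition function. All names here are hypothetical.
object PartitionMarker {
  // Adds 1 to every element, converts it to a String, and appends "|" once the
  // partition is exhausted. Nothing is buffered: elements flow through lazily.
  val incrementAndMark: Iterator[Int] => Iterator[String] =
    item => item.map(x => (x + 1).toString) ++ Iterator.single("|")

  // Try the function on a plain iterator standing in for the first partition.
  val sample: List[String] = incrementAndMark(Iterator(1, 2, 3)).toList

  // In spark-shell it could then be passed as:
  //   testRDD.mapPartitions(PartitionMarker.incrementAndMark).collect
}
```

Unlike the transcript above, this version keeps the elements of each partition in their original order, with the "|" marker still closing each partition.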



