RDD mapPartitionsWithIndex and mapP

Author: Woople | Published: 2016-10-12 17:57

Definitions

Transformation | Meaning
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

Note: func must return an Iterator<U>, which is why both examples below finish by calling .iterator on their result.
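To make the two signatures in the table concrete, here is a minimal plain-Scala model that runs without Spark. It is a sketch under stated assumptions, not Spark's implementation: an "RDD" is represented as a `Seq` of partitions, the helper names `mapPartitionsWithIndexModel` and `mapPartitionsModel` are hypothetical, and the partitions are processed one after another rather than in parallel. It shows that `mapPartitions` behaves like the indexed variant with the index ignored, and that `func` always hands back an `Iterator`.

```scala
// Hypothetical model: an "RDD" is a Seq of partitions, each partition a Seq[T].
// Real Spark runs func on each partition in parallel on executors; this model runs sequentially.

// func receives the partition index and an iterator over that partition,
// and must return an iterator of results (the Iterator<U> from the table).
def mapPartitionsWithIndexModel[T, U](partitions: Seq[Seq[T]])(
    func: (Int, Iterator[T]) => Iterator[U]): Seq[U] =
  partitions.zipWithIndex.flatMap { case (part, index) => func(index, part.iterator) }

// mapPartitions is the same operation with the partition index ignored.
def mapPartitionsModel[T, U](partitions: Seq[Seq[T]])(func: Iterator[T] => Iterator[U]): Seq[U] =
  mapPartitionsWithIndexModel[T, U](partitions)((_, it) => func(it))

val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10))

// One result per partition: the sum of its elements.
val sums = mapPartitionsModel(partitions)(it => Iterator(it.sum))
println(sums) // List(6, 15, 34)

// Tag every element with the index of the partition it came from.
val tagged = mapPartitionsWithIndexModel(partitions)((index, it) => it.map(x => s"$index:$x"))
println(tagged)
```

Because each call to `func` sees a whole partition, anything computed at the top of `func` happens once per partition rather than once per element.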

  • mapPartitionsWithIndex
    The following shows which elements each partition stores:
// Create an RDD with 3 partitions
scala> val testRDD = sc.makeRDD(1 to 10, 3)
testRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27

scala> testRDD.partitions.size
res1: Int = 3

scala> val newRDD = testRDD.mapPartitionsWithIndex {
     |   (index, partitionIterator) => {
     |     val partitionsMap = scala.collection.mutable.Map[Int, List[Int]]()
     |     var partitionList = List[Int]()
     | 
     |     while (partitionIterator.hasNext) {
     |       partitionList = partitionIterator.next() :: partitionList
     |     }
     | 
     |     partitionsMap(index) = partitionList
     |     partitionsMap.iterator // return value
     |   }
     | }
newRDD: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:29

// Which elements each partition holds
scala> newRDD.collect
res2: Array[(Int, List[Int])] = Array((0,List(3, 2, 1)), (1,List(6, 5, 4)), (2,List(10, 9, 8, 7)))
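Two details of res2 can be checked outside Spark. First, the 3/3/4 split: to my understanding, `ParallelCollectionRDD` gives slice `i` of an `n`-element collection split into `k` slices the index range from `i * n / k` (inclusive) to `(i + 1) * n / k` (exclusive), which for n = 10, k = 3 gives [0, 3), [3, 6), [6, 10). Second, the lists come out reversed only because the loop prepends with `::`; returning `Iterator((index, partitionIterator.toList))` keeps the original order and needs no mutable `Map`. The sketch below models both in plain Scala. `slicePositions` and `tagPartition` are hypothetical names, and in spark-shell the function value would be passed as `testRDD.mapPartitionsWithIndex(tagPartition)`.

```scala
// Assumed slicing rule (believed to mirror ParallelCollectionRDD; not Spark's own code):
// slice i of an n-element collection covers indices [i * n / k, (i + 1) * n / k).
def slicePositions(length: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val startIdx = ((i.toLong * length) / numSlices).toInt
    val endIdx = (((i + 1).toLong * length) / numSlices).toInt
    (startIdx, endIdx)
  }

// Split 1 to 10 into 3 slices, standing in for sc.makeRDD(1 to 10, 3).
val data = (1 to 10).toList
val slices = slicePositions(data.length, 3).map { case (startIdx, endIdx) => data.slice(startIdx, endIdx) }
println(slices) // Vector(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))

// Hypothetical order-preserving replacement for the mutable-Map function in the transcript:
// emit a single (partition index, elements) pair per partition.
val tagPartition: (Int, Iterator[Int]) => Iterator[(Int, List[Int])] =
  (index, partitionIterator) => Iterator((index, partitionIterator.toList))

// Apply it to each slice by hand, as Spark would apply it to each partition.
val result = slices.zipWithIndex.flatMap { case (slice, index) => tagPartition(index, slice.iterator) }
println(result) // Vector((0,List(1, 2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)))
```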
  • mapPartitions
    An example of mapPartitions:
scala> val newRDD = testRDD.mapPartitions { item => {
     |   var result = List[String]()
     | 
     |   while (item.hasNext) {
     |     result = (item.next() + 1).toString :: result
     |   }
     | 
     |   result = result ::: List("|")
     |   result.iterator // return value
     | }
     | }
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at mapPartitions at <console>:29

// Partitions are separated by "|"; the grouping matches the previous example (each value here is incremented by 1)
scala> newRDD.collect
res15: Array[String] = Array(4, 3, 2, |, 7, 6, 5, |, 11, 10, 9, 8, |)
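The mapPartitions example above copies the whole partition into a `List` (in reverse order) before appending `"|"`. A lazier variant maps the incoming iterator directly and concatenates the marker, which keeps the original element order and avoids materializing the partition first. The sketch below is plain Scala with the three partitions simulated by hand; `plusOneWithMarker` is a hypothetical name, and in spark-shell it would be passed as `testRDD.mapPartitions(plusOneWithMarker)`. With the 3/3/4 split shown in res2, `collect` should then yield 2, 3, 4, |, 5, 6, 7, |, 8, 9, 10, 11, |.

```scala
// Hypothetical lazy per-partition function: add 1 to each element,
// then emit "|" once the partition's iterator is exhausted.
val plusOneWithMarker: Iterator[Int] => Iterator[String] =
  partition => partition.map(x => (x + 1).toString) ++ Iterator("|")

// Simulate the three partitions of sc.makeRDD(1 to 10, 3) by hand.
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10))
val collected = partitions.flatMap(p => plusOneWithMarker(p.iterator))
println(collected.mkString(", ")) // 2, 3, 4, |, 5, 6, 7, |, 8, 9, 10, 11, |
```

Since `Iterator.++` takes its argument by name, the marker iterator is only created after the mapped elements have been consumed.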

Article link: https://www.haomeiwen.com/subject/qdfiyttx.html