Source code:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
As the source shows, repartition simply calls coalesce() with the second parameter, shuffle, set to true.
So repartition always involves a shuffle.
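This also explains the practical difference between the two operators: without a shuffle, coalesce() can only reduce the partition count (asking for more partitions leaves it unchanged), while repartition() can increase it as well. A minimal sketch of that behavior (the object name CoalescePartitionCount is made up here for illustration):

package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CoalescePartitionCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CoalescePartitionCount")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)

    // Without a shuffle, asking for more partitions has no effect: still 4
    println(rdd.coalesce(8).getNumPartitions)

    // repartition = coalesce(..., shuffle = true), so it can grow to 8
    println(rdd.repartition(8).getNumPartitions)

    sc.stop()
  }
}

The full example below tags each element with its partition index before and after repartition(2):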
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    // create the Spark context
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)

    // tag every element with its partition index, e.g. (0,1), (0,2), ...
    val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd1.collect().foreach(println)

    println("********")

    // repartition to 2 partitions and tag the elements again
    val rdd2: RDD[Int] = rdd.repartition(2)
    val rdd3: RDD[(Int, Int)] = rdd2.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd3.collect().foreach(println)

    sc.stop()
  }
}
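Output: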
(0,1)
(0,2)
(0,3)
(0,4)
(1,5)
(1,6)
(1,7)
(1,8)
(2,9)
(2,10)
(2,11)
(2,12)
(3,13)
(3,14)
(3,15)
(3,16)
********
(0,1)
(0,3)
(0,5)
(0,7)
(0,9)
(0,11)
(0,13)
(0,15)
(1,2)
(1,4)
(1,6)
(1,8)
(1,10)
(1,12)
(1,14)
(1,16)
As the second half of the output shows, after repartition(2) the 16 elements that previously lived in 4 partitions are spread across 2 partitions; the shuffle deals each source partition's elements out round-robin over the target partitions, which is why partition 0 ends up with all the odd values and partition 1 with all the even values in this run. The same partition layout can be displayed with glom(), which collects each partition into an array:
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    // create the Spark context
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)

    // tag every element with its partition index
    val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd1.collect().foreach(println)

    println("********")

    // glom() gathers each partition into an array, so each block of output
    // below corresponds to one partition of the repartitioned RDD
    val rdd2: RDD[Int] = rdd.repartition(2)
    rdd2.glom().collect().foreach(list => {
      println("************")
      list.foreach(println)
    })

    sc.stop()
  }
}
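Since repartition(numPartitions) just delegates to coalesce(numPartitions, shuffle = true), calling coalesce directly with shuffle = true should produce the same two-partition layout. A minimal sketch, reusing the sc and rdd from the example above:

// equivalent to rdd.repartition(2)
val coalesced: RDD[Int] = rdd.coalesce(2, shuffle = true)
coalesced.glom().collect().foreach(list => {
  println("************")
  list.foreach(println)
})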