- Purpose: repartitions a pair RDD (RDD[(K, V)]) using the given Partitioner, which decides which partition each record is computed in. Spark's default partitioner is HashPartitioner.
  If the RDD's current partitioner is the same as the one passed in, no repartitioning is done and the original RDD is returned; otherwise a ShuffledRDD is created, i.e. a shuffle takes place (for example, when the number of partitions changes).
Source code:
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
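For reference, HashPartitioner derives the partition index from the key's hashCode, kept non-negative modulo the number of partitions. A minimal sketch of that logic (illustrative only, not the actual Spark source):

object HashPartitionSketch {
  // Map a key to a non-negative partition index, mirroring what HashPartitioner
  // does conceptually: hashCode modulo numPartitions, shifted into [0, numPartitions).
  def getPartitionSketch(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val rawMod = key.hashCode % numPartitions
      if (rawMod < 0) rawMod + numPartitions else rawMod
  }
}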
Exercise:
// Requirement: create an RDD with 4 partitions, then repartition it
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    // create the Spark context
    val sc = new SparkContext(conf)

    // create a pair RDD with 4 partitions
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 4)

    // tag each element with the index of the partition it currently lives in
    val rdd1: RDD[(Int, (Int, String))] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd1.collect().foreach(println)

    println("*************")

    // repartition with a HashPartitioner into 2 partitions
    val rdd3: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    val rdd4: RDD[(Int, (Int, String))] = rdd3.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd4.collect().foreach(println)

    sc.stop()
  }
}
Output:
(1,(1,aaa))
(2,(2,bbb))
(3,(3,ccc))
*************
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))
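Why the records land this way: HashPartitioner(2) assigns each record to partition hashCode(key) % 2, and for an Int key the hashCode is the value itself, so keys 1 and 3 go to partition 1 and key 2 goes to partition 0. A quick, hypothetical check (not part of the original exercise):

// Int.hashCode is the value itself, so the partition is simply key % 2 here
Seq(1, 2, 3).foreach { key =>
  println(s"key $key -> partition ${key % 2}")
}
// key 1 -> partition 1
// key 2 -> partition 0
// key 3 -> partition 1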
Custom partitioner
// Requirement: put all the data into a single partition
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")))

    // repartition with the custom partitioner defined below
    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))

    // tag each element with the index of the partition it lives in
    val rdd3: RDD[(Int, (Int, String))] = rdd2.mapPartitionsWithIndex((index, datas) => {
      datas.map((index, _))
    })
    rdd3.collect().foreach(println)

    sc.stop()
  }
}

class MyPartitioner(num: Int) extends Partitioner {
  override def numPartitions: Int = num

  override def getPartition(key: Any): Int = {
    // send every record to partition 1, so partition 0 stays empty
    1
  }
}
Output:
(1,(1,aaa))
(1,(2,bbb))
(1,(3,ccc))
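In practice a custom partitioner usually derives the partition index from the key itself rather than returning a constant. The sketch below is a hypothetical variant, not part of the original exercise; it assumes the same import of org.apache.spark.Partitioner as above and routes even Int keys to partition 0 and odd keys to partition 1:

// Hypothetical variant: even keys -> partition 0, odd keys -> partition 1.
class ParityPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case i: Int => math.abs(i % 2)   // 0 for even keys, 1 for odd keys
    case _      => 0                 // any other key type falls back to partition 0
  }
}

// Usage: rdd.partitionBy(new ParityPartitioner) would place (2,"bbb") in partition 0
// and (1,"aaa"), (3,"ccc") in partition 1.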