spark 核心思想之一就是数据分区,将数据分成很多个part,一个一个的进行处理这样的设置达到了以下的目的。
1、实现分布式
2、可以减少内存占用
3、还能方便的做任务重跑
4、而且将统一个key的数据聚集到一起,方便join、group等操作
一、partitioner的定义
1.1 partition
首先我们来看下partition的定义
//Partition.scala
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
…… 后面是实现序列化必须的两个方法 ……
}
其实就这么简单,仅仅是做为一个分区的标识,不携带任何的数据。真正的数据在spark里面其实是以Iterator(迭代器)的形式进行交流的,关注公众号其他的文章就知道了。
2.1 partitioner
partitioner 故名思意就是确定每条数据应该发往哪个partition的分区器,也就是计算Partition.index的计算器。
我们来看下定义:
//Partitioner.scala
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
是不是又是很简单,就一个方法一个属性。getPartition这个定义清晰明了,就是传一个key进来返回这个key所在的分区的idex。
我们来看个实现的例子——最常用的HashPartitioner:
//Partitioner.scala
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
}
其实就是key的hashcode对分区数据取模,是不是easy的不敢相信,spark就是这么简单,因为整体设计的巧妙使得每一部分都是那么的简单。
二、partitioner的应用
我们从之前的文章知道spark的运行都是划分了stage进行执行的,每个stage里面的task都是可以并行的,每个task处理的分区都是不一样的,思考以下我们就能得到下面的结论,stage里面每条数据所在的partition肯定都是不会变的,不同的stage之间就可能会不一样了,所以partitioner起作用的时机肯定是在shuffle的时候咯。
在每个stage完成进行shufflewrite的时候(关于shuffle参考之前的文章)使用partitioner来确定每条数据的去向,下一个stage开始shuffleread就能通过index拿到对应的数据了,例如进行groupby的候partitioner将相同key的数据发往同一个分区,下一个stage进行合并就完成了groupby的操作。
三、自己思考时间
大家可以思考下如果要实现全局排序,partitioner会在其中发挥怎么样的功能,欢迎在留言中一起讨论。
网友评论