1. Background
分区是 RDD 中最重要的概念之一,RDD 的五大属性中,第一个描述的便是分区:
A list of partitions
分区也是后续很多重要概念的基础,比如宽窄依赖、广播变量等等。所以深入了解分区相关的问题是具备一定必要性的。
2. Basic
2.1 什么是分区?
分区( partition )是 RDD 在并行计算时的计算单元。当从 HDFS 中读取时,物理存储的分片称为 block,计算逻辑的分片称为 split,每一个 split 对应一个 partition。
2.2 为什么分区?
首先 spark 作为分布式计算引擎,无论是底层对接的分布式存储,还是自身的计算部分,都是基于分布式的。那么在最终计算时,若想发挥出多节点的并行计算优势,数据+逻辑必然是分布在多个节点并行执行的,这也是分区解决的最基本的问题。更重要的是,由于分布式程序中,通信的代价是非常大的,那么如果能减少 Shuffle 的次数,将大大提高执行效率。而良好的分区正是解决这个问题的关键。具体细节在后面章节详解。
2.3 分区和分区器?
之前谈论的一直是分区( partition ),但是还有一个重要概念不要和分区搞混了,那就是分区器( partitioner )。RDD 分区不一定需要分区器才能实现,root RDD 一般都不具备分区器,比如直接通过 makeRDD 从内存中生成的 RDD,或者 textFile 从文件系统中读取的 RDD 都是没有分区器的,他们分区的策略都是直接写在源码中而不依赖具体某一个分区器。使用分区器的场景一般都是在 pairRDD 中,因为有键的存在,所以分区时会根据键来进行分区,使用固定的分区器可以保证相同键的数据被分到同一个分区。我们可以通过
def partitionBy(partitioner: Partitioner)
来直接指定分区器并分区,也可以通过类似
def reduceByKey(partitioner: Partitioner, func: (V, V) => V)
在 transformation 操作(一般是 shuffle 算子)的同时指定分区器。当然也可以不指定而使用默认的分区器。分区器通过numPartitions
指定分区数量,并通过getPartition()
指定分区方式。
3. Deep
3.1 源码中的分区
源码中不使用分区器的分区有两种,一种是从内存中直接生成的 RDD ,另一种是从外部存储中生成的 RDD
3.1.1 内存生成 RDD 的分区
def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]
def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
一般通过 parallelize 函数从内存中创建 RDD,从函数参数列表中可以看到,我们可以直接手动指定分区数量,也可以使用默认分区数。而默认分区数的生成逻辑是先从配置中读取spark.default.parallelism
的值,如果读取不到则使用总CPU 数。分区逻辑可以自行阅读源码,这里不做展开。
3.1.2 外部存储生成 RDD 的分区
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]
从外部存储中生成 RDD 一般通过 textFile 函数,这里指定分区数的参数名字并不是之前的 numSlices,而是 minPartitions。所以用户指定的分区数并不一定是最终的分区数,而是一个最小值。分区逻辑大体上分为两个步骤:
- 划分分区
- 原则:一个分区内只能包含一个文件的内容
- 步骤:计算出所有文件总大小,根据 minPartitions & SPLIT_SLOP(1.1) 得出分区数量、计算偏移量
- 读取数据
- 原则:一行数据是单次读取的最小单位
- 步骤:根据偏移量读取数据
下面通过一个具体案例进行讲解。首先创建在根目录下的 input 文件夹创建两个文件1.txt
和2.txt
内容分别是:
12
3456
和
78
901
回车符占一个字节,所以两个文件大小分别是 7 byte 和 6 byte
通过sc.textFile("input",5).saveAsTextFile("output")
对文件进行输出并查看分区的数量和分区的内容。
按照上述分区逻辑的两个步骤,首先是划分分区:
因为指定了最小分区数是5,用总大小 / 最小分区数即 13 / 5 = 2,即每个分区的分片大小是2 byte。因为这里数据量非常小,肯定达不到 SPLIT_SLOP 的要求( SPLIT_SLOP 的值的含义是当计算的出的分区大小不能整除时,剩余的数据量如果占分区不到百分之十,则将剩余的数据合并到上一个分区,否则单独再开一个分区)。接下来计算偏移量
1.txt:
[0,2],(2,4],(4,6],(6,7]
2.txt:
[0,2],(2,4),(4,6)
可见一共分了7个区,比最小分区数多了两个。
接下来读取数据:首先读取 1.txt 的 1 号分区的数据,结果是12,2号分区数据(2,4],因为最小读取单位是一行,所以直接将3456读取过来,3号和4号分区因为没有数据可读,所以是空的。同理2.txt文件的三个分区分别是78;901;空。经过验证结果和预期是一致的。
3.2 分区传递
分区在算子间传递时是会变化的,那么分区是如何变化的呢?
- 不改变分区的算子:
map、flatmap、foreach、foreachPartition。他们的特点有两个:1.不产生 shuffle 2.只有一个父 RDD - 改变分区的算子:
union 虽然不产生 shuffle,但是分区数会增加为 union 的所有分区数的总和
coalesce 不产生 shuffle,但是分区数会减少
shuffle 类算子:join、reducebykey、groupbykey、repartition。除了在使用这类 shuffle 算子时指定分区数或者分区器,更多时候我们只是单纯传递了一个函数,此时会使用默认分区器:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
通过源码可以看到
当上游存在分区器时:
使用分区数最大的分区器作为下一个 rdd 的分区器,比如 join 时,取两个 rdd 中分区数最大的分区器作为下一个 rdd 的分区器。reducebykey,groupbykey这种则使用自身的分区器作为下一个 rdd 的分区器。
当上游不存在分区器时:
例如 KafakaRDD,没有覆盖 partitioner
。则没有默认的分区器。此时将使用 HashPartitioner,并且分区数量先取spark.default.parallelism
参数值,取不到则用上游 rdd 中所有分区数最大的分区数值。
3.3 分区优化
3.3.1 如何确定分区数
分区数、cpu数、数据量。三者是相辅相成的。一般情况下:一个 stage 内,1GB 数据一个分区(task)。3-5 个 task 对应一个核心。
3.3.2 如何利用分区器
利用分区器优化程序执行效率的本质是 Spark 内部知道各个算子操作是如何影响分区方式的。基于此有两种方式优化:
-
对于后续有多个 action 并且具有 shuffle 操作的 RDD,提前使用 partitinBy 指定分区器并 cache,以此来减少后续shuffle 操作。
提前使用分区器 - 在无需改变元素的 Key 时,使用 mapValues 而不是 map 方法,以此来让 Spark 内部明确的知道当前的分区器使用的分区方式并且 key 不会改变,从而进行相应的优化。
4. Ref
1.《Spark 快速大数据分析》
网友评论