Both repartition and coalesce are transformation operators, and both can be used to change the number of partitions of an RDD.
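As a quick, hedged illustration (this sketch is not from the original post; the class and variable names are made up), both operators are lazy: no job runs until an action such as collect is called.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: repartition and coalesce are both lazy transformations.
public class RepartitionVsCoalesceSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[4]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

        JavaRDD<Integer> fewer = rdd.coalesce(1);   // lazy: no job submitted yet
        JavaRDD<Integer> more = rdd.repartition(6); // lazy: no job submitted yet

        fewer.collect(); // the action triggers a job
        more.collect();  // the action triggers another job
        sc.stop();
    }
}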
1. coalesce
1.1 Source code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
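In the shuffle branch, each input partition starts from a pseudo-random key and increments it once per element; the HashPartitioner then takes each key modulo numPartitions, so the elements of every input partition are spread roughly round-robin across the output partitions. The following is a rough, hedged simulation of just that key logic in plain Java (not Spark code; all names are made up):

import java.util.Random;

// Hypothetical simulation of the key assignment used by coalesce(shuffle = true):
// element i of input partition `index` gets key (randomStart + i + 1), and the
// hash partitioner maps that key to output partition key % numPartitions.
public class DistributePartitionSimulation {
    public static void main(String[] args) {
        int numPartitions = 3;          // target partition count
        int elementsPerPartition = 6;   // pretend each input partition has 6 elements
        for (int index = 0; index < 2; index++) { // two input partitions
            int position = new Random(index).nextInt(numPartitions); // random start per partition
            for (int i = 0; i < elementsPerPartition; i++) {
                position += 1; // incremented before each element, as in the source above
                int outputPartition = position % numPartitions; // what the hash partitioner ends up doing
                System.out.println("input partition " + index + ", element " + i
                        + " -> output partition " + outputPartition);
            }
        }
    }
}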
According to the method documentation:
The method returns a new RDD with numPartitions partitions.
Going from, say, 1000 partitions to 100 partitions produces a narrow dependency and no shuffle; instead, each of the 100 new partitions claims 10 of the current 1000 partitions.
However, a drastic coalesce (e.g. numPartitions = 1) may cause the computation to run on fewer nodes than you would like (a single node when numPartitions = 1). To avoid this, pass shuffle = true. This adds a shuffle step, but the operations upstream of the coalesce still run in parallel at their original partitioning, regardless of how few partitions the coalesce produces.
With shuffle = true you can also repartition the RDD to a larger number of partitions. This is useful when the partition count is small (say 100) and a few partitions are abnormally large (data skew): calling coalesce(1000, shuffle = true) redistributes the data across 1000 partitions using a hash partitioner.
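Those last two behaviors can be verified directly with getNumPartitions; the sketch below is illustrative only (not from the original post):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: with shuffle = false coalesce can only reduce the partition
// count; with shuffle = true it can also increase it.
public class CoalescePartitionCountSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[4]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5);

        System.out.println(rdd.coalesce(1, false).getNumPartitions());  // 1
        System.out.println(rdd.coalesce(10, false).getNumPartitions()); // still 5: no effect
        System.out.println(rdd.coalesce(10, true).getNumPartitions());  // 10: the shuffle allows growth
        sc.stop();
    }
}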
1.2 Example
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCount1 implements ISparkJob {
    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                5);
        // partitions: 5 -> 1, shuffle = false
        javaRDD.coalesce(1, false).collect();
        try {
            // pause so the finished job can be inspected (e.g. in the Spark UI)
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // partitions: 5 -> 1, shuffle = true
        javaRDD.coalesce(1, true).collect();
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // partitions: 5 -> 10, shuffle = false
        javaRDD.coalesce(10, false).collect();
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // partitions: 5 -> 10, shuffle = true
        javaRDD.coalesce(10, true).collect();
        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Execution:
- There are four jobs in total.
- Job0 (partitions: 5 -> 1, shuffle = false)
  Job0 has only one stage (a ResultStage), no shuffle, and its task count is 1.
  Although the original RDD has 5 partitions, the coalesce changes the partition count to 1, and a stage's task count is determined by the partition count at the end of the stage, so the task count is 1.
  Note: a stage's task count is determined by the partition count at the end of the stage.
- Job1 (partitions: 5 -> 1, shuffle = true)
  Job1 has two stages (a ShuffleMapStage and a ResultStage), because the shuffle splits the job into stages at the shuffle boundary (a lineage-based check is sketched after this list).
  The ShuffleMapStage (original RDD -> shuffle write) has 5 tasks, because the original RDD has 5 partitions and the partition count does not change within this stage.
  The ResultStage (shuffle read -> end of job) has 1 task, because the coalesce changes the partition count to 1.
- Job2 (partitions: 5 -> 10, shuffle = false)
  Job2 has one stage (a ResultStage) and no shuffle. Although the coalesce requests numPartitions = 10, this has no effect, so the task count is 5.
  Note: with shuffle = false, setting numPartitions larger than the current partition count has no effect.
- Job3 (partitions: 5 -> 10, shuffle = true)
  Job3 has two stages (a ShuffleMapStage and a ResultStage), split at the shuffle boundary.
  The ShuffleMapStage (original RDD -> shuffle write) has 5 tasks, because the original RDD has 5 partitions and the partition count does not change within this stage.
  The ResultStage (shuffle read -> end of job) has 10 tasks, because the coalesce changes the partition count to 10.
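The Job0/Job1 difference can also be read off the RDD lineage: a ShuffledRDD appears only when shuffle = true, and that is where the stage boundary is placed. A hedged sketch (not from the original post):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: inspect the lineage instead of the Spark UI.
public class CoalesceLineageSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[4]"));
        JavaRDD<String> rdd = sc.parallelize(
                Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee"), 5);

        // Only a CoalescedRDD on top of the parent: narrow dependency, one stage.
        System.out.println(rdd.coalesce(1, false).toDebugString());

        // A ShuffledRDD shows up in the lineage: the job is split into two stages there.
        System.out.println(rdd.coalesce(1, true).toDebugString());
        sc.stop();
    }
}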
2. repartition
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
As the source shows, repartition is implemented internally by calling coalesce with shuffle = true.
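In other words, repartition(n) and coalesce(n, true) build the same pipeline; the hedged sketch below (not from the original post) only makes that equivalence concrete:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: repartition(n) is shorthand for coalesce(n, shuffle = true).
public class RepartitionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[4]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

        JavaRDD<Integer> a = rdd.repartition(6);    // always shuffles
        JavaRDD<Integer> b = rdd.coalesce(6, true); // same effect, written explicitly

        System.out.println(a.getNumPartitions());   // 6
        System.out.println(b.getNumPartitions());   // 6
        sc.stop();
    }
}

When the goal is merely to reduce the partition count, calling coalesce(n) with the default shuffle = false avoids the shuffle entirely, as the doc comment above recommends.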
3. Testing the task count of a stage
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCount1 implements ISparkJob {
    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                10);
        // shuffle = false: the chained coalesce calls stay in one stage
        javaRDD.coalesce(8, false)
                .coalesce(4, false)
                .coalesce(2, false)
                .coalesce(1, false)
                .collect();
        try {
            // pause so the finished job can be inspected (e.g. in the Spark UI)
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // shuffle = true: every coalesce adds a shuffle and therefore a stage boundary
        javaRDD.coalesce(8, true)
                .coalesce(4, true)
                .coalesce(2, true)
                .coalesce(1, true)
                .collect();
        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
- There are two jobs in total.
- Job0 (shuffle = false)
  Job0 has no shuffle, so there is only one stage. Although the partition count is changed several times within the stage, the partition count at the end of the stage is 1, so the whole stage runs with 1 task.
- Job1 (shuffle = true)
  Job1 does shuffle, and the shuffles split it into 5 stages:
  Stage1 (original RDD -> shuffle write) has 10 tasks, because the original RDD has 10 partitions and the partition count does not change within Stage1;
  Stage2 (shuffle read -> shuffle write) has 8 tasks, because Stage1 ends with the partition count set to 8 and it does not change within Stage2;
  Stage3 (shuffle read -> shuffle write) has 4 tasks, because Stage2 ends with the partition count set to 4 and it does not change within Stage3;
  Stage4 (shuffle read -> shuffle write) has 2 tasks, because Stage3 ends with the partition count set to 2 and it does not change within Stage4;
  Stage5 (shuffle read -> end of job) has 1 task, because Stage4 ends with the partition count set to 1 and it does not change within Stage5.
  Therefore, a stage's task count can be determined from the partition count of the RDD at the end of the stage, as the sketch below confirms.
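That rule can be checked step by step with getNumPartitions, whose values match the per-stage task counts listed above (a hedged sketch, not from the original post):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: the partition count at the end of each stage
// matches the task count observed for that stage.
public class StageTaskCountSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[4]"));
        JavaRDD<Integer> rdd = sc.parallelize(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 10);

        JavaRDD<Integer> c8 = rdd.coalesce(8, true);
        JavaRDD<Integer> c4 = c8.coalesce(4, true);
        JavaRDD<Integer> c2 = c4.coalesce(2, true);
        JavaRDD<Integer> c1 = c2.coalesce(1, true);

        // Prints 8, 4, 2, 1 -> the task counts of Stage2..Stage5 above
        // (Stage1 has 10 tasks, the partition count of the source RDD).
        System.out.println(c8.getNumPartitions());
        System.out.println(c4.getNumPartitions());
        System.out.println(c2.getNumPartitions());
        System.out.println(c1.getNumPartitions());
        sc.stop();
    }
}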