repartition and coalesce operators

Author: Jorvi | Published 2019-06-12 10:03

repartition and coalesce are both transformation operators, and both can be used to repartition an RDD.

1. coalesce

1.2 Source code
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }
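As a quick illustration of the shuffle = true branch above: each input partition keys its elements round-robin, starting from a random position derived from the partition index, and the HashPartitioner then mods that key by numPartitions. A stand-alone Java sketch of that logic (illustrative only, not Spark code; all names are made up):

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class DistributePartitionSketch {
    public static void main(String[] args) {
        int numPartitions = 3;
        int index = 0;  // index of one input partition
        List<String> items = Arrays.asList("a", "b", "c", "d", "e");

        // Start at a random output position seeded by the partition index,
        // then hand out positions round-robin, one per element.
        int position = new Random(index).nextInt(numPartitions);
        for (String t : items) {
            position = position + 1;
            // The key's hash code is the key itself; HashPartitioner takes it mod numPartitions.
            System.out.println(t + " -> output partition " + Math.floorMod(position, numPartitions));
        }
    }
}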

According to the doc comment on the method:

The method returns a new RDD with numPartitions partitions.

Going from 1000 partitions down to 100 produces a narrow dependency with no shuffle; instead, each of the 100 new partitions claims 10 of the current 1000 partitions.

However, if the coalesce is drastic (e.g. numPartitions = 1), the computation may end up on fewer nodes than you would like (a single node when numPartitions = 1). To avoid this, set shuffle = true. This adds a shuffle step, but it means the operations upstream of the coalesce still run in parallel at their original partition count, regardless of how many partitions the coalesce asks for.

With shuffle = true you can also repartition to a larger number of partitions. This is useful when the partition count is small (say 100) and a few partitions are abnormally large (data skew): calling coalesce(1000, shuffle = true) redistributes the data across 1000 partitions using a hash partitioner.
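For instance, the effect on the partition count can be checked directly with getNumPartitions(); a minimal sketch (the class name, data, and local master setting are illustrative):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalescePartitionCount {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("coalesce-demo").setMaster("local[*]"));

        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

        // Shrinking works with or without a shuffle.
        System.out.println(rdd.coalesce(1, false).getNumPartitions()); // 1

        // Growing only takes effect when shuffle = true.
        System.out.println(rdd.coalesce(6, false).getNumPartitions()); // still 3
        System.out.println(rdd.coalesce(6, true).getNumPartitions());  // 6

        sc.stop();
    }
}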

1.2 Example
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// ISparkJob is the author's own job interface; the sleeps between actions leave time
// to inspect each finished job in the Spark web UI.
public class WordCount1 implements ISparkJob {

    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                5);

        // partition:5->1  shuffle=false
        javaRDD.coalesce(1, false).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->1  shuffle=true
        javaRDD.coalesce(1, true).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->10  shuffle=false
        javaRDD.coalesce(10, false).collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // partition:5->10  shuffle=true
        javaRDD.coalesce(10, true).collect();

        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Execution:

  • There are four jobs in total.

  • Job0 (partition: 5 -> 1, shuffle = false)

Job0 has a single stage (a ResultStage) with no shuffle, and it runs 1 task.
Although the original RDD has 5 partitions, the coalesce brings the partition count down to 1, and a stage's task count is determined by the partition count at the end of the stage, so the task count is 1.
Note: a stage's task count is determined by the partition count at the end of the stage.


  • Job1 (partition: 5 -> 1, shuffle = true)

Job1 has two stages (a ShuffleMapStage and a ResultStage); because there is a shuffle, the job is split into stages at the shuffle boundary (see the lineage sketch after this walkthrough).
The ShuffleMapStage (original RDD -> shuffle write) runs 5 tasks, because the original RDD has 5 partitions and the partition count does not change within that stage.
The ResultStage (shuffle read -> end of job) runs 1 task, because the coalesce sets the partition count to 1.


  • Job2 (partition: 5 -> 10, shuffle = false)

Job2 has a single stage (a ResultStage) with no shuffle. Although the coalesce asks for numPartitions = 10, the setting does not take effect, so the stage runs 5 tasks.
Note: with shuffle = false, setting numPartitions larger than the original partition count has no effect.


  • Job3 (partition: 5 -> 10, shuffle = true)

Job3 has two stages (a ShuffleMapStage and a ResultStage); there is a shuffle, so the job is split at the shuffle boundary.
The ShuffleMapStage (original RDD -> shuffle write) runs 5 tasks, because the original RDD has 5 partitions and the partition count does not change within that stage.
The ResultStage (shuffle read -> end of job) runs 10 tasks, because the coalesce sets the partition count to 10.
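The stage split described above for Job1 and Job3 can also be seen in the RDD lineage: with shuffle = true a ShuffledRDD appears, and that is exactly where the job is cut into a ShuffleMapStage and a ResultStage. A minimal sketch (the class name, data, and local master setting are illustrative):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceLineage {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("coalesce-lineage").setMaster("local[*]"));

        JavaRDD<String> rdd = sc.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc"), 5);

        // Narrow dependency only: a CoalescedRDD sits directly on the parent, one stage.
        System.out.println(rdd.coalesce(1, false).toDebugString());

        // shuffle = true: a ShuffledRDD shows up in the lineage, so the job is split
        // into a ShuffleMapStage and a ResultStage.
        System.out.println(rdd.coalesce(1, true).toDebugString());

        sc.stop();
    }
}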

2. repartition

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

As the source shows, repartition is implemented internally by calling coalesce with shuffle = true.
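In other words, the two calls below do the same thing; a minimal sketch (the class name, data, and local master setting are illustrative):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionVsCoalesce {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("repartition-demo").setMaster("local[*]"));

        JavaRDD<String> javaRDD = sc.parallelize(
                Arrays.asList("aaa", "bbb", "ccc", "ddd"), 2);

        // repartition(n) is shorthand for coalesce(n, shuffle = true):
        // both calls shuffle the data and yield 10 partitions.
        System.out.println(javaRDD.repartition(10).getNumPartitions());    // 10
        System.out.println(javaRDD.coalesce(10, true).getNumPartitions()); // 10

        sc.stop();
    }
}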

3. A test of the task count per stage

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCount1 implements ISparkJob {

    @Override
    public void run(JavaSparkContext sparkContext) {
        JavaRDD<String> javaRDD = sparkContext.parallelize(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa", "bbb", "ccc", "aaa bbb", "aaa", "bbb", "ccc", "ddd"),
                10);

        // shuffle=false
        javaRDD.coalesce(8, false)
                .coalesce(4, false)
                .coalesce(2, false)
                .coalesce(1, false)
                .collect();

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // shuffle=true
        javaRDD.coalesce(8, true)
                .coalesce(4, true)
                .coalesce(2, true)
                .coalesce(1, true)
                .collect();

        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • There are two jobs in total.

  • Job0 (shuffle = false)

Job0 has no shuffle, so it is a single stage. Although the partition count is changed several times along the way, the final coalesce at the end of the stage sets it to 1, so the whole stage runs 1 task.


  • Job1 (shuffle = true)

Job1 has shuffles and is split into 5 stages at the shuffle boundaries:
Stage1 (original RDD -> shuffle write) runs 10 tasks, because the original RDD has 10 partitions and the partition count does not change within Stage1;
Stage2 (shuffle read -> shuffle write) runs 8 tasks, because Stage1 set the partition count to 8 and it does not change within Stage2;
Stage3 (shuffle read -> shuffle write) runs 4 tasks, because Stage2 set the partition count to 4 and it does not change within Stage3;
Stage4 (shuffle read -> shuffle write) runs 2 tasks, because Stage3 set the partition count to 2 and it does not change within Stage4;
Stage5 (shuffle read -> end of job) runs 1 task, because Stage4 set the partition count to 1 and it does not change within Stage5.

So a stage's task count can be determined from the partition count of the RDD at the end of that stage.
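To check this conclusion programmatically rather than in the web UI, one option is to register a SparkListener that prints the task count of every completed stage; a minimal sketch (the class name, data, and local master setting are illustrative):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerStageCompleted;

public class StageTaskCount {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("stage-task-count").setMaster("local[*]"));

        // Print the task count of every stage as it completes.
        sc.sc().addSparkListener(new SparkListener() {
            @Override
            public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
                System.out.println("Stage " + stageCompleted.stageInfo().stageId()
                        + " ran " + stageCompleted.stageInfo().numTasks() + " tasks");
            }
        });

        JavaRDD<String> javaRDD = sc.parallelize(
                Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee"), 10);

        // One stage with 1 task: the narrow coalesce chain collapses to the final partition count.
        javaRDD.coalesce(8, false).coalesce(1, false).collect();

        // Two stages: 10 tasks before the shuffle, 1 task after it.
        javaRDD.coalesce(1, true).collect();

        sc.stop();
    }
}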
