美文网首页
repartition

repartition

作者: yayooo | 来源:发表于2019-07-30 20:53 被阅读0次

    源码:

      /**
       * Repartitions this RDD into `numPartitions` partitions.
       * The body simply delegates to `coalesce(numPartitions, shuffle = true)`; because
       * shuffle is forced to true, calling repartition always involves a shuffle.
       * NOTE(review): the implicit `ord` parameter is not referenced in this body.
       */
      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        // Passing shuffle = true explicitly is the whole implementation of repartition.
        coalesce(numPartitions, shuffle = true)
      }
    

    可以看出,repartition实质上是调用coalesce()方法,并将第二个参数shuffle设置为true。
    所以repartition一定有shuffle过程

    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    /** Demo: shows how `repartition(2)` redistributes the elements of a 4-partition RDD. */
    object Trans {
      def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
        // Build the Spark context.
        val sc = new SparkContext(conf)

        // Stop the context even if a job fails, so the local Spark resources are released.
        try {
          val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)
          // e.g. partition 0 yields (0,1), (0,2), (0,3), (0,4).
          val rdd1: RDD[(Int, Int)] = tagWithPartitionIndex(rdd)
          rdd1.collect().foreach(println)
          println("********")

          // repartition delegates to coalesce(numPartitions, shuffle = true), so this shuffles.
          val rdd2: RDD[Int] = rdd.repartition(2)
          val rdd3: RDD[(Int, Int)] = tagWithPartitionIndex(rdd2)
          rdd3.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }

      /** Pairs every element with the index of the partition that holds it: (partitionIndex, element). */
      private def tagWithPartitionIndex(rdd: RDD[Int]): RDD[(Int, Int)] =
        rdd.mapPartitionsWithIndex { (index, items) =>
          items.map(item => (index, item))
        }
    }
    

    (0,1)
    (0,2)
    (0,3)
    (0,4)
    (1,5)
    (1,6)
    (1,7)
    (1,8)
    (2,9)
    (2,10)
    (2,11)
    (2,12)
    (3,13)
    (3,14)
    (3,15)
    (3,16)
    ********
    (0,1)
    (0,3)
    (0,5)
    (0,7)
    (0,9)
    (0,11)
    (0,13)
    (0,15)
    (1,2)
    (1,4)
    (1,6)
    (1,8)
    (1,10)
    (1,12)
    (1,14)
    (1,16)

    使用glom()按分区显示数据(每个分区被收集为一个数组)

    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    /** Demo: prints the contents of each partition after `repartition(2)`, using `glom()`. */
    object Trans {
      def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
        // Build the Spark context.
        val sc = new SparkContext(conf)

        // Stop the context even if a job fails, so the local Spark resources are released.
        try {
          val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)
          // Tag every element with its partition index, e.g. partition 0 yields (0,1) .. (0,4).
          val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex { (index, items) =>
            items.map(item => (index, item))
          }
          rdd1.collect().foreach(println)
          println("********")

          val rdd2: RDD[Int] = rdd.repartition(2)
          // glom() collects each partition into one array, so each printed group is one partition.
          rdd2.glom().collect().foreach { partition =>
            println("************")
            partition.foreach(println)
          }
        } finally {
          sc.stop()
        }
      }
    }
    
    

    相关文章

      网友评论

          本文标题:repartition

          本文链接:https://www.haomeiwen.com/subject/umfnrctx.html