partitionBy (partitioning an RDD with a partitioner)

Author: yayooo | Published 2019-07-30 21:11
    1. Purpose: repartition a pair RDD (an RDD of key-value pairs) with the specified partitioner, which determines the partition in which each record is computed. Spark's default partitioner is HashPartitioner.

    If the RDD's current partitioner is equal to the specified one, partitionBy returns the RDD unchanged and no repartitioning takes place; otherwise it creates a ShuffledRDD, which introduces a shuffle stage (this is also what happens whenever the number of partitions before and after differs).
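
    For reference, HashPartitioner routes each key with a non-negative modulo of its hashCode. A minimal sketch of that logic (simplified here; Spark's real HashPartitioner delegates to Utils.nonNegativeMod):

      // Sketch: how a hash partitioner picks a partition for a key.
      def hashPartition(key: Any, numPartitions: Int): Int = key match {
        case null => 0                                // null keys go to partition 0
        case _ =>
          val mod = key.hashCode % numPartitions
          if (mod < 0) mod + numPartitions else mod   // keep the index non-negative
      }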

    Source (from PairRDDFunctions):

      def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
        if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
        if (self.partitioner == Some(partitioner)) {
          self
        } else {
          new ShuffledRDD[K, V, V](self, partitioner)
        }
      }
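
    Note the self.partitioner == Some(partitioner) branch: when partitionBy is called with a partitioner equal to the RDD's current one, the RDD is returned as-is and no new ShuffledRDD is created. A quick way to verify this (hypothetical snippet; assumes a running SparkContext named sc):

      val pairs = sc.makeRDD(Seq((1, "a"), (2, "b")))
      val p = new HashPartitioner(2)
      val once  = pairs.partitionBy(p)
      val twice = once.partitionBy(p)  // partitioner already matches
      println(once eq twice)           // true: partitionBy returned `self`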
    

    Exercise:

    // Task: create an RDD with 4 partitions, then repartition it
    
    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    object Trans {
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
        // build the Spark context object
        val sc = new SparkContext(conf)
    
        // create a pair RDD spread over 4 partitions
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")), 4)
        // tag each element with the index of the partition it lives in
        val rdd1: RDD[(Int, (Int, String))] = rdd.mapPartitionsWithIndex((index, items) => {
          items.map((index, _))
        })
        rdd1.collect().foreach(println)
    
        println("*************")
    
        // repartition into 2 partitions using a HashPartitioner
        val rdd3: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
        val rdd4: RDD[(Int, (Int, String))] = rdd3.mapPartitionsWithIndex((index, items) => {
          items.map((index, _))
        })
        rdd4.collect().foreach(println)
        sc.stop()
      }
    }
    
    
    

    Output:

    (1,(1,aaa))
    (2,(2,bbb))
    (3,(3,ccc))
    *************
    (0,(2,bbb))
    (1,(1,aaa))
    (1,(3,ccc))
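
    The second half of the output follows directly from HashPartitioner(2): for an Int key, hashCode is the value itself, so key 2 lands in partition 0 (2 % 2 = 0) while keys 1 and 3 land in partition 1. A standalone check (plain Scala, no Spark needed):

      Seq(1, 2, 3).foreach { k =>
        println(s"key $k -> partition ${k.hashCode % 2}")  // 1 -> 1, 2 -> 0, 3 -> 1
      }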


    Custom partitioner

    // Put all the data into a single partition
    package com.atguigu
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    object Trans {
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
    
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")))
    
        // repartition with the custom partitioner defined below
        val rdd2: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
        // tag each element with the index of the partition it lives in
        val rdd3: RDD[(Int, (Int, String))] = rdd2.mapPartitionsWithIndex((index, datas) => {
          datas.map((index, _))
        })
        rdd3.collect().foreach(println)
    
        sc.stop()
    
      }
    }
    
    class MyPartitioner(num: Int) extends Partitioner {
      // total number of partitions
      override def numPartitions: Int = num

      // send every key to partition 1, so all the data ends up in one partition
      override def getPartition(key: Any): Int = 1
    }
    
    

    Output:

    (1,(1,aaa))
    (1,(2,bbb))
    (1,(3,ccc))
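
    getPartition must return an index in the range [0, numPartitions). A custom partitioner should normally also override equals and hashCode, so that the self.partitioner == Some(partitioner) check in partitionBy can recognize an equal partitioner and skip a redundant shuffle. A slightly more practical sketch (a hypothetical example, not from the original article): reserve one partition for a single "hot" key and hash everything else:

      // Hypothetical: partition 0 holds only the hot key,
      // all other keys are hashed into partitions 1..num-1.
      class HotKeyPartitioner(val hotKey: Any, val num: Int) extends Partitioner {
        require(num >= 2, "need at least 2 partitions")

        override def numPartitions: Int = num

        override def getPartition(key: Any): Int =
          if (key == hotKey) 0
          else {
            val mod = key.hashCode % (num - 1)
            1 + (if (mod < 0) mod + (num - 1) else mod)  // non-negative index in 1..num-1
          }

        // equality lets partitionBy recognize an equivalent partitioner
        override def equals(other: Any): Boolean = other match {
          case p: HotKeyPartitioner => p.hotKey == hotKey && p.num == num
          case _ => false
        }
        override def hashCode: Int = (hotKey, num).hashCode
      }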
