美文网首页
partitionBy (通过分区器进行分区)

partitionBy (通过分区器进行分区)

作者: yayooo | 来源:发表于2019-07-30 21:11 被阅读0次
  1. 作用:对pairRDD进行分区操作,通过指定的分区器决定数据计算的分区,spark默认使用的分区器是HashPartitioner

如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程(即前后分区个数不一致)。

源码:

  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

练习:

// 需求:创建一个4个分区的RDD,对其重新分区

package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    //构建spark上下文对象
    val sc = new SparkContext(conf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")))
    val rdd1: RDD[(Int, (Int, String))] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd1.collect().foreach(println)

    println("*************")

    val rdd3: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    val rdd4: RDD[(Int, (Int, String))] = rdd3.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd4.collect().foreach(println)
    sc.stop()
  }
}


(1,(1,aaa))
(2,(2,bbb))
(3,(3,ccc))
***************
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))


自定义分区器

//将所有数据放到同一个分区·
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
    val sc = new SparkContext(conf)


    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")))

    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
    val rdd3: RDD[(Int, (Int, String))] = rdd2.mapPartitionsWithIndex((index, datas) => {
      datas.map((index, _))
    })
    rdd3.collect().foreach(println)

    sc.stop()

  }
}

class MyPartitioner(num: Int) extends Partitioner{
  override def numPartitions: Int ={
    num
  }

  override def getPartition(key: Any): Int = {
    //让所有数据放到一个分区
    1
  }
}

(1,(1,aaa))
(1,(2,bbb))
(1,(3,ccc))

相关文章

  • partitionBy (通过分区器进行分区)

    作用:对pairRDD进行分区操作,通过指定的分区器决定数据计算的分区,spark默认使用的分区器是HashPar...

  • spark-partitionBy

    partitionBy 重新分区, repartition默认采用HashPartition分区,关于数据倾斜ht...

  • Transformation转换算子之Key-Value类型

    分类: partitionBy() 按照K重新分区 自定义分区 reduceByKey()按照K聚合V group...

  • SparkCore之RDD的转换Key-Value类型

    partitionBy案例 作用: 对pairRDD进行分区操作,如果原有的partionRDD和现有的parti...

  • vbox 扩容

    情景 linux 下三个概念 物理分区,pv分区,lv分区物理分区需要通过vboxmanager 来进行扩充创建,...

  • spark parquet分区:大量文件

    我正在尝试利用 spark分区。我想做点什么 data.write.partitionBy("key").parq...

  • linux新建分区

    概述 在linux下新建分区有两种办法 一、通过fdisk进行分区 二、通过命令dd创建大文件,虚拟出分区 fdi...

  • kafka05 开发自定义分区器

    开发自定义分区器 上一节我们看到,如果在发送消息的时候没有指定对应的分区,会使用默认分区器对消息进行分区,这一节我...

  • Spark-RDD分区器

    Spark中现在支持的分区器有Hash分区器和Range分区器,除此之外,用户也可以自定义分区方式。默认的分区方式...

  • 微服务篇:对事件流再分区

    事件流根据事件的键和事件划分逻辑进行分区。对于每个事件,应用事件分区器,并为要写入的事件选择一个分区。再分区是生成...

网友评论

      本文标题:partitionBy (通过分区器进行分区)

      本文链接:https://www.haomeiwen.com/subject/xzcnrctx.html