美文网首页
Spark 自定义分区

Spark 自定义分区

作者: 我的小猫不见了 | 来源:发表于2020-05-03 09:05 被阅读0次
1.要实现自定义分区需要实现一个 partitioner类

我这里是内部类

package scala

import org.apache.spark.{Partitioner, SparkConf, SparkContext, TaskContext}
/**
 * spark自定义分区
 */
object CoustomPartitioner {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    Logger.getLogger("org").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress","false")

    val conf = new SparkConf().setAppName("Spark Streaming Jason").setMaster("local")
    val sc = new SparkContext(conf)
//我这里是初始化了
    val rdd = sc.parallelize(List(0,1,2,2,3,3,3,4))
    rdd.map((_,1)).partitionBy(new MyPartitioner(5)).foreachPartition(fp=>{
      println("分区ID:" + TaskContext.get.partitionId)
      fp.foreach(f=>{
        println(f)
      })
    })
  }
  class MyPartitioner(num:Int) extends Partitioner {

    override def numPartitions: Int = num
    override def getPartition(key: Any): Int = {
      if(key.toString.toInt  == 0){
        0  //这里返回的是分区 index
      }else if(key.toString.toInt  == 1){
        1
      }else if(key.toString.toInt  == 2){
        2
      }else if(key.toString.toInt  == 3){
        3
      }else{
        4
      }
    }
  }
}



类里面重写的getpartition 就是根据输入的key ,处理之后,得到了数字
数字是 分区的 index , 默认都是 从0开始 0,1,2,3,4,5 这样

最后的结果是

分区ID:0
(0,1)
分区ID:1
(1,1)
分区ID:2
(2,1)
(2,1)
分区ID:3
(3,1)
(3,1)
(3,1)
分区ID:4
(4,1)

相关文章

网友评论

      本文标题:Spark 自定义分区

      本文链接:https://www.haomeiwen.com/subject/nqorghtx.html