spark-partitionBy

作者: 点点渔火 | 来源:发表于2018-04-15 18:04 被阅读0次

    partitionBy 重新分区, repartition默认采用HashPartition分区,
    关于数据倾斜https://www.jianshu.com/writer#/notebooks/11387253/notes/26676524
    自己设计合理的分区方法(比如数量比较大的key 加个随机数 随机分到更多的分区, 这样处理数据倾斜更彻底一些)

    def partitionBy(partitioner: Partitioner)
                    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }
    
    //查看rdd1中每个分区的元素  
    val rdd1 =  rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd1.mapPartitionsWithIndex{
          (partIdx,iter) => {
             val part_map = scala.collection.mutable.Map[String, List[(Int,Int)]]()
                 while(iter.hasNext){
                   val part_name = "part_" + partIdx
                     var elem = iter.next()
                     if(part_map.contains(part_name)) {
                         var elems = part_map(part_name)
                         elems ::= elem
                         part_map(part_name) = elems
                       } else {
                         part_map(part_name) = List[(Int,Int)]{elem}
                       }
                   }
                 part_map.iterator
    
             }
         }.collect 
    

    这里的分区方法可以选择, 默认的分区就是HashPartition分区,
    注意如果多次使用该RDD或者进行join操作, 分区后peresist持久化操作

    class HashPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
      def numPartitions: Int = partitions
    
      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
    
      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
          h.numPartitions == numPartitions
        case _ =>
          false
      }
    
      override def hashCode: Int = numPartitions
    }
    

    范围分区 RangePartitioner :先键值排序, 确定样本大小,采样后不放回总体的随机采样方法, 分配键值的分区,通过样本采样避免数据倾斜。
    源码理解参考: https://www.cnblogs.com/liuming1992/p/6377540.html

    val rdd2 = rdd.partitionBy(new org.apache.spark.RangePartitioner(3,rdd))
    rdd2.glom()
    

    自定义分区函数 自己根据业务数据减缓数据倾斜问题:
    要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法

    • numPartitions: Int:返回创建出来的分区数。
    • getPartition(key: Any): Int:返回给定键的分区编号( 0 到 numPartitions-1)。
    //自定义分区类,需继承Partitioner类
    class UsridPartitioner(numParts:Int) extends Partitioner{
      //覆盖分区数
      override def numPartitions: Int = numParts
      
      //覆盖分区号获取函数
      override def getPartition(key: Any): Int = {
         if(key.toString == "A")
               key.toString.toInt%10
         else:
              key.toString.toInt%5      
      }
    }
    

    相关文章

      网友评论

        本文标题:spark-partitionBy

        本文链接:https://www.haomeiwen.com/subject/ttfjkftx.html