spark核心构件之partitioner

作者: 曾二爷耶 | 来源:发表于2019-02-01 09:43 被阅读5次

    spark 核心思想之一就是数据分区,将数据分成很多个part,一个一个的进行处理这样的设置达到了以下的目的。
    1、实现分布式
    2、可以减少内存占用
    3、还能方便的做任务重跑
    4、而且将统一个key的数据聚集到一起,方便join、group等操作

    一、partitioner的定义

    1.1 partition

    首先我们来看下partition的定义

    //Partition.scala
    trait Partition extends Serializable {
      /**
       * Get the partition's index within its parent RDD
       */
      def index: Int
    ……  后面是实现序列化必须的两个方法  ……
    }
    

    其实就这么简单,仅仅是做为一个分区的标识,不携带任何的数据。真正的数据在spark里面其实是以Iterator(迭代器)的形式进行交流的,关注公众号其他的文章就知道了。

    2.1 partitioner

    partitioner 故名思意就是确定每条数据应该发往哪个partition的分区器,也就是计算Partition.index的计算器。
    我们来看下定义:

    //Partitioner.scala
    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }
    

    是不是又是很简单,就一个方法一个属性。getPartition这个定义清晰明了,就是传一个key进来返回这个key所在的分区的idex。
    我们来看个实现的例子——最常用的HashPartitioner:

    //Partitioner.scala
    class HashPartitioner(partitions: Int) extends Partitioner {
      def numPartitions: Int = partitions
      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }
    }
    

    其实就是key的hashcode对分区数据取模,是不是easy的不敢相信,spark就是这么简单,因为整体设计的巧妙使得每一部分都是那么的简单

    二、partitioner的应用

    我们从之前的文章知道spark的运行都是划分了stage进行执行的,每个stage里面的task都是可以并行的,每个task处理的分区都是不一样的,思考以下我们就能得到下面的结论,stage里面每条数据所在的partition肯定都是不会变的,不同的stage之间就可能会不一样了,所以partitioner起作用的时机肯定是在shuffle的时候咯。

    在每个stage完成进行shufflewrite的时候(关于shuffle参考之前的文章)使用partitioner来确定每条数据的去向,下一个stage开始shuffleread就能通过index拿到对应的数据了,例如进行groupby的候partitioner将相同key的数据发往同一个分区,下一个stage进行合并就完成了groupby的操作。

    三、自己思考时间

    大家可以思考下如果要实现全局排序,partitioner会在其中发挥怎么样的功能,欢迎在留言中一起讨论。

    相关文章

      网友评论

        本文标题:spark核心构件之partitioner

        本文链接:https://www.haomeiwen.com/subject/kaursqtx.html