spark分区与任务切分

作者: Tim在路上 | 来源:发表于2019-04-11 16:08 被阅读3次

    我们都知道在spark中,RDD是其基本的抽象数据集,其中每个RDD由多个Partition组成。在job的运行期间,参与运算的Parttion数据分布在多台机器中,进行并行计算,所以分区是计算大数据量的措施。

    分区数越多越好吗?

    不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。

    分区太少有什么影响?

    分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。分区的目的就是要避免存在单任务处理时间过长。

    合理的分区数是多少?如何设置?

    总核数=executor-cores * num-executor?

    一般合理的分区数设置为总核数的2~3倍

    分区数就是任务数吗?

    一般来说任务数对应为分区数量,默认情况下为每一个HDFS分区创建一个分区,默认为128MB,但如果文件中的行太长(比块大小更长),则分区将会更少。RDD创建与HDFS分区一致数量的分区。

    当使用textFile压缩文件(file.txt.gz不是file.txt或类似的)时,Spark禁用拆分,这使得只有1个分区的RDD(因为对gzip文件的读取无法并行化)。在这种情况下,要更改应该重新分区的分区数

    但有时候你需要为你的应用程序,调整分区的大小,或者使用另一种分区方案。

    设置多大分区数 ?

    Spark只能为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量。因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍)。

    此外,分区数决定了将RDD保存到文件的操作生成的文件数。

    划分RDD:repartition

    repartition(numPartitions: Int)
    
    rdd = sc.textFile('demo.gz')
    rdd = rdd.repartition(100)
    

    请注意,Spark禁用拆分压缩文件,并创建只有1个分区的RDD。在这种情况下,使用sc.textFile('demo.gz')和重新分区是有帮助的,rdd.repartition(100)

    rdd.repartition(N)做一个shuffle分割数据来匹配N

    划分RDD:coalesce

    coalesce(numPartitions: Int, shuffle: Boolean = false)
    

    该coalesce转变是用来改变分区的数量。它可以根据标志触发RDD混洗shuffle(默认情况下禁用,即false)。

    shuffle = true 和repartition是一致的。

    分区的3种方式

    1.HashPartitioner

     val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new HashPartitioner(3))
    

    HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions

    2.RangePartitioner

    val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new RangePartitioner(3,counts))
    

    RangePartitioner会对key值进行排序,然后将key值被划分成3份key值集合。
    3.CustomPartitioner

    val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))
    
    class CustomPartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int =
    {
          if(key==1)){
       0
    }else if 
    (key==2){
    1}else{ 2 }} override def equals(AcadGild: Any): Boolean = AcadGild match { case test: CustomPartitioner => test.numPartitions == numPartitions case _ => false }}
    

    CustomPartitioner可以根据自己具体的应用需求,自定义分区。

    相关文章

      网友评论

        本文标题:spark分区与任务切分

        本文链接:https://www.haomeiwen.com/subject/oytxwqtx.html