美文网首页
spark task partition 并行度 线程

spark task partition 并行度 线程

作者: 王金松 | 来源:发表于2019-03-28 22:48 被阅读0次

partition分区

概念

分区概念spark的分区是RDD里的一个概念,RDD为分布式弹性工作集,因为数据量很大,所以RDD分布在各个节点分区里,我们操作RDD,实际上就是操作分区的数据

对应关系

spark parition和HDFS block的初始数量关系基本认为一对一
spark partition和kafka parition的初始数量关系也是一对一
没有shuffle过程,partition是不变的 ,经过shuffle,可以通过算子改变或者通过colease()和repaitition()改变

input split和block的关系

block是hdfs的概念 inputsplit是mr或者spark的概念
默认是一对一

partition的数目

对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
在Map阶段partition数目保持不变。
在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的

分区的方式

  • HashPartition
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
      .partitionBy(new HashPartitioner(3))
  • RangePartition
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
        .partitionBy(new RangePartitioner(3,counts))
  • CustomPatition
class CustomPartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts
 override def getPartition(key: Any): Int =
 {
       if(key==1)){
    0
 }else if 
(key==2){
1}else{ 2 }} override def equals(AcadGild: Any): Boolean = AcadGild match { case test: CustomPartitioner => test.numPartitions == numPartitions case _ => false }} 
scala> val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))

并行度设置

参考:https://www.jianshu.com/p/7442deb21ae0
一般情况下设置并行度=3 * executor-num * executor-cores

对于初始读取阶段

如果按照spark读取kafka来说,如果kafka的分区是30个,spark任务启动的时候executor-num*executor-cores=10最为合适

对于shuffle后的阶段

比如我们在读取的时候为30个分区,但是shuffle之后我们通过reduceByKey设置了100个分区 ,实际上并行度为100,那么executor-num*executor-cores=30-40比较合适

合理的分区数是多少?如何设置?

总核数=executor-cores * num-executor?
一般合理的分区数设置为总核数的2~3倍

并行度调优的方式

最简单粗暴的就是repartition算子,如果知道哪个步骤需要调整并行度,那么在这个步骤执行之前,调用repartition(${partitionNum})就可以了。有一定的shuffle消耗,但是有些情况下会提升程序的执行速度很多很多,特别是碎片化文件特别严重的时候有奇效。但是有个局限,这个输入的partition是写死的,不够灵活。
reduce算子指定partition,这个和repartition类似。
spark.defalut.parallelism   默认是没有值的,如果设置了值比如说10,是在shuffle的过程才会起作用(val rdd2 = rdd1.reduceByKey(+) //rdd2的分区数就是10,rdd1的分区数不受这个参数的影响)
spark.sql.shuffle.partitions //spark sql中shuffle过程中partitions的数量
val rdd3 = rdd1.join(rdd2)  rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量
(关于spark.sql.shuffle.partitions和spark.defalut.parallelism 这两个参数配置。一个是在spark sql文档里的,描述上说(Configures the number of partitions to use when shuffling data for joins or aggregations.
)默认200。另一个是spark文档里的参数(Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.)。我所理解的是当使用当spark sql且两者都有设置,shuffle.partitions会起作用,如果不是spark sql的shuffle,需要defalut.parallelism才起作用)

并行度和partition的关系

parititon和task的关系

Task是Spark中最新的执行单元。RDD一般是带有partitions的,每个partition的在一个executor上的执行可以任务是一个Task。
每个Task执行的结果就是生成了目标RDD的一个partiton。
每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task

stage

宽依赖

父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle

窄依赖

父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖

stage划分

DAG根据宽依赖来划分stage,每个宽依赖的处理均会是一个stage的划分点。同一个stage中的多个操作会在一个task中完成。因为子RDD的分区仅依赖于父RDD的一个分区,因此这些步骤可以串行执行。


image.png

rdd

总体关系图

总体关系图

一个job 分为很多的stage,每个stage又包含很多的rdd,每个rdd在节点上是parition,parition就是task执行结果,任务在executor执行 一个worker包含很多executor

相关文章

网友评论

      本文标题:spark task partition 并行度 线程

      本文链接:https://www.haomeiwen.com/subject/ltxhbqtx.html