美文网首页
Spark并行度优化 增加cores,增加partition 还

Spark并行度优化 增加cores,增加partition 还

作者: pcqlegend | 来源:发表于2019-01-24 21:42 被阅读0次

    Spark的官方文档中关于并行度的介绍有一段介绍:

    Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.

    翻译下:如果你的操作的并行度不够高的话,集群的资源就不能被充分利用。Spark会根据文件的大小自动的设置每个文件上的map task的数量。当然你可以通过一些可选的参数来控制(比如 SparkContext.textFile)。对于分布式的reduce操作,比如 groupByKey和reduceByKey,使用的是最大的父RDD的分区数作为并行度。你可以通过第二个参数传入并行度,或者设置spark.default.parallelism,一般情况下,我们对于每个cpu的core建议分配2-3个task。

    这里我们在看些Spark spark.default.parallelism 这个参数的描述

    For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
    Local mode: number of cores on the local machine
    Mesos fine grained mode: 8
    Others: total number of cores on all executor nodes or 2, whichever is larger
    Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.
    对于shuffle操作,并行度取决于父RDD的最大的分区数。对于没有父RDD的并行操作,取决于cluster manager。
    本地模式下:本地cores的数量
    Mesos:8
    其他:所有executor的cores的数量,2两个取最大的一个。

    override def defaultParallelism(): Int = {
        conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
      }
    

    也就是说如果你Spark 运行在yarn上的时候 按照这个设定并行度。但是看完之后,还是不太清楚如何提高数据处理的并行度呢?直接这设置了并行度参数就一定会生效吗?

    举个生产环境的优化的例子

    Job业务背景

    有一个SparkStreaming的job,使用Direct的方式接收kafka的日志,用于处理用户的部分实时特征,并将计算结果实时的写入redis。但是kafka的topic两个,每个topic分区只有1个。5s一个batch,数据量一个batch 1w。spark.executor.cores=1,spark.executor.instances=3

    Job问题描述

    batch的process的time经常会出现超过5s的情况。


    image.png

    问题排查

    经过查看job stage的执行情况,看到job的每个batch需要写入1w/2=5000的数据,因为是逐条写入,每次2ms的话,因为不是批量写入,所以写入还是比较慢的。

    解决方案有两个

    方案一 使用批量的接口,写入数据,方案可行,调研后发现可以使用redis的pipeline。
    方案二 增加并行度 这个比较好实现,将数据进行repartition,这样数据分区后,就可以并行执行了,时间就会减少。

    方案决定

    考虑到之前做过类似的写入逻辑,可以直接通过增加并行度的方法解决,所以我直接修改代码,将原来的stream.repartition(6),然后spark.executor.instances=7(一般我们这会多留一个instance,据说可以在某个instance挂掉的时候,直接使用多余的executor而不需要申请,没验证过,不确定),部署执行,发现每个batch的处理时间还是5s左右,并没有减少。于是打开stage对应的task查看,发现task的数量是从原来的2增加到6了。但是奇怪的是,这6个task是在同一个executor上执行的,并且是顺序执行。那这样的就和数据没有做分区的效果是一样的了。


    image.png
    image.png

    好了,我们分析一下这个问题出现的原因:

    因为我们接受的数据只有两个topic都只有一个分区,并且进行了union,所以数据接收后只会有一个分区,这个时候进行repartition操作只是将数据拆分成6个新的分区,因为executor上的并行度默认值是1,这个时候虽然你新增到6个instance ,每个1个core应该是6*1=6 个并行度.但是因为你的kafka只有一个分区,也就是说父RDD只有一个分区,数据接收后只在一台机器上。想象中此时数据会被分发到六台机器上执行,从图中执行情况,并没有,task都是在同一台机器上执行的,而且是顺序执行。为什么呢?因为任务调度策略 Data locality .那我们又要开始看一下Spark的官方文档了

    • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality -possible
    • NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
    • NO_PREF data is accessed equally quickly from anywhere and has no locality preference
    • RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
    • ANY data is elsewhere on the network and not in the same rack

    Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.

    Spark 在选择调度的时候会按照数据本地化顺序进行调度,如果超过某个时间仍然没有被调度的话,才会选择下一个调度级别。如果当前没有未处理的数据,Spark就会降低本地化的层级。两个选择 a 在数据所在的机器上一直等待cpu空闲在启动task b 立即启动一个新的task在一个远程机器上,这就要求需要移动数据了。

    What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see thespark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.

    Spark做法就是等待一段时间直到cpu空闲。一旦超时才会移动数据到其他。这个值是3s。
    可以看下我这儿的验证如下图所示 executor上只分配一个core,spark.locality.wait=1 看下任务执行情况,可以看到第四个任务启动的时候,时间从10:31:26 从任务开始的时间(10:31:25)开始刚刚过了1s。


    image.png


    再看数据的locality情况,下图中可以看到,有三个task是locality level 是NODE_LOCAL,另外三个task是RACK_LOCAL


    image.png
    可见我们的推断是正确的。
    这儿在看一下源代码

    org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet

     private def resourceOfferSingleTaskSet(
          taskSet: TaskSetManager,
          maxLocality: TaskLocality,
          shuffledOffers: Seq[WorkerOffer],
          availableCpus: Array[Int],
          tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
        var launchedTask = false
        // nodes and executors that are blacklisted for the entire application have already been
        // filtered out by this point
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) {
            try {
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetManager(tid) = taskSet
                taskIdToExecutorId(tid) = execId
                executorIdToRunningTaskIds(execId).add(tid)
                availableCpus(i) -= CPUS_PER_TASK
                assert(availableCpus(i) >= 0)
                launchedTask = true
              }
            } catch {
              case e: TaskNotSerializableException =>
                logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
                // Do not offer resources for this task, but don't throw an error to allow other
                // task sets to be submitted.
                return launchedTask
            }
          }
        }
        return launchedTask
      }
    
     /**
       * Get the level we can launch tasks according to delay scheduling, based on current wait time.
       */
      private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
        // Remove the scheduled or finished tasks lazily
        def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
          var indexOffset = pendingTaskIds.size
          while (indexOffset > 0) {
            indexOffset -= 1
            val index = pendingTaskIds(indexOffset)
            if (copiesRunning(index) == 0 && !successful(index)) {
              return true
            } else {
              pendingTaskIds.remove(indexOffset)
            }
          }
          false
        }
        // Walk through the list of tasks that can be scheduled at each location and returns true
        // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
        // already been scheduled.
        def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
          val emptyKeys = new ArrayBuffer[String]
          val hasTasks = pendingTasks.exists {
            case (id: String, tasks: ArrayBuffer[Int]) =>
              if (tasksNeedToBeScheduledFrom(tasks)) {
                true
              } else {
                emptyKeys += id
                false
              }
          }
          // The key could be executorId, host or rackId
          emptyKeys.foreach(id => pendingTasks.remove(id))
          hasTasks
        }
    

    到了这儿我们就明白了,为什么我们做了repartition之后仍然不能降低处理时间的原因了。

    最终方案

    a 减少spark.locality.wait 时间,这样数据会移动,可能移动的数据时间,也会超过数据处理时间。
    b 增加spark.executor.cores 完美解决,改完之后每个batch处理时间0.7s,如下图所示


    image.png

    总结

    • 一句话总结上面 这个case, 增加rdd的分区数一定会增加task的数量,但是不一定会减少处理时长,还有可能受到数据本地化和core数量的影响。*

    所以关于并行度的优化,大家可能需要考虑的点

    1 如果原始数据分区数较多,那么可以直接分配对应数量的core,也就是启动对应数量的task。但是要注意一下,每个instance上的task一般每个cpu分配2-3个task。
    2 如果原始数据分区较多,目前还是处理比较慢,则可以加repartition操作,提高并行度。但是这个时候必须满足的场景是,移动数据的时间,远远小于数据处理需要的时间。对于streaming的job而言,感觉不是很合适,比如当前的例子。这个方法比较适合处理较慢的job,大量计算或者io操作,为了避免等待本地化的策略,可以直接减少spark.local.wait。
    3 对于原始数据分区较少的数据,那么可以增加每个executor上的core的数量,然后repartition,来增加并行度。所以只是增加core的数量是不行的,只是进行repartition也是不行的,单纯增加executor的数量自然也是不行的。比较适合场景,单个分区内处理较慢的,且数据量不太大的场景。

    相关文章

      网友评论

          本文标题:Spark并行度优化 增加cores,增加partition 还

          本文链接:https://www.haomeiwen.com/subject/hflujqtx.html