美文网首页
Spark 动态资源分配下数据本地性导致的作业运行缓慢

Spark 动态资源分配下数据本地性导致的作业运行缓慢

作者: jinxing | 来源:发表于2016-12-04 11:49 被阅读0次

    CoarseGrainedSchedulerBackend 以 spark.scheduler.revive.interval 默认1s调用makeoffers(), 在分配到的executor上调度task;makeoffers() 中scheduler.resourceOffers(workerOffers)产生可执行的task策略,包含task到executor的映射;
    每一个stage对应的tasks都由一个TaskSetManager管理,分配策略由以下生成:

    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels){
      do {
        launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers,   availableCpus, tasks)
      } while (launchedTask)}
    

    其中myLocalityLevels是对应taskSet中所包含的所有本地性偏好级别,包括PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY;
    shuffledOffers是对executor offer的随机化处理,taskSet整个分配过程是两层for循环:
    第一层for循环就是上面的maxLocality.taskSet.myLocalityLevels;
    第二层for循环在resourceOfferSingleTaskSet,对offer中的每一个executor进行判断,是否有一个task能够满足本地性偏好,和executor绑定一起,形成一个执行略;
    其实还有第三层for循环,就是dequeueTask;
    重点看TaskSetManager::resourceOffer:
    一个关键的问题就是如何通过延迟机制保证数据本地行,其实现方法就在getAllowedLocalityLevel,spark关于延迟调度由三个参数:
    spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack, 默认3s;
    TaskSetManager中记录了lastLaunchTime,如果当前时间减去lastLaunchTime大于上面的值,对应getAllowedLocalityLevel就返回允许的本地偏好级别;
    在遍历过程中,第一层的for循环的locality和getAllowedLocalityLevel的返回值取最小值,然后执行dequeueTask,如果dequeueTask如果返回Some(_),则更新lastLaunchTime和currentLocalityIndex;

    这就带来一个问题,我们举一个极端的例子:
    有100个executor和100个task,每两个executor一个node,每四个executor一个rack,100个task的本地便好全都是到executor1的process,则整个调度过程如下:

    1. 第一次dequeue task1到executor1,之后executor2~executor100的遍历,由于本地性原因,全部调度失败,并且dequeueTask导致currentLocalityIndex=0;
    2. 3s过后,currentLocalityIndex加1,getAllowedLocality返回NODE_LOCAL,导致task2被调度到executor2,但是executor3~executor100均调度失败;
    3. 2s过后假如executor1执行task1结束,executor1参与调度,task3成功调度到executor1;
    4. dequeueTask返回Some(_),currentLocalityIndex=0,lastLaunch=curTime,其他executor调度失败;
    5. 1s过后,类似步骤2的情况再次发生;
    6. 假如上米娜四步循环发生,会导致长时间的executor处于idle状态,默认60s,idle的executor被系统释放掉,
    7. stage被拖死;

    应对方法:

    1. 调整currentLocalityIndex和lastLaunchTime的更新策略,能够提高task的调度效率;
    2. 减少spark.locaity.wait;

    以上两点均以牺牲数据本地性为代价。

    补充写一点Spark关于本地偏好的机制,Spark通过RDD的依赖关系拓扑图来描述整个一个Job的计算过程,整个拓扑图通过shuffle dependency来划分出各个stage,我们说一个stage就是从一个shuffle-read开始到一个shuffle-write,task运行的executor距离shuffle-read(或者读取dfs, cache)数据物理距离越近,本地性就越强,那hdfs距离,RDD的HadoopPartition内部就描述着split的位置信息,而这样的信息会在DAGScheduler.submitMissingTasks时通过listenerBus以SparkListenerStageSubmitted的形式通知给ExecutorAllocationManager,ExecutorAllocation据此向ExecutorAllocationClient指示最终通过YarnAllocator如何申请executor,申请获得的executor以offer的形式最终分配给TaskScheduler,offer和task最终在TaskSetManager内部完成匹配;

    TaskSetManager内有几个存储结构,

    pendingTasksForExecutor, 
    pendingTasksForHost, 
    pendingTasksForRack, 
    pendingTasksWithNoPrefs, 
    allPendingTasks;
    

    由低到高,假如一个task存在于pendingTasksForExecutor,它一定存在于其他四种,相应的key就是executor所属的Host,Rack等;上面四个集合在tasks初始化的时候就根据task的location preference确定了,TaskSetManager只要根据offer内的executor,按照延迟分配的策略,匹配出对应的task即可,当然,也有可能由于本地性的原因,无法匹配出任何task;

    下面终点讲一下延迟匹配的函数:

    private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    ......
      while (currentLocalityIndex < myLocalityLevels.length - 1) {    
        val moreTasks = myLocalityLevels(currentLocalityIndex) match {      
          case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)      
          case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)      
          case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty      
          case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)     
        }    
        if (!moreTasks) {      
          // This is a performance optimization: if there are no more tasks that can      
          // be scheduled at a particular locality level, there is no point in waiting     
          // for the locality wait timeout (SPARK-4939).      
          lastLaunchTime = curTime      
          logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")      currentLocalityIndex += 1    
        } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {      
          // Jump to the next locality level, and reset lastLaunchTime so that the next locality      
          // wait timer doesn't immediately expire      
          lastLaunchTime += localityWaits(currentLocalityIndex)      
          logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +        s"${localityWaits(currentLocalityIndex)}ms")      
          currentLocalityIndex += 1    } 
        else {
          return myLocalityLevels(currentLocalityIndex)
        }
      }
      myLocalityLevels(currentLocalityIndex)
    }
    

    延迟策略的关键就是currentLocalityIndex的变化,终点是上面的else if,这里面一个地方有一些微妙,就是四个级别的先后顺序是process, node, nopref, rack, any,有人可能会猜测nopref会被rack先选中,其实不可能,因为nopref对应的wait时间默认是0s,所以while循环内,在遍历到node之后,会自动遍历通过nopref,并进入下一次遍历到rack,所以如果我们本地性偏好几种在rack,就可以把所有的wait值设成0,然后rack的wait值设成1~3s,这样能够缓解因为本地偏好到来的调度效率低下,极端情况下还是不能避免上面举出的例子,但是生产情况下应该会好很多。

    相关文章

      网友评论

          本文标题:Spark 动态资源分配下数据本地性导致的作业运行缓慢

          本文链接:https://www.haomeiwen.com/subject/emrhmttx.html