4.3 DAGScheduler - Locality

Author: GongMeng | Published: 2018-11-14 23:44

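    1. Overview

    As in the Hadoop 1.0 era, Spark takes data locality into account when it plans the execution of a DAG, as described in the previous section. The many map-style operations are run on the same physical machine wherever possible, which avoids transferring data from one machine to another.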

    2. Implementation

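    A very important concept that appears in the code is NarrowDependency. It means that each partition of this RDD depends on only a small, fixed set of partitions of its parent RDD, as is the case for transformations such as map and filter, so the partition can be computed without a shuffle.

    Building on the previous section: a shuffle, and with it a new stage, is caused by a ShuffleDependency, which transformations such as reduceByKey or groupByKey introduce. An action only triggers the job; it does not by itself cause a shuffle.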
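
The difference between the two kinds of dependency can be sketched in plain Scala. The types below (`Dep`, `Narrow`, `OneToOne`, `RangeDep`, `Shuffle`, `needsNewStage`) are hypothetical stand-ins written for this illustration, not Spark's own classes; the parent-partition arithmetic only mirrors the idea behind Spark's one-to-one and range dependencies.

```scala
// Simplified stand-ins for Spark's dependency classes; NOT the real API.
object DependencySketch {
  sealed trait Dep

  // Narrow: a child partition can name the few parent partitions it reads.
  sealed trait Narrow extends Dep {
    def getParents(childPartition: Int): Seq[Int]
  }

  // One-to-one, as produced by map or filter: child i reads parent i.
  case object OneToOne extends Narrow {
    def getParents(childPartition: Int): Seq[Int] = Seq(childPartition)
  }

  // Range, as used by union: a block of child partitions maps onto a
  // block of parent partitions of the same length.
  final case class RangeDep(inStart: Int, outStart: Int, length: Int) extends Narrow {
    def getParents(childPartition: Int): Seq[Int] =
      if (childPartition >= outStart && childPartition < outStart + length)
        Seq(childPartition - outStart + inStart)
      else Nil
  }

  // Shuffle (wide): every child partition may read from every parent partition,
  // so there is no per-partition parent list to follow.
  final case class Shuffle(numParentPartitions: Int) extends Dep

  // A stage boundary is placed exactly where the dependency is not narrow.
  def needsNewStage(dep: Dep): Boolean = dep match {
    case _: Narrow  => false
    case _: Shuffle => true
  }
}
```

In this model only the `Shuffle` case forces a stage boundary; whether the job was started by an action plays no role in that decision.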

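    So locality can be understood as: run the transformations A->B->C->D inside one stage on the same physical machine wherever possible. Whichever Executor a Partition of the RDD is read and materialized on, the subsequent operations run on that same Executor until a shuffle is reached.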
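
A minimal sketch of what "run A->B->C on the same machine" amounts to for a single partition, assuming the partition is exposed as a plain Scala `Iterator`. The object `PipelineSketch`, the function `runStage`, and the three steps are made up for this illustration and are not Spark code. The trace records that each element flows through all three steps before the next element is read, so nothing has to leave the process between steps.

```scala
import scala.collection.mutable.ArrayBuffer

object PipelineSketch {
  // Apply three narrow steps (A -> B -> C) to one partition's iterator.
  // `trace` records the order in which the steps touch elements.
  def runStage(partition: Iterator[Int], trace: ArrayBuffer[String]): List[Int] = {
    val a = partition.map { x => trace += s"A($x)"; x + 1 }      // step A
    val b = a.map { x => trace += s"B($x)"; x * 2 }              // step B
    val c = b.filter { x => trace += s"C($x)"; x % 4 == 0 }      // step C
    c.toList // elements are only pulled through the chain here
  }

  def main(args: Array[String]): Unit = {
    val trace = ArrayBuffer.empty[String]
    println(runStage(Iterator(1, 2, 3), trace)) // List(4, 8)
    println(trace.mkString(" "))
    // A(1) B(2) C(4) A(2) B(3) C(6) A(3) B(4) C(8)
  }
}
```

Because the iterators are lazy, no intermediate collection is built between the steps; a shuffle is the point where this per-partition chain has to end.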

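    We will come back to this property when describing RDDs. The same property exists in Apache Flink, except that early versions of Flink optimized more aggressively: they merged all of these intermediate operations directly into one large operation. If that operation has no downstream dependents, it is executed as an independent task.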

      /**
       * Recursive implementation for getPreferredLocs.
       *
       * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
       * methods (getCacheLocs()); please be careful when modifying this method, because any new
       * DAGScheduler state accessed by it may require additional synchronization.
       */
      private def getPreferredLocsInternal(
          rdd: RDD[_],
          partition: Int,
          visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
        // If the partition has already been visited, no need to re-visit.
        // This avoids exponential path exploration.  SPARK-695
        if (!visited.add((rdd, partition))) {
          // Nil has already been returned for previously visited partitions.
          return Nil
        }
        // If the partition is cached, return the cache locations
        val cached = getCacheLocs(rdd)(partition)
        if (cached.nonEmpty) {
          return cached
        }
        // If the RDD has some placement preferences (as is the case for input RDDs), get those
        val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
        if (rddPrefs.nonEmpty) {
          return rddPrefs.map(TaskLocation(_))
        }
    
        // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
        // that has any placement preferences. Ideally we would choose based on transfer sizes,
        // but this will do for now.
        rdd.dependencies.foreach {
          case n: NarrowDependency[_] =>
            for (inPart <- n.getParents(partition)) {
              val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
              if (locs != Nil) {
                return locs
              }
            }
    
          case _ =>
        }
    
        Nil
      }
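
The lookup order in the method above (cache locations first, then the RDD's own placement preferences, then the first narrow parent that has a preference) can be tried out on a toy model. Everything in this sketch is hypothetical: `ToyRdd`, its fields, the host names, and `preferredLocs` are invented for illustration, hosts are plain strings rather than `TaskLocation`s, and shuffle parents are modelled simply by leaving them out of `narrowParents`.

```scala
import scala.collection.mutable

object PreferredLocsSketch {
  // Hypothetical toy RDD; not Spark's RDD class.
  final case class ToyRdd(
      name: String,
      cacheLocs: Map[Int, Seq[String]] = Map.empty, // partition -> hosts with a cached copy
      inputLocs: Map[Int, Seq[String]] = Map.empty, // partition -> hosts holding the input block
      narrowParents: Seq[(ToyRdd, Int => Seq[Int])] = Nil) // (parent, child partition -> parent partitions)

  def preferredLocs(
      rdd: ToyRdd,
      partition: Int,
      visited: mutable.HashSet[(String, Int)] = mutable.HashSet.empty[(String, Int)]): Seq[String] = {
    val cached = rdd.cacheLocs.getOrElse(partition, Nil)
    val prefs = rdd.inputLocs.getOrElse(partition, Nil)
    if (!visited.add((rdd.name, partition))) Nil // already explored: avoid re-walking paths
    else if (cached.nonEmpty) cached             // 1. cached copies win
    else if (prefs.nonEmpty) prefs               // 2. the RDD's own placement preferences
    else {
      // 3. walk narrow parents lazily and stop at the first one with a preference
      val fromParents = for {
        (parent, getParents) <- rdd.narrowParents.iterator
        inPart <- getParents(partition).iterator
      } yield preferredLocs(parent, inPart, visited)
      fromParents.find(_.nonEmpty).getOrElse(Nil)
    }
  }

  // A small lineage: input -> mapped -> filtered, plus an RDD behind a shuffle.
  val oneToOne: Int => Seq[Int] = (p: Int) => Seq(p)
  val input = ToyRdd("input", inputLocs = Map(0 -> Seq("host1"), 1 -> Seq("host2")))
  val mapped = ToyRdd("mapped", narrowParents = Seq((input, oneToOne)))
  val filtered = ToyRdd("filtered", narrowParents = Seq((mapped, oneToOne)))
  val cachedMapped = mapped.copy(name = "cachedMapped", cacheLocs = Map(1 -> Seq("host3")))
  val afterShuffle = ToyRdd("afterShuffle") // only a shuffle parent, so nothing to follow
}
```

With this lineage, `preferredLocs(filtered, 1)` walks back through `mapped` to `input` and yields `host2`, the cached variant yields `host3` without looking at its parents, and the RDD behind the shuffle yields no preference at all.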
    
