美文网首页大数据Spark深入学习Spark在简书
Spark Sql 源码剖析(二): TreeNode

Spark Sql 源码剖析(二): TreeNode

作者: 牛肉圆粉不加葱 | 来源:发表于2018-06-07 10:09 被阅读41次

    零、前置知识 Scala Product trait

    // 所有 products 的基trait,至少包含 [[scala.Product1]] 至 [[scala.Product22]] 及 [[scala.Tuple1]] 至 [[scala.Tuple22]]
    trait Product extends Any with Equals {
      // 第 n 个元素,从0开始
      def productElement(n: Int): Any
    
      // product size
      def productArity: Int
    
      // product 遍及所有元素的迭代器
      def productIterator: Iterator[Any] = new scala.collection.AbstractIterator[Any] {
        private var c: Int = 0
        private val cmax = productArity
        def hasNext = c < cmax
        def next() = { val result = productElement(c); c += 1; result }
      }
    }
    

    一、CurrentOrigin

    使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。

    // Origin 表示第几行第几列
    case class Origin(
      line: Option[Int] = None,
      startPosition: Option[Int] = None)
    

    object CurrentOrigin 主要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列

    另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)

    二、重要方法

    2.1、children: Seq[BaseType](由子类实现)

    返回该节点的 seq of children,children 是不可变的。有三种情况:

    • LeafNode:无 children
    • UnaryNode:包含一个 child
    • BinaryNode:包含 left、right 两个 child

    2.2、find(f: BaseType => Boolean): Option[BaseType]

    查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。

    2.3、foreach(f: BaseType => Unit): Unit

      def foreach(f: BaseType => Unit): Unit = {
        f(this)
        children.foreach(_.foreach(f))
      }
    

    将函数 f 递归应用于节点及其子节点

    2.4、foreachUp(f: BaseType => Unit): Unit

    与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent

    2.5、map[A](f: BaseType => A): Seq[A]

      def map[A](f: BaseType => A): Seq[A] = {
        val ret = new collection.mutable.ArrayBuffer[A]()
        foreach(ret += f(_))
        ret
      }
    

    调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType => A 类型

    2.6、flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A]

    原理与 map 一致,只是 f 变成了 BaseType => TraversableOnce[A]

    2.5、collect[B](pf: PartialFunction[BaseType, B]): Seq[B]

      def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
        val ret = new collection.mutable.ArrayBuffer[B]()
        val lifted = pf.lift
        foreach(node => lifted(node).foreach(ret.+=))
        ret
      }
    

    PartialFunction#lift:将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回

    2.6、collectLeaves(): Seq[BaseType]

    以 Seq 的形式返回 tree 的所有叶子节点

    def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B]:注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None

    2.7、mapProductIterator[B: ClassTag](f: Any => B): Array[B]

    相当于 productIterator.map(f).toArray ,即对于 productIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回

    注意:TreeNode 没有实现 Product 相关方法,都由其子类自行实现

    2.8、withNewChildren

    使用 new children 替换并返回该节点的拷贝。该方法会对 productElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。

    2.9、transform(rule: PartialFunction[BaseType, BaseType]): BaseType

    调用 transformDown

    2.10、transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType

    rule: PartialFunction[BaseType, BaseType]

      def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
        val afterRule = CurrentOrigin.withOrigin(origin) {
            // 如果 this 是 BaseType 或其子类,则对 this 应用 rule 再返回应用 rule 后的结果,否则返回 this 
          rule.applyOrElse(this, identity[BaseType])
        }
    
        // Check if unchanged and then possibly return old copy to avoid gc churn.
        if (this fastEquals afterRule) {
            // 如果应用了 rule 后节点无变化,则递归将 rule 应用于 children
          mapChildren(_.transformDown(rule))
        } else {
            // 如果应用了 rule 后节点有变化,则本节点换成变化后的节点(children 不变),再将 rule 递归应用于子节点。也就是从根节点往下来应用 rule 替换节点
          afterRule.mapChildren(_.transformDown(rule))
        }
      }
    

    2.11、mapChildren(f: BaseType => BaseType): BaseType

    返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 copy。其内部的原理是调用 mapProductIterator,对每一个 productElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:

    case arg: TreeNode[_] if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (!(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case Some(arg: TreeNode[_]) if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (!(newChild fastEquals arg)) {
        changed = true
        Some(newChild)
      } else {
        Some(arg)
      }
    case m: Map[_, _] => m.mapValues {
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = f(arg.asInstanceOf[BaseType])
        if (!(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case other => other
    }.view.force // `mapValues` is lazy and we need to force it to materialize
    case d: DataType => d // Avoid unpacking Structs
    case args: Traversable[_] => args.map {
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = f(arg.asInstanceOf[BaseType])
        if (!(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
        val newChild1 = if (containsChild(arg1)) {
          f(arg1.asInstanceOf[BaseType])
        } else {
          arg1.asInstanceOf[BaseType]
        }
    
        val newChild2 = if (containsChild(arg2)) {
          f(arg2.asInstanceOf[BaseType])
        } else {
          arg2.asInstanceOf[BaseType]
        }
    
        if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
          changed = true
          (newChild1, newChild2)
        } else {
          tuple
        }
      case other => other
    }
    case nonChild: AnyRef => nonChild
    case null => null
    

    以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回

    2.12、makeCopy(newArgs: Array[AnyRef]): BaseType

    反射生成节点副本

    2.13、nodeName: String

    返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀

    2.14、innerChildren: Seq[TreeNode[_]]

    所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries

    2.15、 allChildren: Set[TreeNode[_]]

    (children ++ innerChildren).toSet[TreeNode[_]]

    2.16、node string 相关

    • 用一行表示该节点
    • 一行更细致的
    • 带 suffix 的
    • tree 形状的
    • tree 形状带 num 的
    • to json
    • pretty json 等 json 相关的

    2.17、apply(number: Int): TreeNode[_]

    主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的

    private def getNodeNumbered(number: MutableInt): Option[TreeNode[_]] = {
        if (number.i < 0) {
          None
        } else if (number.i == 0) {
          // 返回根节点
          Some(this)
        } else {
          number.i -= 1
          // 注意,此遍历顺序必须与numberedTreeString相同
          innerChildren.map(_.getNodeNumbered(number)).find(_ != None).getOrElse {
            children.map(_.getNodeNumbered(number)).find(_ != None).flatten
          }
        }
      }
    

    相关文章

      网友评论

        本文标题:Spark Sql 源码剖析(二): TreeNode

        本文链接:https://www.haomeiwen.com/subject/oabwsftx.html