美文网首页大数据领域精选
【Spark 精选】源码阅读 — Scala 高级语法

【Spark 精选】源码阅读 — Scala 高级语法

作者: 熊本极客 | 来源:发表于2023-10-09 16:01 被阅读0次

    1.case 模式匹配

    case 模式匹配的使用样例

    # 1. 匹配特定的数据类型
    def processValue(value: Any): String = value match {
      case s: String => s"String: $s"
      case i: Int => s"Int: $i"
      case d: Double => s"Double: $d"
      case _ => "Other"
    }
    
    val result1 = processValue("Hello") // 输出 "String: Hello"
    val result2 = processValue(123) // 输出 "Int: 123"
    val result3 = processValue(3.14) // 输出 "Double: 3.14"
    val result4 = processValue(true) // 输出 "Other"
    
    
    # 2.根据不同的输入执行不同的逻辑
    def processInput(input: Any): String = input match {
      case 1 => "One"
      case "two" => "Two"
      case _: Double => "A Double"
      case _ => "Other"
    }
    
    val result1 = processInput(1) // 输出 "One"
    val result2 = processInput("two") // 输出 "Two"
    val result3 = processInput(3.14) // 输出 "A Double"
    val result4 = processInput(true) // 输出 "Other"
    
    
    # 3.解构数据结构
    val tuple = (1, "two", 3.14)
    val (a, b, c) = tuple // 解构元组
    println(a) // 输出 1
    println(b) // 输出 "two"
    println(c) // 输出 3.14
    
    val list = List(1, 2, 3, 4, 5)
    val result = list.map {
      case x if x % 2 == 0 => "Even"
      case _ => "Odd"
    }
    println(result) // 输出 List("Odd", "Even", "Odd", "Even", "Odd")
    

    spark-sql 源码中的 case 模式匹配AnalyzerResolveRelations

      object ResolveRelations extends Rule[LogicalPlan] {
        def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
          case view @ View(desc, isTempView, _, child) if !child.resolved =>
              // 省略 ...
          case p @ SubqueryAlias(_, view: View) =>
            p.copy(child = resolveViews(view))
          case _ => plan
        }
    }
    

    2.case 类

    case 类的使用场景

    • 数据传递case class可以用于封装一组相关的数据,并且很容易进行复制和传递。
    • 模式匹配case class可以与模式匹配结合使用,方便地根据不同的数据类型进行处理。
    // 定义一个 case class
    case class Person(name: String, age: Int)
    
    // 创建一个 Person 对象
    val person1 = Person("Alice", 25)
    
    // 复制一个 Person 对象,并修改部分属性
    val person2 = person1.copy(age = 30)
    
    // 打印 person1 和 person2
    println(person1) // 输出 Person(Alice,25)
    println(person2) // 输出 Person(Alice,30)
    
    // 模式匹配
    def processPerson(person: Person): String = person match {
      case Person(name, age) if age < 30 => s"$name is young"
      case Person(name, age) if age >= 30 => s"$name is old"
    }
    
    val result1 = processPerson(person1) // 输出 "Alice is young"
    val result2 = processPerson(person2) // 输出 "Alice is old"
    

    spark-sql 源码中的 case 类LogicalPlan 的子类 Filter

    // case 类 Filter
    case class Filter(condition: Expression, child: LogicalPlan)
      extends OrderPreservingUnaryNode with PredicateHelper {
      override def output: Seq[Attribute] = child.output
    
      override def maxRows: Option[Long] = child.maxRows
    
      override protected lazy val validConstraints: ExpressionSet = {
        val predicates = splitConjunctivePredicates(condition)
          .filterNot(SubqueryExpression.hasCorrelatedSubquery)
        child.constraints.union(ExpressionSet(predicates))
      }
    }
    
    // case 模式匹配
    // EliminateOuterJoin的apply
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
          val newJoinType = buildNewJoinType(f, j)
          if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
      }
    

    3.嵌套函数

    spark-sql 源码中的嵌套函数QueryPlan 的嵌套函数 mapExpressions

      def mapExpressions(f: Expression => Expression): this.type = {
        var changed = false
    
         // 嵌套函数A
        @inline def transformExpression(e: Expression): Expression = {
          val newE = CurrentOrigin.withOrigin(e.origin) {
            f(e)
          }
          if (newE.fastEquals(e)) {
            e
          } else {
            changed = true
            newE
          }
        }
        
        // 嵌套函数B
        def recursiveTransform(arg: Any): AnyRef = arg match {
          case e: Expression => transformExpression(e)    // 执行嵌套函数B
          case Some(value) => Some(recursiveTransform(value))
          case m: Map[_, _] => m
          case d: DataType => d // Avoid unpacking Structs
          case stream: Stream[_] => stream.map(recursiveTransform).force
          case seq: Iterable[_] => seq.map(recursiveTransform)
          case other: AnyRef => other
          case null => null
        }
        // 执行嵌套函数A
        val newArgs = mapProductIterator(recursiveTransform)
    
        if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
      }
    

    4.偏函数

    偏函数的使用样例

    • 过滤和转换数据:偏函数可以用于过滤和转换数据。通过定义适当的条件,可以使用偏函数来过滤掉不需要的数据或者将数据进行转换。
    • 对特定输入进行处理:偏函数可以对特定类型或特定条件的输入进行处理,而对其他输入不进行处理。
    // 定义一个偏函数,只处理正整数和字符串类型的输入
    val partialFunc: PartialFunction[Any, String] = {
      case i: Int if i > 0 => s"Positive integer: $i"
      case s: String => s"String: $s"
    }
    
    // applyOrElse接口接受两个参数:输入值和默认值。如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
    println(partialFunc.applyOrElse(10, (x: Any) => "Not defined")) // 输出:Positive integer: 10
    println(partialFunc.applyOrElse(-5, (x: Any) => "Not defined")) // 输出:Not defined
    println(partialFunc.applyOrElse("hello", (x: Any) => "Not defined")) // 输出:String: hello
    println(partialFunc.applyOrElse(3.14, (x: Any) => "Not defined")) // 输出:Not define
    

    spark-sql 源码中的偏函数
    AnalysisHelper 的函数 resolveOperatorsUp

      // 跳过已经分析过的rule,并递归获取子节点
      def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
        if (!analyzed) {
          AnalysisHelper.allowInvokingTransformsInAnalyzer {
            val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
            if (self fastEquals afterRuleOnChildren) {
              CurrentOrigin.withOrigin(origin) {
                // rule是偏函数,applyOrElse会执行这个函数
               // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
                rule.applyOrElse(self, identity[LogicalPlan])
              }
            } else {
              CurrentOrigin.withOrigin(origin) {
                val afterRule = rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
                afterRule.copyTagsFrom(self)
                afterRule
              }
            }
          }
        } else {
          self
        }
      }
    

    Optimizer 的函数 ConvertToLocalRelation

    // Optimizer 
    object ConvertToLocalRelation extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // 下面函数的内容整体是偏函数,作为transform的入参
        case Project(projectList, LocalRelation(output, data, isStreaming))
            if !projectList.exists(hasUnevaluableExpr) =>
          val projection = new InterpretedMutableProjection(projectList, output)
          projection.initialize(0)
          LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)
    
        case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
          LocalRelation(output, data.take(limit), isStreaming)
    
        case Filter(condition, LocalRelation(output, data, isStreaming))
            if !hasUnevaluableExpr(condition) =>
          val predicate = Predicate.create(condition, output)
          predicate.initialize(0)
          LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
      }
     // 省略...
    }
    
      // TreeNode
      def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
        transformDown(rule)
      }
    
      def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
        val afterRule = CurrentOrigin.withOrigin(origin) {
          // rule是偏函数,applyOrElse会执行这个函数
          // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
          rule.applyOrElse(this, identity[BaseType])
        }
    
        if (this fastEquals afterRule) {
          // 获取子节点,递归执行transformDown
          mapChildren(_.transformDown(rule))
        } else {
          afterRule.copyTagsFrom(this)
          afterRule.mapChildren(_.transformDown(rule))
        }
      }
    

    5.柯里化函数

    柯里化函数的使用样例

    • 部分应用:柯里化函数可以通过部分应用的方式,先给函数提供部分参数,然后返回一个接受剩余参数的新函数。这样可以在不同的上下文中复用同一个函数。
    • 函数组合:柯里化函数可以方便地进行函数组合,将多个函数串联起来。通过将一个函数的输出作为另一个函数的输入,可以构建更复杂的函数逻辑。

    如下案例中,add是一个柯里化函数,它接受两个参数 xy。通过部分应用的方式,我们先给 add 函数提供一个参数 1,然后返回一个新的函数 addOne,这个新函数只接受一个参数 y。最后,我们调用 addOne 函数,传递剩余参数 2,得到结果 3

    def add(x:Int)(y:Int): Int = x + y
    
    val addOne = add(1) _ // 部分应用,返回一个接受一个参数的新函数
    
    val result = addOne(2) // 调用新函数,传递剩余参数
    
    println(result) // 输出 3
    

    spark-sql 源码中的柯里化函数ParseDriver 的方法 parse

      // 参数1是command,参数2是toResult
      protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
        logDebug(s"Parsing command: $command")
       
        // 使用参数command
        val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
        lexer.removeErrorListeners()
        lexer.addErrorListener(ParseErrorListener)
    
        val tokenStream = new CommonTokenStream(lexer)
        val parser = new SqlBaseParser(tokenStream)
        parser.addParseListener(PostProcessor)
        // 使用参数command 
        parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
        // 省略 ...
        try {
          try {
            // first, try parsing with potentially faster SLL mode
            parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
           // 使用参数toResult
           // parser里面包含参数command,parser再作为toResult函数的入参
            toResult(parser)
          }
          catch {
            case e: ParseCancellationException =>
              // 省略...
              // 使用参数toResult
              // parser里面包含参数command,parser再作为toResult函数的入参
              toResult(parser)
          }
        }
        // 省略 ...
      }
    

    6.基于 Product 实现的 TreeNode

    Product 的使用样例

    • 元组操作Product 接口为元组类提供了一些常用的方法,如 productElement 用于获取元素值,productArity 用于获取元素数量。
    • 模式匹配Product 接口可以与模式匹配结合使用,方便地对元组进行解构和处理。
    // 导入Product接口
    import scala.Product
    
    // 定义一个元组类,继承自Product接口
    class MyTuple(val first: Int, val second: String) extends Product {
      // 实现Product接口的抽象方法
      def productElement(n: Int): Any = n match {
        case 0 => first
        case 1 => second
        case _ => throw new IndexOutOfBoundsException(s"Tuple index out of range: $n")
      }
    
      // 实现Product接口的抽象方法
      def productArity: Int = 2
    
      // 重写toString方法
      override def toString: String = s"MyTuple($first, $second)"
    }
    
    // 创建一个MyTuple对象
    val tuple = new MyTuple(42, "Hello")
    
    // 获取元素值
    val firstElement = tuple.productElement(0)
    val secondElement = tuple.productElement(1)
    
    // 获取元素数量
    val arity = tuple.productArity
    
    // 打印结果
    println(firstElement)  // 输出 42
    println(secondElement) // 输出 "Hello"
    println(arity)         // 输出 2
    println(tuple)         // 输出 "MyTuple(42, Hello)"
    

    spark-sql 源码中的 Product:基于 Product 实现的 TreeNode

    abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
      // 省略 ...
      protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {    // 入参是参数类型为B的scala function,返回值是Array[B]
        val arr = Array.ofDim[B](productArity)
        var i = 0
        while (i < arr.length) {
          arr(i) = f(productElement(i))    // productElement会执行传入的函数mp,然后f会执行apply
          i += 1
        }
        arr
      }  
      // 省略 ...
    }
    
      private def mapChildren(
          f: BaseType => BaseType,
          forceCopy: Boolean): BaseType = {
        // 省略...
    
        val newArgs = mapProductIterator {    // mapProductIterator 的入参是下面的函数mp
          case arg: TreeNode[_] if containsChild(arg) =>
            // 省略...
          case Some(arg: TreeNode[_]) if containsChild(arg) =>
            // 省略...
          case m: Map[_, _] => m.mapValues {
            // 省略...
          }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
          case d: DataType => d // Avoid unpacking Structs
          case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
          case args: Iterable[_] => args.map(mapChild)
          case nonChild: AnyRef => nonChild
          case null => null
        }
        // 省略...
      }
    

    相关文章

      网友评论

        本文标题:【Spark 精选】源码阅读 — Scala 高级语法

        本文链接:https://www.haomeiwen.com/subject/lnwkydtx.html