美文网首页
spark-sql源码解读

spark-sql源码解读

作者: Wping_1c08 | 来源:发表于2020-03-29 00:06 被阅读0次

一、开发一个spark应用

//初始化sparksession
    val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate() 
    //transform操作,生成dataframe,可继续执行dataframe相关dsl api,
    val sqlDf = spark.sql("select count(*) from table")
    //action操作,spark-core开始执行
    sqlDf.show(false)

二、初始化sparksession-sessionState构造过程

//1:sparksession懒加载sessionstate
        lazy val sessionState: SessionState = {
            parentSessionState
              .map(_.clone(this))
              .getOrElse {
                val state = SparkSession.instantiateSessionState(
                  SparkSession.sessionStateClassName(sparkContext.conf),
                  self)
                initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
                state
              }
          }
//2:实例化sessionstate
    /**
       * Helper method to create an instance of `SessionState` based on `className` from conf.
       * The result is either `SessionState` or a Hive based `SessionState`.
       */
      private def instantiateSessionState(
          className: String,
          sparkSession: SparkSession): SessionState = {
        try {
          // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
          val clazz = Utils.classForName(className)
          val ctor = clazz.getConstructors.head
          //默认:BaseSessionStateBuilder, hive:HiveSessionStateBuilder
          ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
            ...
//3: 构建SessionState,初始化catalog、sqlparser、analyzer、optimzizer,内置函数以及udf函数等等
            def build(): SessionState = {
              new SessionState(
                session.sharedState,
                conf,
                experimentalMethods,
                functionRegistry,
                udfRegistration,
                () => catalog,
                sqlParser,
                () => analyzer,
                () => optimizer,
                planner,
                streamingQueryManager,
                listenerManager,
                () => resourceLoader,
                createQueryExecution,
                createClone)
            }

注:QueryExecution、SessionState、BaseSessionStateBuilder之间的关系:
(1)QueryExecution的analyzed、optimizedPlan是懒加载的,被调用时实际调用的是SessionState中的analyzer、optimizer的相关方法做解析和优化
(2)SessionState的catalog、analyzer、optimizer、resourceLoader也是懒加载的,被调用时实际调用的是在BaseSessionStateBuilder初始化SessionState的时候生成的匿名函数
三、transform-生成dataframe-resolved logicalPlan

/*
1: paserplan生成unresolved logicalPlan, ofRows方法中调用QueryExecution.assertAnalyzed(),
        其实是sparkSession.sessionState.analyzer.executeAndCheck(logical),
        再使用定义的各种解析规则,resolving unresolved attributes and relations,生成resolved logicalPlan,
        最终new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))生成dataframe*/
def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}
//2: 使用访问者模式,astBuilder遍历antlr sql语法树,解析成catalyst的ast语法树,生成unresolved的逻辑计划
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
 //代码3:将sql命令传给antlr,使用SqlBase.g4生成的词汇解析器SqlBaseLexer和语法解析器SqlBaseParser,对词和语法校验
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")

    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)
    lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
        ...

四、action-触发执行-优化逻辑计划,生成物理计划,转为rdd提交给sparkContex

//1:拉取20行数据到driver端,调用take(),最终调用head()
        def show(): Unit = show(20)
            
        def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
//2:Wrap一个action,监控查询执行过程和时间花费,执行用户注册的回调函数
        private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
            try {
                /*触发optimizer优化器采用一系列优化规则(eg:谓词下推)对resolved logicalPlan进行优化,
                /sparkplanner选择出最优策略(eg:广播表)将optimizedPlan转化为sparkplan,
                    sparkplan应用一系列规则,转化为可预备执行的物理计划
                    */
              qe.executedPlan.foreach { plan =>
                plan.resetMetrics()
              }
              val start = System.nanoTime()
              val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
                 //调用collectFromPlan,交给spark-core,执行物理计划,转为rdd操作
                action(qe.executedPlan)
              }
              val end = System.nanoTime()
              sparkSession.listenerManager.onSuccess(name, qe, end - start)
              result
            } catch {
              case e: Exception =>
                sparkSession.listenerManager.onFailure(name, qe, e)
                throw e
            }
          }
//3:QueryExecution中从优化到生成可预备执行的物理计划工作流
          lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

          lazy val sparkPlan: SparkPlan = {
            SparkSession.setActiveSession(sparkSession)
            // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
            //       but we will implement to choose the best plan.
            planner.plan(ReturnAnswer(optimizedPlan)).next()
          }

          // executedPlan should not be used to initialize any SparkPlan. It should be
          // only used for execution.
          lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
              
          protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
              preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
            }

            /** A sequence of rules that will be applied in order to the physical plan before execution. */
            protected def preparations: Seq[Rule[SparkPlan]] = Seq(
              python.ExtractPythonUDFs,
              PlanSubqueries(sparkSession),
              EnsureRequirements(sparkSession.sessionState.conf),
              CollapseCodegenStages(sparkSession.sessionState.conf),
              ReuseExchange(sparkSession.sessionState.conf),
              ReuseSubquery(sparkSession.sessionState.conf))
//4:执行自定义的回调函数函数,该函数底层最终执行sparkplan的do把物理计划转化为rdd操作
            /**
               * Collect all elements from a spark plan.
               */
              private def collectFromPlan(plan: SparkPlan): Array[T] = {
                // This projection writes output to a `InternalRow`, which means applying this projection is not
                // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
                val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
                plan.executeCollect().map { row =>
                  // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
                  // parameter of its `get` method, so it's safe to use null here.
                  objProj(row).get(0, null).asInstanceOf[T]
                }
              }
//5:将sparkplan转为rdd,交给sparkContext提交job
              /**
                 * Runs this query returning the result as an array.
                 */
                def executeCollect(): Array[InternalRow] = {
                    //getByteArrayRdd调用execute(),再调用doExecute()方法,将sparkplan转为RDD
                  val byteArrayRdd = getByteArrayRdd()

                  val results = ArrayBuffer[InternalRow]()
                      //byteArrayRdd.collect()是rdd的action算子,会运行sc.runJob()提交job给spark集群
                  byteArrayRdd.collect().foreach { countAndBytes =>
                    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
                  }
                  results.toArray
                }

相关文章

网友评论

      本文标题:spark-sql源码解读

      本文链接:https://www.haomeiwen.com/subject/pcneuhtx.html