Spark SQL: 1. Architecture Overview

Author: 丹之 | Published 2018-12-05 09:19

Based on a reading of the Spark SQL 2.3 source code.
Spark SQL's predecessor was Shark. Like Hive, it lets users analyze data with SQL statements on the Spark engine instead of writing program code.

[Figure: Spark SQL architecture overview]

The figure above illustrates Spark SQL's role well: it offers three ways to work with RDDs (Resilient Distributed Datasets): JDBC, the console, and a programmatic API. Users only need to write SQL, not program code.
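For instance, the programmatic path is only a few lines. A minimal sketch (the session settings, view name, and data here are illustrative, not from the article):

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration
    val spark = SparkSession.builder()
      .appName("spark-sql-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Register a tiny in-memory dataset as a temporary view (hypothetical data)
    Seq(("alice", 30), ("bob", 15)).toDF("name", "age")
      .createOrReplaceTempView("people")

    // Plain SQL over the view; no RDD code written by hand
    spark.sql("SELECT name FROM people WHERE age >= 18").show()

    spark.stop()
  }
}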
The Spark SQL execution flow is as follows:

[Figure: Spark SQL query execution flow]
There are roughly six steps (a sketch showing how to inspect each stage follows the list):
1. The SQL text is parsed by the SqlParser into an Unresolved Logical Plan;
2. The Analyzer resolves (binds) it against the Catalog, producing a Logical Plan;
3. The Optimizer rewrites the Logical Plan, producing an Optimized Logical Plan;
4. The SparkPlanner converts the Optimized Logical Plan into a Physical Plan (a SparkPlan);
5. prepareForExecution() turns the Physical Plan into an executable Physical Plan (executedPlan);
6. execute() runs the executable physical plan, producing an RDD.
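Each stage of this pipeline can be inspected on a live query through the QueryExecution object that Spark attaches to every DataFrame (its source is shown below). A small sketch, reusing the SparkSession and the hypothetical people view from the earlier example:

// Assumes the SparkSession `spark` and the hypothetical `people` view
// registered in the earlier sketch.
val df = spark.sql("SELECT name FROM people WHERE age >= 18")
val qe = df.queryExecution

println(qe.logical)       // Step 1: parsed (unresolved) logical plan
println(qe.analyzed)      // Step 2: resolved logical plan
println(qe.optimizedPlan) // Step 3: optimized logical plan
println(qe.sparkPlan)     // Step 4: physical plan
println(qe.executedPlan)  // Step 5: executable physical plan

// Or dump all stages at once:
df.explain(true)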
The flow above corresponds to the following Spark source, org.apache.spark.sql.execution.QueryExecution (abridged):

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner and optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  // Step 2: the Analyzer resolves the Unresolved Logical Plan against the catalog
  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }

  // Substitute cached data for matching sub-plans before optimization
  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    assertSupported()
    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
  }

  // Step 3: the Optimizer applies rule-based rewrites to the logical plan
  lazy val optimizedPlan: LogicalPlan = {
    sparkSession.sessionState.optimizer.execute(withCachedData)
  }

  // Step 4: the planner turns the optimized logical plan into a physical plan
  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // Step 5: prepareForExecution applies physical preparation rules
  // (e.g. EnsureRequirements, CollapseCodegenStages).
  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()  // Step 6

  // ... remaining members omitted ...
}

The logic is laid out very clearly: start from the last member, lazy val toRdd: RDD[InternalRow] = executedPlan.execute(), and walk backwards to trace the whole flow. Attentive readers will also notice that every stage is a lazy val: nothing runs until execute() is called. This lazy evaluation is one of Spark's core design ideas.
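The same lazy-val chaining can be shown in isolation. A toy sketch with hypothetical names (not Spark APIs):

// Toy pipeline mirroring QueryExecution's chain of lazy vals (hypothetical names)
class Pipeline(input: String) {
  lazy val parsed: String    = { println("parsing");    input.trim }
  lazy val analyzed: String  = { println("analyzing");  parsed.toLowerCase }
  lazy val optimized: String = { println("optimizing"); analyzed.replaceAll("\\s+", " ") }
  def execute(): String      = { println("executing");  optimized }
}

object LazyDemo extends App {
  val p = new Pipeline("  SELECT  NAME  FROM  PEOPLE  ")
  // Constructing the pipeline runs nothing; each stage fires only when
  // execute() pulls on the chain, so the output order is:
  // executing, optimizing, analyzing, parsing, then the result.
  println(p.execute())
}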

https://github.com/sddyljsx/spark-sql-2.3-source-code-interpretation/blob/master/spark%20sql%202.3%20%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB%20-%20%E6%9E%B6%E6%9E%84%E6%A6%82%E8%A7%88%20(1).md
