美文网首页大数据点滴大数据spark生态系统
Spark2.1和2.2 SQL物理执行策略关键源码分析

Spark2.1和2.2 SQL物理执行策略关键源码分析

作者: orisonchan | 来源:发表于2018-10-14 10:30 被阅读10次

    1. 文章开始之前

    先附上一句SQL,使用tpc-ds的表结构,我们围绕这句SQL讲。

    • SQL:

    SQL> select
    avg(cs_ext_discount_amt)
    from
    catalog_sales, date_dim
    where
    d_date between '1999-02-22'
    and
    cast('1999-05-22' as date)
    and
    d_date_sk = cs_sold_date_sk
    group by cs_sold_date_sk;

    • 逻辑计划:
    Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
    +- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
       +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
          :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
          :  +- Filter isnotnull(cs_sold_date_sk#24)
          :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
          +- Project [d_date_sk#58]
             +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    

    2. 物理计划源码分析

    2.1 物理策略

    def strategies: Seq[Strategy] =
          extraStrategies ++ (
          FileSourceStrategy ::
          DataSourceStrategy ::
          DDLStrategy ::
          SpecialLimits ::
          Aggregation ::
          JoinSelection ::
          InMemoryScans ::
          BasicOperators :: Nil)
    

    其中,extraStrategies是提供给外部人员可以自己添加的策略。调用这些strategies的代码如下:

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))
    

    将strategies逐个去应用在逻辑计划上,然后做flat操作,返回一个PhysicalPlan的iterator。那么每个策略什么作用?

    2.1.1 FileSourceStrategy

    一个针对Hadoop文件系统做的策略,当执行计划的底层Relation是HadoopFsRelation时会调用到,用来扫描文件。

    2.1.2 DataSourceStrategy

    Spark针对DataSource预定义了四种scan接口,TableScanPrunedScanPrunedFilteredScanCatalystScan(其中CatalystScan是unstable的,也是不常用的),如果开发者(用户)自己实现的DataSource是实现了这四种接口之一的,在scan到执行计划的底层Relation时,就会调用来扫描文件。

    2.1.3 DDLStrategy(2.2中已经消失了,2.1中有)

    会在create table的时候调用,因为后续版本不会存在,所以不做解释。

    2.1.4 SpecialLimits

    在Spark SQL中加limit n时候回调用到(如果不指定,Spark 默认也会limit 20),在源码中,会给每种case的limit节点的子节点使用PlanLater,这是个很神奇的东西下文会讲到。

    2.1.5 Aggregation

    顾名思义,执行聚合函数的策略。

    2.1.6 JoinSelection

    执行join的策略。Join的执行策略也同样分BroadcastJoin(也就是MapSideJoin),和ShuffledJoin,这个之后的文章会展开讲。

    2.1.7 InMemoryScans

    当数据在内存中被缓存过,就会用到该策略。

    2.1.8 BasicOperators

    一些基本操作的执行策略,如flatMap,sort,project等,但是实际上大都是给这些节点的子节点套上一个PlanLater

    2.2 PlanLater

    Spark SQL物理计划里一个非常重要的概念。字面意思很好理解,就是之后再计划。那么经过以上策略逐个去执行以后,原来的逻辑计划会变成什么样呢?

    ReturnAnswer
    +- GlobalLimit 21
       +- LocalLimit 21
          +- PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
             , Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
             +- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                +- PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
                   :- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                      , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                   :  +- Filter isnotnull(cs_sold_date_sk#24)
                   :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
                   +- PlanLater Project [d_date_sk#58]
                      , Project [d_date_sk#58]
                      +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                         +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    

    有什么差别呢?主要有二:

      1. 顶层多了个ReturnAnswerLimit节点
      1. AggregateProjectJoin节点都用了PlanLater

    (其实Filter节点也是可以用PlanLater的,但是由于逻辑计划已经将Filter下推至底部,所以最底部的Project->Filter->Relation的三层节点是可以直接调用一个策略去执行的,因此只需要三层节点的最上层也就是Project节点使用PlanLater即可。)

    言归正传,语法树顶部多了ReturnAnswerLimit节点,很容易理解,Limit是Spark SQL默认限制行数,ReturnAnswer是将结果返回。那么加的PlanLater有什么作用?我的理解是,将物理计划分割成一段段,每一段物理计划会有其对应策略来执行。具体源码如下:

      def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
        // Obviously a lot to do here still...
    
        // Collect physical plan candidates.
        val candidates = strategies.iterator.flatMap(_(plan))
    
        // The candidates may contain placeholders marked as [[planLater]],
        // so try to replace them by their child plans.
        val plans = candidates.flatMap { candidate =>
          val placeholders = collectPlaceholders(candidate)
    
          if (placeholders.isEmpty) {
            // Take the candidate as is because it does not contain placeholders.
            Iterator(candidate)
          } else {
            // Plan the logical plan marked as [[planLater]] and replace the placeholders.
            placeholders.iterator.foldLeft(Iterator(candidate)) {
              case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
                // Plan the logical plan for the placeholder.
                val childPlans = this.plan(logicalPlan)
    
                candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                  childPlans.map { childPlan =>
                    // Replace the placeholder by the child plan
                    candidateWithPlaceholders.transformUp {
                      case p if p == placeholder => childPlan
                    }
                  }
                }
            }
          }
        }
    
        val pruned = prunePlans(plans)
        assert(pruned.hasNext, s"No plan for $plan")
        pruned
      }
    

    可以看到,经过策略迭代器和flat过后的candidates候选计划们(一般来说只有一个,是最顶层的planLater),然后收集placeholder(其实就是planlater),这个时候对placeholders进行迭代,并对每个placeholder的child plan递归调用plan方法。举例文章这句SQL,递归调用plan方法,得到每个placeholder及其child plan节点(也就是 case (candidatesWithPlaceholders, (placeholder, logicalPlan))这句话的placeholder和logicalPlan两个变量)如下:

    placeholder:
    PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
    
    logicalPlan:
    Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
    +- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
       +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
          :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
          :  +- Filter isnotnull(cs_sold_date_sk#24)
          :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
          +- Project [d_date_sk#58]
             +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    
    placeholder:
    PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
    
    logicalPlan:
    Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
    +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
       :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
       :  +- Filter isnotnull(cs_sold_date_sk#24)
       :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
       +- Project [d_date_sk#58]
          +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
             +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    
    placeholder:
    PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
    
    logicalPlan:
    Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
    :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
    :  +- Filter isnotnull(cs_sold_date_sk#24)
    :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
    +- Project [d_date_sk#58]
       +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
          +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    
    placeholder:
    PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
    
    logicalPlan:
    Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
    +- Filter isnotnull(cs_sold_date_sk#24)
       +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
    
    placeholder:
    PlanLater Project [d_date_sk#58]
    
    logicalPlan:
    Project [d_date_sk#58]
    +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
       +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
    

    那么可以看到,递归到最底处,就是project->filter->relation的三层节点组合,由于我实际是重写过了DataSource,这个时候会调用DataSourceStrategy,去读取获取数据,然后递归逐个返回根据每个planLater分割点会有对应的策略去对数据进行相应的操作。

    相关文章

      网友评论

        本文标题:Spark2.1和2.2 SQL物理执行策略关键源码分析

        本文链接:https://www.haomeiwen.com/subject/oprzaftx.html