8.Spark Sql

作者: 山间浓雾有路灯 | 来源:发表于2019-06-12 08:58 被阅读0次

    定义

    SparkSql是Apache Spark大数据框架的一部分,主要用于处理结构化数据和对Spark数据执行类sql的查询,Spark为其提供了一个称为DataFrame的编程抽象,充当分布式sql查询引擎

    功能

    • 集成:可以无缝将Sql查询与Spark程序混合
    • 统一数据访问:加载来自各种来源的数据
    • 兼容性:Spark Sql重用Hive前端和MetaStore,与现有Hive数据、查询和UDF的安全兼容,只需要和Hive一起安装即可
    • 标准连接:JDBC和ODBC
    • 扩展性:对交互式查询和长查询使用相同的引擎,Spark Sql利用RDD模型来支持查询容错,使其扩展大大型作业

    框架

    把数据接入到Spark Sql中,Spark Sql进行数据处理或算法实现,然后再把处理后的数据输出到相应的输出源


    images

    input(接入数据):

    • 数据源丰富:Hive,json,txt,JDBC等
    • Spark Sql存在两个类进行对接:HiveContext和SQLContext,其中HiveContext继承了SQLContext的所有方法,同时又进行了扩展
    • SQLContext用于对接绝大多数类型数据源,HiveContext是SQLContext的超集

    超集定义:如果一个集合S2中的每一个元素都在集合S1中,且集合S1中可能包含S2中没有的元素,则集合S1就是S2的一个超集,反过来,S2是S1的子集。 S1是S2的超集,若S1中一定有S2中没有的元素,则S1是S2的真超集,反过来S2是S1的真子集。

    • Spark Sql处理读取的数据,采用的是DataFrame中提供的方法

    DataFrame

    DataFrame是一个以命名列方式组织的分布式数据集,按照命名列的形式组织数据,等同于关系型数据库中的一个表
    通过调用DataFrame的内容作为行RDD(RDD of Rows)返回的Rdd方法,可以将DataFrame转换成RDD
    创建DataFrame方式

    • 已有RDD
    • 结构化数据文件
    • JSON数据
    • Hive表
    • 外部数据库

    RDD以record为单位,Spark优化时无法洞悉record内部的细节,无法深度优化,限制了SparkSql性能的提升
    DataFrame包含了每个record的metadata元数据信息,DataFrame的优化可以对列内部优化


    images
    细节
    • DataFrame是基于RDD的抽象,底层结构是RDD,Spark在你使用DataFrame时会优化你的代码


      images
    • Spark对于DataFrame在执行时间和内存使用上对于RDD有极大的优化

      • Catalyst引擎:使得执行时间减少75%
      • Project Tungsten Off-head内存管理:使内存使用量减少75%,无垃圾回收器

      off-head:本质是突破JVM内存管理限制,分配堆外内存

    Catalyst(优化引擎)

    • 本质是把Sql、DataFrame相结合,以树tree的形式来存储、优化
    • SQL优化器核心执行策略的两个方向:规则和代价
      images

    基于规则

    基于规则体现在经验式、启发式的优化思路,更多的依靠前辈总结出来的优化规则,简单易行且能够覆盖到大部分优化逻辑
    例如join算子(两张表做join)

    • 外排
      • 大循环外排:A,B两张表都很大,复杂度O(M * N),一般不用
      • 游标式外排:归并排序


        images
    • 内排
      • 小表放内存,大表做遍历(hive中的mapside join)

    基于代价

    • 核心算子的优化
    • 评估每种策略选择的代价,根据代价估算,确定代价最小的方案
    • 代价估算模型,调整join的顺序,减少中间shuffle数据的规模

    工作流程

    parser

    • 是针对sql的解析器,SQLParser生成LogicPlan Tree
    • 主要先进行词法分析,再进行语法分析
      • 词法分析:将输入的sql语句串解析为一个一个的token,再根据一定语义规则解析为一颗语法树
      • 语法分析:再词法分析基础上,将单词序列组合成各类语法短语,组成各个LogicPlan
    • 示例
      一个示例性的sql语句(有两张表,其中people表主要存储用户基本信息,score表存储用户的各种成绩),通过parser解析后的AST语法树,如图所示


      images

    analyzer

    • 遍历整个语法树,对树上的每个节点进行数据类型绑定以及函数绑定
    • 根据元数据表解析为包含必要列的表,并且相应字段解析为相应的数据类型,相应的计算逻辑解析为对应的函数
    • SparkSql中analyzer定义了各种解析规则


      images
    • 示例
      通过解析后的逻辑执行计划基本有了骨架,但是
      系统并不知道score,sum这些都是些什么,此时需要基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:
      1)表的Scheme:包括表的基本定义(列名,数据类型),表的数据格式(json,text),表的物理位置等
      2)基本函数信息:类消息
      analyzer会再次遍历整个语法树,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age,id以及name三列的表,people,age会被解析为数据类型int的变量,sum会被解析为聚合函数


      images

    optimizer

    • 是Catalyst的核心
    • 基于规则优化实际上对语法树再次做遍历,模式匹配能够满足特定细节的节点,再进行相应的等价交换,因此,基于规则优化说到底就是一棵树等价的转换为另一棵树
    • 经典规则
      • 谓词下推
        下图左边是经过analyzer解析后的语法树,语法树中两个表先做join,之后再使用age > 10对结果进行过滤。join算子通常是一个非常耗时的算子,耗时多少一般取决于参与join的两个表的大小,如果能够减少参与join两表的大小,就可以大大降低join算子所需时间。谓词下推就是这样一种功能,它会将过滤操作下推到join之前进行,下图中过滤条件age > 0 以及 id != null两个条件就分别下推到了join之前。这样,系统在扫描数据的时候就怼数据进行了过滤,参与join的数据量将会得到显著的减少,join耗时必然也会降低


        images
      • 常量累加
        常量累加其实很简单,就是 x + (1 + 2) --> x + 3,虽然是一个很小的改动,但是意义巨大。下图示例中如果没有进行优化的话,每一条结果都需要执行一次100 + 80的操作,然后再与变量math_score以及english_score相加,而优化后就不需要再执行100 + 80操作


        images
      • 列值裁剪
        下图示例中对于people表来说,并不需要扫描它的所有列值,而只需要列值id,所以在扫描people之后需要将其他列进行裁剪,只留下列id。这个优化一方面大幅度减少了网络、内存数据量消耗,另一方面对于列存数据库来说大大提高了扫描效率


        images

    physical planning

    用物理操作算子产生一个或者多个物理计划,然后用cost模型选择一个物理计划。目前基于cost-based的优化仅仅用于选择join算子,对已知的很小的relations。sparkSql会选择使用spark的提高的点对点的广播功能实现broadcast join。
    示例
    此时就需要将逻辑执行计划转换为物理执行计划,将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin以及SortMergeJoin等(可以将join理解为一个接口,BroadcastHashJoin是其中一个具体实现),物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现


    images

    执行计划

    使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划


    images

    相关文章

      网友评论

        本文标题:8.Spark Sql

        本文链接:https://www.haomeiwen.com/subject/tpyhxctx.html