美文网首页
Hive on spark的架构与解析SQL的过程

Hive on spark的架构与解析SQL的过程

作者: ZYJ2016 | 来源:发表于2016-09-18 09:09 被阅读0次

    一、 Hive on spark的基本架构/

    1. Hive 的架构

    Hive架构

    Hive的整体架构可以分成以下几大部分:

    1. 用户接口 支持CLI, JDBC和Web UI
    2. Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
    3. MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎

    2. Hive on spark的架构

    Hive on Spark总体的设计思路是,尽可能重用Hive逻辑层面的功能;从生成物理计划开始,提供一整套针对Spark的实现。

    用什么算?算什么?怎么算?

    计算引擎:spark

    通过hive.execution.engine来设置计算引擎,该参数可选的值为mr、tez和spark。

    Hadoop:mr

    计算对象:以Hive的表作为RDD

    将Hive的表转化为RDD以便Spark处理。本质上,Hive的表和Spark的HadoopRDD都是HDFS上的一组文件,通过InputFormat和RecordReader读取其中的数据,因此这个转化是自然而然的。

    Hadoop : DSM
    RDD & DSM 的对比

    计算逻辑:使用Hive原语

    这里主要是指使用Hive的操作符对数据进行处理。将Hive的操作符包装为Function,然后应用到RDD上。这样,我们只需要依赖较少的几种RDD的转换,而主要的计算逻辑仍由Hive提供。

    Spark :Transformation

    Spark为RDD提供了一系列的转换(Transformation),其中有些转换也是面向SQL的,如groupByKey、join等。但如果使用这些转换(就如Shark所做的那样),就意味着我们要重新实现一些Hive已有的功能;而且当Hive增加新的功能时,我们需要相应地修改Hive on Spark模式。
    由于使用了Hive的原语,因此我们需要显式地调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换。

    repartitionAndSortWithinPartitions功能目的是提供一种MapReduce风格的Shuffle。虽然sortByKey也提供了排序的功能,但某些情况下我们并不需要全局有序,另外其使用的Range Partitioner对于某些Hive的查询并不适用。

    元数据管理:HiveMetastoreCatalog

    HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。

    Paste_Image.png

    HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api,对Hive Library依赖。

    物理执行计划:Spark Task

    通过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给Spark执行的任务即为SparkTask。

    MapReduce:Map+Reduce的两阶段执行模式。
    Spark:DAG执行模式。

    DAG(Directed acyclic graph,有向无环图)

    在Spark作业调度系统中,调度的前提是判断多个作业任务的依赖关系,这些作业任务之间可能存在因果的依赖关系,也就是说有些任务必须先获得执行,然后相关的依赖任务才能执行,但是任务之间显然不应出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG表示。

    Spark之所以outperform Hadoop的关键有二:DAG scheduler和intermediate data in memory。Hadoop用的是AG而不是DAG。一个DAG可以包含多个AG。DAG除了可以提升scheduler效率之外,它同时是Spark Fault tolerance机制-Lineage 追溯的基础。

    因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。执行SparkTask时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来触发运算。
    使用foreachAsync是因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。

    任务监控与统计信息收集:SparkListener & Spark API & Accumulator

    Spark提供了SparkListener接口来监听任务执行期间的各种事件,因此我们可以实现一个Listener来监控任务执行进度以及收集任务级别的统计信息(目前任务级别的统计由SparkListener采集,任务进度则由Spark提供的专门的API来监控)。
    另外Hive还提供了Operator级别的统计数据信息,比如读取的行数等。

    MapReduce:Hadoop Counter
    Spark:Accumulator

    二、 Hive on spark的内部实现机制

    参考链接:

    Hive on MR:Hive SQL执行计划深度解析
    Hive on spark:Hive on Spark解析
    Spark SQL:sql的解析与执行
    Hive on spark:hive on spark实现详解

    1. Hive流程:

    • 语法分析阶段,Hive利用Antlr将用户提交的SQL语句解析成一棵抽象语法树(Abstract Syntax Tree,AST)。
    • 生成逻辑计划包括通过Metastore获取相关的元数据,以及对AST进行语义分析。得到的逻辑计划为一棵由Hive操作符组成的树,Hive操作符即Hive对表数据的处理逻辑,比如对表进行扫描的TableScanOperator,对表做Group的GroupByOperator等。
    • 逻辑优化即对Operator Tree进行优化,与之后的物理优化的区别主要有两点:一是在操作符级别进行调整;二是这些优化不针对特定的计算引擎。比如谓词下推(Predicate Pushdown)就是一个逻辑优化:尽早的对底层数据进行过滤以减少后续需要处理的数据量,这对于不同的计算引擎都是有优化效果的。
    • 生成物理计划即针对不同的引擎,将Operator Tree划分为若干个Task,并按照依赖关系生成一棵Task的树(在生成物理计划之前,各计算引擎还可以针对自身需求,对Operator Tree再进行一轮逻辑优化)。比如,对于MapReduce,一个GROUP BY+ORDER BY的查询会被转化成两个MapReduce的Task,第一个进行Group,第二个进行排序。
    • 物理优化则是各计算引擎根据自身的特点,对Task Tree进行优化。比如对于MapReduce,Runtime Skew Join的优化就是在原始的Join Task之后加入一个Conditional Task来处理可能出现倾斜的数据。
    • 最后按照依赖关系,依次执行Task Tree中的各个Task,并将结果返回给用户。每个Task按照不同的实现,会把任务提交到不同的计算引擎上执行。

    2. Hive on spark解析SQL的过程

    SQL

    SQL语句在分析执行过程中会经历下图所示的几个步骤

    1. 语法解析
    2. 操作绑定
    3. 优化执行策略
    4. 交付执行

    语法解析

    语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。

    策略优化

    形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。

    以大家熟悉的join操作为例,下图给出一个join优化的示例。A JOIN B等同于B JOIN A,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。

    在Hash Join中,首先被访问的表称之为“内部构建表”,第二个表为“探针输入”。创建内部表时,会将数据移动到数据仓库指向的路径;创建外部表,仅记录数据所在的路径。

    再举一例,一般来说尽可能的先实施聚合操作(Aggregate)然后再join

    这种优化自动完成,在调优时不需要考虑。

    HQL

    HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
    既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。

    Entrypoint

    hiveql是整个的入口点

      def hiveql(hqlQuery: String): SchemaRDD = {
        val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
        // We force query optimization to happen right away instead of letting it happen lazily like
        // when using the query DSL.  This is so DDL commands behave as expected.  This is only
        // generates the RDD lineage for DML queries, but does not perform any execution.
        result.queryExecution.toRdd
        result
      }
    

    上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参。

    对比:

    sql函数的定义如下

      def sql(sqlText: String): SchemaRDD = {
        val result = new SchemaRDD(this, parseSql(sqlText))
    
        result.queryExecution.toRdd
        result
      }
    

    HiveQL, parser

    parseSql的函数定义如代码所示,解析过程中将指令分成两大类:

    • nativecommand 非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成

    • 非nativecommand 主要是select语句

    def parseSql(sql: String): LogicalPlan = {
        try {
          if (sql.toLowerCase.startsWith("set")) {
            NativeCommand(sql)
          } else if (sql.toLowerCase.startsWith("add jar")) {
            AddJar(sql.drop(8))
          } else if (sql.toLowerCase.startsWith("add file")) {
            AddFile(sql.drop(9))
          } else if (sql.startsWith("dfs")) {
            DfsCommand(sql)
          } else if (sql.startsWith("source")) {
            SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
          } else if (sql.startsWith("!")) {
            ShellCommand(sql.drop(1))
          } else {
            val tree = getAst(sql)
    
            if (nativeCommands contains tree.getText) {
              NativeCommand(sql)
            } else {
              nodeToPlan(tree) match {
                case NativePlaceholder => NativeCommand(sql)
                case other => other
              }
            }
          }
        } catch {
          case e: Exception => throw new ParseException(sql, e)
          case e: NotImplementedError => sys.error(
            s"""
              |Unsupported language features in query: $sql
              |${dumpTree(getAst(sql))}
            """.stripMargin)
        }
      } 
    

    哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量。

    对于非nativeCommand,最重要的解析函数就是nodeToPlan

    Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。

    3. SQL到Spark作业的转换过程

    native command的执行流程

    由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下

    SparkTask的生成和执行

    我们通过一个例子来看一下一个简单的两表JOIN查询如何被转换为SparkTask并被执行。下图左半部分展示了这个查询的Operator Tree,以及该Operator Tree如何被转化成SparkTask;右半部分展示了该SparkTask执行时如何得到最终的RDD并通过foreachAsync提交Spark任务。

    SparkCompiler遍历Operator Tree,将其划分为不同的MapWork和ReduceWork。

    MapWork为根节点,总是由TableScanOperator(Hive中对表进行扫描的操作符)开始;后续的Work均为ReduceWork。ReduceSinkOperator(Hive中进行Shuffle输出的操作符)用来标记两个Work之间的界线,出现ReduceSinkOperator表示当前Work到下一个Work之间的数据需要进行Shuffle。因此,当我们发现ReduceSinkOperator时,就会创建一个新的ReduceWork并作为当前Work的子节点。包含了FileSinkOperator(Hive中将结果输出到文件的操作符)的Work为叶子节点。

    与MapReduce最大的不同在于,我们并不要求ReduceWork一定是叶子节点,即ReduceWork之后可以链接更多的ReduceWork,并在同一个SparkTask中执行。

    从该图可以看出,这个查询的Operator Tree被转化成了两个MapWork和一个ReduceWork。

    执行SparkTask步骤:

    1. 根据MapWork来生成最底层的HadoopRDD,
    2. 将各个MapWork和ReduceWork包装成Function应用到RDD上。
    3. 在有依赖的Work之间,需要显式地调用Shuffle转换,具体选用哪种Shuffle则要根据查询的类型来确定。另外,由于这个例子涉及多表查询,因此在Shuffle之前还要对RDD进行Union。
    4. 经过这一系列转换后,得到最终的RDD,并通过foreachAsync提交到Spark集群上进行计算。

    toRdd

    在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素

    override lazy val toRdd: RDD[Row] =
          analyzed match {
            case NativeCommand(cmd) =>
              val output = runSqlHive(cmd)
    
              if (output.size == 0) {
                emptyResult
              } else {
                val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
                sparkContext.parallelize(asRows, 1)
              }
            case _ =>
              executedPlan.execute().map(_.copy())
          }
    

    相关文章

      网友评论

          本文标题:Hive on spark的架构与解析SQL的过程

          本文链接:https://www.haomeiwen.com/subject/gbbwettx.html