美文网首页spark||flink||scala
数据治理篇-元数据-血缘分析: queryparser概述

数据治理篇-元数据-血缘分析: queryparser概述

作者: larluo_罗浩 | 来源:发表于2020-06-07 16:07 被阅读0次

    前言

    1. 数据字典 DataDictionary
    2. 数据血缘 DataLineage
    3. 元数据触发器 MetaTrigger

    一. 血缘分析的导推形式

    1. sql
    2. kafka streams
    3. spark rdd
    4. flink datastream

    二. 血缘分析的技术方案分析。

    1. 通过调度器反向推导血缘关系。
    2. 通过计算引擎系统提供的血缘分析接口进行收集。
    3. 通过计算引擎系统的解析过程源码进行提取
    4. 通用的sql解析器工具

    三. queryparser内功心法.

    1. 语法解析过程
    2. 信息传播过程
    3. 特定的计算模型
    4. 如何新增新的数据库方言。

    前言

    最近在元数据的项目建设中,主要涉及了三方面的基础工作分析。

    • 数据字典 DataDictionary
      数据字典算是元数据的最基础功能,相当于元数据模型里面的实体。可以进行信息的查询及搜索,第三方API接口使用。

      目前最核心的实体包括三块:用户(User),数据集(Dataset),
      数据管道(Datapipe)。

      [用户User实体] 可以通过ldap集成,可以用于关联元数据实体及后期资产管理,甚至集成后期的数据安全建设。

      [数据集Dataset实体] 则是不同数据源的模式,可以额外包含很多细节的选项,包括样例数据,存储开销,健康度(字段描述完整性,后期集成数据加载质量检测)。

      [数据导管Datapipe实体] 数据导管主要关注数据流通过程,分为几类。最常用的是sql,其实可以是kafka streams, spark rdd之类的代码。这里的数据导管属于静态资源。

    • 数据血缘 DataLineage
      前面定义了数据实体,接着就要定义数据关系。

      [User实体]关系由于重要度不太高,这里不做过多关注,留于后续考虑,甚至需要其它关系建立之后会有更多意义。

      [Dataset实体]与[Datapipe实体]之间的关系主要由血缘分析这个过程完成。
      我们定义这个数据关系为有向图导管DagConduit。
      如果大家熟悉haskell conduit库的话,就会很自然想到conduit由pipe而来。

      DagConduit由Datapipe 推导而来,而Datapipe是依赖于Dataset。
      这样每个DagConduit形成了一个局部关系图:
      [Input Dataset] -> Datapipe -> [Output Dataset]
      每个InputDataset又依赖于Datapipe,这样每个DagConduit输出自己的关系,在图数据库存储里面进行关联就会形成完整的dag关系图了。

      这里的DagConduit就是我们今天的主角,下一节讲会重点讲解。

    • 元数据触发器 MetaTrigger
      这里的主要意图是完成元数据驱动编程的思想。
      当前主要考虑的点则是,通过血缘分析生成dag信息去触发airflow调度器系统。
      当然元数据驱动的使用范围非常广泛,也是非常核心的内容。
      后续会做更多的探索及进一步分享与交流。

    一. 血缘分析的导推形式

    血缘分析在这里主要是我们前文所说的DagConduit的推导过程。这个推导过程很大依赖于Datapipe。

    因为元数据是个通用平台,可推导的范围还是非常多的,我们这里仅从实用角度考虑并介绍。

    常见的etl过程主要分为sql, kafka streams, spark rdd, flink datastream,这几个。

    sql是最常用的Datapipe,但是其变种也是最多的。
    kafka streams, spark rdd, flink datastream则是另一种形式,会生成相应的topology相关的lineage,由于灵活度不高,可以使用对应的库进行提取。

    这里我们重点关注sql血缘分析,因为sql这种dsl是最有表达力,最符合元数据模型的。
    甚至kafka, spark, flink也绝大多数会提供sql接口。

    由于本人大多时间关注于sql推导模型,未对其它模型做过多探索,将于后续交流及使用后补充相关内容。

    二. 血缘分析的技术方案分析。

    我们常见的血缘分析实现方式主要分为四种:

      1. 通过调度器反向推导血缘关系。

      因为调度器跟DagConduit有着很密切的关系。我们很容易从调度器的每个DAG里面提取出对应的Datapipe以及它们的关系。

      这种方案的可行性非常高,成本非常廉。但缺点也是很显而易见的,这种方案不能支持字段级别的血缘,而字段关系对于元数据非常重要,甚至是后续扩展数据质量及数据安全的重要思路之一。并且对于业务方面的影响分析也是非常有价值意义的。

      1. 通过计算引擎系统提供的血缘分析接口进行收集。

      由于数据治理的重要性逐渐为人所认识,血缘分析功能也成为了etl工具箱的重要一环。所以hive就直接提供了血缘分析接口。

      这种方式的优点是显而易成的,即支持了字段级别血缘分析,成本也是非常低的,对于大部分公司其实是够用的。但是缺点也是非常明显的。

      因为计算引擎与血缘分析在一起,如果血缘分析的功能需要增加,计算引擎势必也要受到牵连。毕竟字段血缘分析在很多情况下,很多场景下并不是标准统一的。比如select a from t where b = 'c'。这里的a字段到底需不需要依赖于b的值呢?这里不同的需求可能找到的答案并不会一致。如果需要更一步进行统计信息获取后进行相关的血缘扩展分析,也是非常困难的,因为接口是非常固定有限的。

      另外血缘分析在运行时执行,这样血缘分析已经与系统运行时有了直接的绑定关系。所以无法进行血缘分析驱动,将一个系统换成另一个系统。如果是sql解析出来的血缘分析,我们可以通过元数据触发器触发到各种各样的平台,是非常灵活的。

      当然,这些弱点对于实际使用来说并不是特别致命。毕竟成本低,且简单可用是非常有吸引力的,系统耦合之类的事情虽然比较烂,但先用起来再说,无可厚非,所以atlas也是这么干的。所以我坚信atlas可以解决问题,但绝不是未来。并且这里面还隐藏着另一个大的隐患,并不是所有etl工具箱都会给你提供血缘接口,原因很明显。对于成熟的产品,绝不会让血缘分析时与计算引擎绑定在一起受波折。

      所以,sql血缘解析才是未来。

      1. 通过计算引擎系统的解析过程源码进行提取

      这个应该是最常用的做法了,成本虽然高一点,但是整体可控,毕竟基础代码完成了很大一部分功能了,还可以按需定制,对于个人成长及公司的定制化需求都是非常令人满意的。毕竟互联网公司大部分使用开源产品,开源的世界任你折腾。

      这里面困惑其实也不少。如果是商业产品,如果商家没有提供解析器,基本上就非常无奈了。对于每套计算引擎各自的规则可能不一样,致使门槛非常高了,甚至出现了专用的商业公司去统一解决这个问题。甚至对于字段级血缘提取,代码的功力也是非常有要求的。并且开源软件为了自己的一些内部功能,在代码里面渗杂了过多的东西,致使学习成本可以吓退不少人了。

    1. 通用的sql解析器工具
      由于前面的问题,所以大家都想要一套通用的sql解析工具去专门解决sql分析的问题。但是这个里面涉及一个非常严重的问题,就是成熟度。由于sql解析过程有非常大的工作量,需要对语法规则非常熟练,写起来繁杂的工作量及系统多样性很容易让人退缩。所以市面上一直找不到成熟的通用的sql解析器。

      所以这里面就回归到了我们的主角,queryparser由此而生。queryparser是uber公司开发的,成熟的多年应用到了生产系统。目前支持hive, queryparser, vertica, teradata。对于pg系统,有由专用的gpu数据库公司SQream员工开发的解析器hssqlppp。这两套系统采用通用的解析器内核parsec。

      那么在这里放出地址给各位:

      这个queryparser强大在哪里呢?
      一看吓死人,核心代码只要7000多行。而sql结构定义就占了3000多行,也就是说逻辑只有4000行左右,擅抖吧,奥利给!

    [nix-shell:~/my-repo/queryparser/src]$ cloc .
          20 text files.
          20 unique files.                              
           0 files ignored.
    
    github.com/AlDanial/cloc v 1.84  T=0.08 s (251.9 files/s, 121135.2 lines/s)
    -------------------------------------------------------------------------------
    Language                     files          blank        comment           code
    -------------------------------------------------------------------------------
    Haskell                         20           1724            562           7332
    -------------------------------------------------------------------------------
    SUM:                            20           1724            562           7332
    -------------------------------------------------------------------------------
    
    [nix-shell:~/my-repo/queryparser/src]$ find . -name "*.hs" | xargs -I {} wc -l {}  | sort -nr
    2334 ./Database/Sql/Type/Query.hs
    1125 ./Database/Sql/Type.hs
    836 ./Database/Sql/Util/Scope.hs
    647 ./Database/Sql/Type/Names.hs
    641 ./Database/Sql/Type/Schema.hs
    505 ./Database/Sql/Util/Columns.hs
    438 ./Database/Sql/Util/Tables.hs
    403 ./Database/Sql/Util/Eval.hs
    389 ./Database/Sql/Info.hs
    383 ./Database/Sql/Util/Joins.hs
    372 ./Database/Sql/Util/Lineage/ColumnPlus.hs
    372 ./Database/Sql/Type/Scope.hs
    361 ./Database/Sql/Util/Schema.hs
    216 ./Database/Sql/Util/Eval/Concrete.hs
    159 ./Database/Sql/Util/Lineage/Table.hs
    147 ./Database/Sql/Type/TableProps.hs
    109 ./Database/Sql/Position.hs
    66 ./Database/Sql/Pretty.hs
    64 ./Database/Sql/Helpers.hs
    51 ./Database/Sql/Type/Unused.hs
    
    

    三. queryparser内功心法.

    平复一下内心的平静,让我们来了解一上queryparser到底是怎么实现的。

    queryparser包含了核心逻辑的实现(大部分就是标准sql逻辑),每个方言有各自特写的扩展,我们这里以queryparser-hive为例。

    queryparser-hive将一些特定sql的功能处理掉,核心功能交给 queryparser来处理。由于我们接触的大部分是标准sql,我们在这里分析标准的sql语句,在后续源码篇里面我们会进一步介绍详细介绍queryparser-hive与queryparser的整体过程。

    整个解析过程分为三个步骤:

    1. parseManyAll 语法解析过程
      将文本打碎(词法分析)并组装成结构化(语法分析)
    2. resolveHiveStatement 信息传播过程
      遍历数据结构,进行外部的catalog关联,以及相应信息的传递,属于通用的信息加工逻辑阶段。
    3. getColumnLineage 特定的计算模型
      计算模型分为很多种,有各自的需求定义。在提供的接口中,用户实现特定的逻辑完成所需的功能。
      queryparser提供了两种模型,一种是字段血缘模型,一种是数据计算模型。当然我们可以随意实现接口定制自己的模型。
      比如通过实现不同的sql逻辑处理方法将sql语句转换成对应的http restful请求。
      一个类似的功能可参照:
    SELECT status, content_type, content::json->>'data' AS data
      FROM http_patch('http://httpbin.org/patch', '{"this":"that"}', 'application/json');
    

    https://github.com/pramsey/pgsql-http

    1. 语法解析过程

    语法解析常用的实现分为两种:
    一种是parser generator,就是你写好规则,自动帮你生成解析代码,比如常见的antlr就是。这种方式的特写是性能高,效率高。缺点则是定制化扩展弱,异常信息非常难读懂。
    另一种是parser combinator,这种就是你自己手写语法规则,非常灵活,效率会低一点,可控性比较强。

    queryparser就是使用了parser combinator方式进行解析,所以我们可以自由定制。

    语法解析的目地是生成AST,简单来说就是一个递归的数据结构,有点类似json。在语法解析过程中要做的工作就是,定义数据结构,然后把信息塞进去。其实难度并不大,但是由于sql规则的复杂度,整个工作量并不轻松。

    语法规则的过程分为两部分:词法解析及语法解析。
    为什么需要词法解析呢?因为sql跟平常我们所说的csv文件并不一样,支持可以使用分隔符来定义结构。有些东西在不同的环境下有不同的意义。比如任何事物在注释里面它就没有效果了,里面可能还有变量声明,关键字信息等。

    所以词法分析器就是按sql的基本单元进行拆分,然后语法分析器将其组装起来形成递归的结构体。为了便于理解,我们给出实际代码里面的词法结构及逻辑片段。

    https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Token.hs

    data Token = TokWord !Bool !Text
               | TokString !ByteString
               | TokNumber !Text
               | TokSymbol !Text
               | TokVariable !Text VariableName  -- the Text is for namespace, the
                                                 -- Token is the param name which
                                                 -- may be another TokVariable!
               | TokError !String
                 deriving (Show, Eq)
    

    可以看出,hive的基本单元分为了TokWord(通用名称), TokString(字符), TokNumber(数字), TokSymbol(符号), TokVariable(变量)

    这个解析的过程叫做scanner,就是常用的字符处理生成Token流。
    https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Scanner.hs

    有了Token流,我们进行组合生成AST。
    那么我们可以看HIVE AST的结构定义:
    https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Type.hs

    data HiveStatement r a = HiveStandardSqlStatement (Statement Hive r a)
                             | HiveUseStmt (Use a)
                             | HiveAnalyzeStmt (Analyze r a)
                             | HiveInsertDirectoryStmt (InsertDirectory r a)
                             | HiveTruncatePartitionStmt (TruncatePartition r a)
                             | HiveAlterTableSetLocationStmt (AlterTableSetLocation r a)
                             | HiveAlterPartitionSetLocationStmt (AlterPartitionSetLocation r a)
                             | HiveSetPropertyStmt (SetProperty a)
                             | HiveUnhandledStatement a
    
    

    然后hive的HiveStandardSqlStatement包包含的Statement定义在queryparser核心结构里面,由于sql结构定义query部分比较多,单独拎出来形成了两个文件:

    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type.hs

    data Statement
        d -- sql dialect
        r -- resolution level (raw or resolved)
        a -- per-node parameters - typically Range or ()
            = QueryStmt (Query r a)
            | InsertStmt (Insert r a)
            | UpdateStmt (Update r a)
            | DeleteStmt (Delete r a)
            | TruncateStmt (Truncate r a)
            | CreateTableStmt (CreateTable d r a)
            | AlterTableStmt (AlterTable r a)
            | DropTableStmt (DropTable r a)
            | CreateViewStmt (CreateView r a)
            | DropViewStmt (DropView r a)
            | CreateSchemaStmt (CreateSchema r a)
            | GrantStmt (Grant a)
            | RevokeStmt (Revoke a)
            | BeginStmt a
            | CommitStmt a
            | RollbackStmt a
            | ExplainStmt a (Statement d r a)
            | EmptyStmt a
    
    

    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Query.hs

    data Query r a
        = QuerySelect a (Select r a)
        | QueryExcept a (ComposedQueryColumns r a) (Query r a) (Query r a)
        | QueryUnion a Distinct (ComposedQueryColumns r a) (Query r a) (Query r a)
        | QueryIntersect a (ComposedQueryColumns r a) (Query r a) (Query r a)
        | QueryWith a [CTE r a] (Query r a)
        | QueryOrder a [Order r a] (Query r a)
        | QueryLimit a (Limit a) (Query r a)
        | QueryOffset a (Offset a) (Query r a)
    
    

    接下来就是把token组装起来。
    由于每个方言组装逻辑语法规则不一样,所以各自有自己的实现,核心接口并未提供。

    https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Parser.hs

    statementParser :: Parser (HiveStatement RawNames Range)
    statementParser = do
        maybeStmt <- optionMaybe $ choice
            [ HiveUseStmt <$> useP
            , HiveAnalyzeStmt <$> analyzeP
            , do
                  let options =
                        -- this list is hive-specific statement types that may be
                        -- preceded by an optional `WITH` and an optional inverted
                        -- `FROM`
                        [ (void insertDirectoryPrefixP, fmap HiveInsertDirectoryStmt . insertDirectoryP)
                        ]
                      prefixes = map fst options
                      baseParsers = map snd options
                  _ <- try $ P.lookAhead $ optional withP >> invertedFromP >> choice prefixes
                  with <- option id withP
                  invertedFrom <- invertedFromP
                  let parsers = map ($ (with, invertedFrom)) baseParsers
                  choice $ parsers
            , try $ HiveTruncatePartitionStmt <$> truncatePartitionStatementP
            , HiveUnhandledStatement <$> describeP
            , HiveUnhandledStatement <$> showP
            , do
                  _ <- try $ P.lookAhead createFunctionPrefixP
                  HiveUnhandledStatement <$> createFunctionP
            , do
                  _ <- try $ P.lookAhead dropFunctionPrefixP
                  HiveUnhandledStatement <$> dropFunctionP
            , HiveStandardSqlStatement <$> statementP
            , try $ HiveAlterTableSetLocationStmt <$> alterTableSetLocationP
            , try $ HiveUnhandledStatement <$> alterTableSetTblPropertiesP
            , alterPartitionP
            , HiveSetPropertyStmt <$> setP
            , HiveUnhandledStatement <$> reloadFunctionP
            ]
        case maybeStmt of
            Just stmt -> terminator >> return stmt
            Nothing -> HiveStandardSqlStatement <$> emptyStatementP
      where
        terminator = (Tok.semicolonP <|> eof) -- normal statements may be terminated by `;` or eof
        emptyStatementP = EmptyStmt <$> Tok.semicolonP  -- but we don't allow eof here. `;` is the
        -- only way to write the empty statement, i.e. `` (empty string) is not allowed.
    

    解析过程并不复杂,就是按结构体深度递归,跟json解析差不了多少。
    整个过程就是,至上而下解析,解析一项,如果失败了则尝试其它项,深度递归下去。
    体力活比较重,因为规则多,每一项都要写规则尝试匹配。
    就这样,整个结构体就组装起来了。

    2. 信息传播过程

    其实queryparser的信息传播过程非常简单,就是单纯从catalog里面去查询表信息,关联到表生成的字段,然后将以前的表名转换成表信息,生成引用信息。

    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Scope.hs

    具体内容我们可以看前后结果对比

    data RawNames
    deriving instance Data RawNames
    instance Resolution RawNames where
        type TableRef RawNames = OQTableName
        type TableName RawNames = OQTableName
        type CreateTableName RawNames = OQTableName
        type DropTableName RawNames = OQTableName
        type SchemaName RawNames = OQSchemaName
        type CreateSchemaName RawNames = OQSchemaName
        type ColumnRef RawNames = OQColumnName
        type NaturalColumns RawNames = Unused
        type UsingColumn RawNames = UQColumnName
        type StarReferents RawNames = Unused
        type PositionExpr RawNames = Unused
        type ComposedQueryColumns RawNames = Unused
    
    instance Resolution ResolvedNames where
        type TableRef ResolvedNames = RTableRef
        type TableName ResolvedNames = RTableName
        type CreateTableName ResolvedNames = RCreateTableName
        type DropTableName ResolvedNames = RDropTableName
        type SchemaName ResolvedNames = FQSchemaName
        type CreateSchemaName ResolvedNames = RCreateSchemaName
        type ColumnRef ResolvedNames = RColumnRef
        type NaturalColumns ResolvedNames = RNaturalColumns
        type UsingColumn ResolvedNames = RUsingColumn
        type StarReferents ResolvedNames = StarColumnNames
        type PositionExpr ResolvedNames = Expr ResolvedNames
        type ComposedQueryColumns ResolvedNames = ColumnAliasList
    
    data RTableName a = RTableName (FQTableName a) SchemaMember
        deriving (Generic, Data, Eq, Ord, Show, Functor, Foldable, Traversable)
    
    data SchemaMember = SchemaMember
        { tableType :: TableType
        , persistence :: Persistence ()
        , columnsList :: [UQColumnName ()]
        , viewQuery :: Maybe (Query ResolvedNames ())  -- this will always be Nothing for tables
        } deriving (Generic, Data, Eq, Ord, Show)
    
    
    

    可以看到,sql解析之前表名,列名信息就是简单的字符OQTableName, UQColumnName。
    解析之后就映射成了实际的各种形式。resolve出的RTableName就包含了从catalog里面查询出来的表的所有列员。
    这个信息可以用来解析insert into时不带字段名,以及select *时的一些模糊逻辑。
    当然本人觉得如果sql够标准的话,表的字段信息是完成可以从传播过程中自动推导的,而不用依赖于外部提供的catalog模式信息,这个也是本人正在尝试加强信息传播过程中优化的方向之一。

    整个resolve过程代码:
    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Scope.hs

    由于是概述篇,就不展开讲述,后续将在源码篇里详细介绍。

    3. 特定的计算模型

    有了前面两部分过程之后,我们的结构体经过遍历关联处理有了更详细的信息。
    最后一步则是通过这个结构体去做对应的计算。
    因为整个遍历过程跟处理过程,相似度比较高。
    所以queryparser提供了一个标准计算模型,应用则可以定制化特定部分进行逻辑处理, 只需要编写对应的函数,不需要自己去编写整个过程。
    当然如果你有兴趣,自己定义重写或者扩展一套计算模型也是非常方便的。
    我们可以看一下这个计算逻辑的定义:

    class (Monad (EvalRow e), Monad (EvalMonad e), Traversable (EvalRow e)) => Evaluation e where
        type EvalValue e :: *
        type EvalRow e :: * -> *
        type EvalMonad e :: * -> *
        addItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
        removeItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
        unionItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
        intersectItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
        distinctItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e]
        offsetItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
        limitItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
        filterBy :: Expr ResolvedNames Range -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
        inList :: EvalValue e -> [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        inSubquery :: EvalValue e -> EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        existsSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        atTimeZone :: EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleConstant :: Proxy e -> Constant a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleCases :: Proxy e -> [(Expr ResolvedNames Range, Expr ResolvedNames Range)] -> Maybe (Expr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleFunction :: Proxy e -> FunctionName Range -> Distinct -> [Expr ResolvedNames Range] -> [(ParamName Range, Expr ResolvedNames Range)] -> Maybe (Filter ResolvedNames Range) -> Maybe (OverSubExpr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleGroups ::  [RColumnRef ()] -> EvalRow e ([EvalValue e], EvalRow e [EvalValue e]) -> EvalRow e (RecordSet e)
        handleLike :: Proxy e -> Operator a -> Maybe (Escape ResolvedNames Range) -> Pattern ResolvedNames Range -> Expr ResolvedNames Range -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleOrder :: Proxy e -> [Order ResolvedNames Range] -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
        handleSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleJoin :: Proxy e -> JoinType a -> JoinCondition ResolvedNames Range -> RecordSet e -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
        handleStructField :: Expr ResolvedNames Range -> StructFieldName a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        handleTypeCast :: CastFailureAction -> Expr ResolvedNames Range -> DataType a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        binop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))
        unop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))
    
    

    queryparser定义了两种计算模型:

      1. 数据计算模型
        也就是说可以真实通过在数据上运行sql上跑出结果。当然这个里面的逻辑并不是非常优化的,没有做rbo或者cbo相关的优化,并且逻辑实现比较简单。所以不适合海量数据运算,则提供基础实现版本。

    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval/Concrete.hs

    这里面的细节由于跟本文关联不大,就不一一介绍了,如有兴趣,可以源码篇作更进一步介绍。

    字段血缘模型就是用得比较多的了,简单来讲呢,依旧是深度遍历到列级别,然后组装起来。

    我们可以简单看看计算模型的组装过程分为哪些。
    https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval.hs

    最基础的内容是常量与(列)表达式,表结构

    instance Evaluation e => Evaluate e (Constant a) where
        type EvalResult e (Constant a) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
        eval p constant = handleConstant p constant
    
    instance Evaluation e => Evaluate e (Expr ResolvedNames Range) where
        type EvalResult e (Expr ResolvedNames Range) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
    ...
        eval _ (ColumnExpr _ col) = do
            row <- asks evalRow
            case M.lookup (void col) row of
                Just x -> pure x
                Nothing -> throwError $ "failure looking up column: " ++ show (void col) ++ " in " ++ show (M.keys row)
    ...
    
    instance Evaluation e => Evaluate e (Tablish ResolvedNames Range) where
        type EvalResult e (Tablish ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
    ...
        eval _ (TablishTable _ _ (RTableRef tableName table)) = asks evalFromTable <*> pure (RTableName tableName table) >>= \case
            Nothing -> throwError $ "missing table: " ++ show (void tableName)
            Just result -> pure result
        eval _ (TablishTable _ _ (RTableAlias (TableAlias _ aliasName alias))) = asks (M.lookup alias . evalAliasMap) >>= \case
            Nothing -> throwError $ "missing table alias: " ++ show aliasName
            Just result -> pure result
    ...
    

    表结构向上传播到TablishJoin , TablishLateralView , SelectFrom

    列表达式通过表达式一层层往上计算。
    然后到达SelectWhere, JoinCondition, SelectTimeseries , SelectGroup, SelectHaving

    然后组合形成了基本的select语句

    instance Evaluation e => Evaluate e (Select ResolvedNames Range) where
        type EvalResult e (Select ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
        eval p Select{..} = do
            -- nb. if we handle named windows at resolution time (T637160)
            -- then we shouldn't need to do anything with them here
            unfiltered <- maybe (pure $ emptyRecordSet p) (eval p) selectFrom
            filtered <- maybe pure (eval p) selectWhere unfiltered
            interpolated <- maybe pure (eval p) selectTimeseries filtered
            groups <- maybe (const $ pure . pure) (eval p) selectGroup selectCols interpolated
            having <- maybe pure (eval p) selectHaving groups
            records <- mapM (eval p selectCols) having
            let rows = recordSetItems =<< records
                labels = map void $ selectionNames =<< selectColumnsList selectCols
                indistinct = makeRecordSet p labels rows
    
            pure $ case selectDistinct of
                 Distinct True -> indistinct { recordSetItems = distinctItems p $ recordSetItems indistinct }
                 Distinct False -> indistinct
    

    然后加上CTE功能就形成了完整的Query,然后在SelectFrom里面可以递归到这个Query进行递归组合。

    instance Evaluation e => Evaluate e (Query ResolvedNames Range) where
        type EvalResult e (Query ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
        eval p (QuerySelect _ select) = eval p select
        eval p (QueryExcept _ (ColumnAliasList cs) lhs rhs) = do
            exclude <- recordSetItems <$> eval p rhs
            RecordSet{recordSetItems = unfiltered, ..} <- eval p lhs
            let labels = map (RColumnAlias . void) cs
            makeRecordSet p labels <$> removeItems p exclude unfiltered
    
        eval p (QueryUnion _ (Distinct False) (ColumnAliasList cs) lhs rhs) = do
            RecordSet{recordSetItems = lhsRows, ..} <- eval p lhs
            RecordSet{recordSetItems = rhsRows} <- eval p rhs
            let labels = map (RColumnAlias . void) cs
            makeRecordSet p labels <$> unionItems p lhsRows rhsRows
    
        eval p (QueryUnion info (Distinct True) cs lhs rhs) = do
            result@RecordSet{recordSetItems} <- eval p (QueryUnion info (Distinct False) cs lhs rhs)
            pure $ result{recordSetItems = distinctItems p recordSetItems}
    
        eval p (QueryIntersect _ (ColumnAliasList cs) lhs rhs) = do
            RecordSet{recordSetItems = litems, ..} <- eval p lhs
            ritems <- recordSetItems <$> eval p rhs
            let labels = map (RColumnAlias . void) cs
            makeRecordSet p labels <$> intersectItems p litems ritems
    
        eval p (QueryWith _ [] query) = eval p query
        eval p (QueryWith info (CTE{..}:ctes) query) = do
            RecordSet{..} <- eval p cteQuery
            columns <- override cteColumns recordSetLabels
            let result = makeRecordSet p columns recordSetItems
            introduceAlias p (void cteAlias) result $ eval p $ QueryWith info ctes query
          where
            override [] ys = pure ys
            override (alias:xs) (_:ys) = do
                ys' <- override xs ys
                pure $ (RColumnAlias $ void alias) : ys'
            override _ [] = throwError "more aliases than columns in CTE"
    
        eval p (QueryLimit _ limit query) = eval p limit <$> eval p query
        eval p (QueryOffset _ offset query) = eval p offset <$> eval p query
        eval p (QueryOrder _ orders query) = eval p query >>= handleOrder p orders
    

    当然这个过程我们会在源码篇详细讲解,对于普通读者,可以了解一下大致的计算过程。
    整个过程有个EvalContext,
    evalAliasMap里面会记录表别名对应的信息,
    evalFromTable里面会记录表对应的信息,
    evalRow里面会对应字段对应的信息

    data EvalContext e = EvalContext
        { evalAliasMap :: Map TableAliasId (RecordSet e)
        , evalFromTable :: RTableName Range -> Maybe (RecordSet e)
        , evalRow :: Map (RColumnRef ()) (EvalValue e)
        }
    
    data RecordSet e = RecordSet
        { recordSetLabels :: [RColumnRef ()]
        , recordSetItems :: EvalRow e [EvalValue e]
        }
    

    这个信息在字段血缘分析的过程中表体表现为ColumnPlusSet:

    instance Evaluation ColumnLineage where
        type EvalValue ColumnLineage = ColumnPlusSet
        type EvalRow ColumnLineage = Writer ColumnPlusSet
        type EvalMonad ColumnLineage = Identity
    
    data ColumnPlusSet = ColumnPlusSet
        { columnPlusColumns :: Map FQCN (Map FieldChain (Set Range))
        , columnPlusTables :: Map FQTN (Set Range)
        } deriving (Eq, Show)
    
    

    ColumnPlusSet的主要信息就是相关的列信息与表信息。

    type ColumnLineagePlus = Map (Either FQTN FQCN) ColumnPlusSet
    

    最终解析出来的结果有两种形式:
    对于每个表FQTN, 有对应的ColumnPlusSet(表依赖集及字段依赖集)
    对于表的每个字段FQCN,有对应的ColumnPlusSet(表依赖集及字段依赖集)

    有人可能奇怪了,在什么时候,字段会依赖于表?
    这个在select count(1) as cnt from b的时候cnt就是依赖于表的。当然其它情况,我会进一步整理后补充。

    4. 如何新增新的数据库方言。

    我们前面讲过,整个血缘分析分为三部分。
    第一部分sql解析的这个词法解析及语法解析比较依赖于sql的语法规则,所以需要手写,难度并不大,只是工作量比较大。比如支持存储过程之类的也是语法规则问题。
    第二部分信息传播,基本上是通用的。对于除标准sql以外的信息,我们大部分不需要处理,特定的需要处理的工作量也非常少,可以简单抄抄改改。比如支持存储过程之类的大部分也是sql逻辑处理后组合起来。
    第三部分计算模型,也基本上是通用的,由于对于血缘分析来说,所涉及到关注面会更少,所需要改动是最小的。。。

    5. SQL解析潮流从此开始

    所以我们可以看到,queryparser仅仅就干好了一件事情,就几千行代码,并且干得简单。扩展性也很强。
    当然我们也注意到了,它的整个列信息推理过程比较原始,直接查询了catalog,但是也是非常方便后续扩展的。

    它的计算模型对比其它解析器是非常大的一个亮点,你可以基于各种方言写支持各种生态的扩展。只需要将对应方言规则的sql解析后,实现对应计算模型里的方法即可,给了我们相当大的想象力。
    你可以配置sql去做http restful递归请求,配置sql去做etl数据加载。。。

    相关文章

      网友评论

        本文标题:数据治理篇-元数据-血缘分析: queryparser概述

        本文链接:https://www.haomeiwen.com/subject/gkqctktx.html