
flamy graph source code analysis

Author: Choooooper | Published 2017-08-29 10:33

    The HQL execution process

    When Hive executes a user-supplied HQL query, it goes through the following steps:

    1. Syntax parsing

    Antlr defines the SQL grammar and performs lexical and syntactic analysis, converting the SQL into an abstract syntax tree (AST Tree); a minimal parsing sketch follows this list.

    2. Semantic analysis

    Traverse the AST Tree and abstract out the basic query unit, the QueryBlock;
    Generate the logical execution plan: traverse the QueryBlocks and translate them into an operator tree (OperatorTree);

    3. Logical plan optimization

    Logical-level optimizers transform the OperatorTree, merging unnecessary ReduceSinkOperators to reduce the amount of shuffled data;

    4. Physical plan generation

    Traverse the OperatorTree and translate it into MapReduce tasks;

    5. Physical plan optimization

    Physical-level optimizers transform the MapReduce tasks to produce the final execution plan.
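
    To see step 1 in isolation, here is a minimal sketch. It assumes Hive's hive-exec jar is on the classpath and uses the same ParseDriver that flamy relies on later in this article; the object name AstDump and the query text are made up for illustration.

    import org.apache.hadoop.hive.ql.parse.ParseDriver

    object AstDump {
      def main(args: Array[String]): Unit = {
        val pd = new ParseDriver
        // step 1: Antlr-based lexing and parsing turns the SQL text into an AST
        val ast = pd.parse("SELECT url, count(*) FROM nasa_access.daily_logs GROUP BY url")
        // print the tree in LISP-like form (TOK_QUERY, TOK_FROM, TOK_INSERT, ...)
        println(ast.toStringTree())
      }
    }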

    The Hive SQL compilation process

    For how Hive SQL is compiled into MapReduce, see https://tech.meituan.com/hive-sql-to-mapreduce.html

    Using the Hive API to get the execution plan and lineage of HQL

    See http://lxw1234.com/archives/2015/09/476.htm

    At the end of this article there is also a small example of extracting the lineage of an HQL query.

    flamy hive graph

    flamy is a Hive management tool; one of the features it provides is drawing a dependency graph of the tables involved in a given set of HQL files.

    flamy-demo provides usage examples for flamy; here we only look at the graph-related functionality.

    cd flamy-demo
    tree conf
    -----------------
    conf
    ├── flamy.properties
    └── log4j2.properties
    

    In the conf folder, flamy.properties is flamy's configuration file. It configures where the local HQL files are stored, as well as the Hive connection-related properties.

    cat flamy.properties
    ---------------------
    ### Flaminem project
    
    flamy.model.dir.paths = model
    
    flamy.variables.path = ${flamy.model.dir.paths}/VARIABLES.properties
    
    flamy.env.model.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql
    
    flamy.env.local.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql
    flamy.env.local.hive.meta.fetcher.type = client
    flamy.env.local.hive.metastore.uri = "thrift://localhost:9083"
    flamy.env.local.hive.server.uri   = "localhost:10000"
    

    The flamy.model.dir.paths property configures the model path; in this demo it is these HQL files that are parsed to generate the corresponding graph.

    tree model
    -----------------------
    model
    ├── model_PRESETS.hql
    ├── nasa
    │   ├── facts.db
    │   │   └── http_status
    │   │       └── CREATE.hql
    │   ├── nasa_access.db
    │   │   ├── daily_logs
    │   │   │   ├── CREATE.hql
    │   │   │   └── POPULATE.hql
    │   │   ├── daily_url_error_rates
    │   │   │   ├── CREATE.hql
    │   │   │   └── POPULATE.hql
    │   │   ├── daily_urls
    │   │   │   ├── CREATE.hql
    │   │   │   └── POPULATE.hql
    │   │   └── daily_urls_with_error
    │   │       ├── CREATE.hql
    │   │       └── POPULATE.hql
    │   └── nasa_access_import.db
    │       ├── daily_logs
    │       │   ├── CREATE.hql
    │       │   └── POPULATE.hql
    │       └── raw_data
    │           └── CREATE.hql
    └── VARIABLES.properties
    

    For flamy's installation and configuration, see the flamy documentation.

    Running demo.sh starts the demo; typing show graph renders the dependency graph between the HQL files under the model folder.

    Next, let's see how flamy implements this feature.

    Source code analysis of flamy graph generation

    First, import the flamy source code into IDEA.

    Since the entry point above was demo.sh, let's first look at what demo.sh does.

    cat demo.sh
    ------------------------
    #!/bin/bash
    
    if [[ ! -f ${FLAMY_HOME}/bin/flamy ]]; then
      echo 'Could not find ${FLAMY_HOME}/bin/flamy executable. Please make sure the environment variable FLAMY_HOME is correctly set.'
    fi
    
    $FLAMY_HOME/bin/flamy --config-file conf/flamy.properties shell
    
    

    As you can see, it simply passes the configuration file to the $FLAMY_HOME/bin/flamy script, so let's see what the flamy script does in turn.

    cat $FLAMY_HOME/bin/flamy
    -----------------------
    #!/bin/bash
    # Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
    DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
    
    ARGS=("$@")
    FLAMY_JAVA_OPTIONS="-XX:MaxPermSize=512M -XX:+CMSClassUnloadingEnabled"
    RUN="java ${FLAMY_JAVA_OPTIONS} ${FLAMY_EXTRA_JAVA_OPTIONS} -cp $DIR/../lib/*:$DIR/../conf:$DIR/../additional_jars/* com.flaminem.flamy.Launcher"
    
    exec ${RUN} "${ARGS[@]}"
    

    The flamy script sets the JVM options flamy needs at runtime and builds the classpath from the dependency jars, then runs the main class com.flaminem.flamy.Launcher, passing all arguments through to it. So our source-reading entry point is com.flaminem.flamy.Launcher.

    Launcher

    The main function of object Launcher is as follows:

    def main(args: Array[String]): Unit = {
        // initialize the output handling
        FlamyOutput.init()
        val returnStatus =
          try {
            // start a Launcher in shell mode and launch it
            new Launcher(args, withShell = true).launch()
          }
          finally{
            FlamyOutput.shutdown()
          }
        System.exit(returnStatus.exitCode)
      }
    

    Inside, it creates a new Launcher instance, passes the arguments to it, and launches it.

    def launch(globalOptions: FlamyGlobalOptions = new FlamyGlobalOptions()): ReturnStatus = {
        /* This is necessary to avoid collisions with other running instances of flamy */
        try {
            // other code
            unsafeLaunch(globalOptions)
        }
        // other code
      }
    

    The Launcher.launch() method calls unsafeLaunch, and unsafeLaunch handles each kind of command differently.

    private def unsafeLaunch(rootGlobalOptions: FlamyGlobalOptions): ReturnStatus = {
        if(opts.optException.isDefined) {
          handleException(opts.optException.get)
        }
    
        /* This "if" cannot be moved inside the "match case" because opts.subcommands is not defined when optError is defined */
        if(opts.optError.isDefined) {
          opts.optError.get
        }
        else {
          opts.subcommands match {
            // other cases
    
            case (command: FlamySubcommand) :: subCommands =>
              val res: ReturnStatus = command.doCommand(rootGlobalOptions.overrideWith(opts.globalOptions), subCommands)
              stats = command.stats.orNull
              res
      }
    }
  }
    

    For a command of the FlamySubcommand type, doCommand is called directly to execute it.

    Now let's see which command types flamy defines.

    class Commands(args: Seq[String]) extends Options(args) {
        val show = new commands.Show
        val diff = new commands.Diff
        val describe = new commands.Describe
        val check = new commands.Check
        val run = new commands.Run
        val push = new commands.Push
        val repair = new commands.Repair
        val count = new commands.Count
        val waitForPartition = new commands.WaitForPartition
        val gatherInfo = new commands.GatherInfo
      }
    

    Since show graph is the command that displays the dependency graph of the HQL files, we only look at commands.Show.

    Looking at com.flaminem.flamy.commands.Show, it also defines several subcommands:

    val conf =  new Subcommand("conf") ...
    val schemas = new Subcommand("schemas") ...
    val tables = new Subcommand("tables") ...
    val partitions = new Subcommand("partitions") ...
    val graph = new ShowGraph
    val select = new Subcommand("select") ...
    

    Show's doCommand method is as follows:

     override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
        subCommands match {
          case  (command: FlamySubcommand)::Nil => command.doCommand(globalOptions, Nil)
          // other cases
        }
      }
    

    It also simply calls the subcommand's doCommand, so next we look at what ShowGraph.doCommand does.

    override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
        // initialize the flamy context
        val context = new FlamyContext(globalOptions)
        // process the HQL files and show the graph
        showGraph(context)
        ReturnSuccess
      }
    

    So the graph handling happens in the showGraph method.

    private def showGraph(context: FlamyContext): Unit = {
        val model: Model =
          if(complete()){
            Model.getCompleteModel(context, Nil)
          }
          else{
            Model.getIncompleteModel(context, Nil)
          }
        val g: TableGraph = TableGraph(model).applySkipViews.applyFilter
    
        val graphDir: String = FlamyGlobalContext.RUN_DIR.getProperty + "/result"
        context.getLocalFileSystem.fileSystem.mkdirs(new Path(graphDir))
    
        val graphPath = makeGraphPath(graphDir, items())
    
        val lightPath = s"${graphPath}_light.png"
        val fullPath = s"$graphPath.png"
    
        if(schemaOnly()){
          g.export.toSchemaPng(graphPath)
          println(
            f"""graph printed at :
               |   ${FilePath(fullPath)}
               """.stripMargin
          )
          openGraph(fullPath)
        }
        else {
          g.export.toLightPng(graphPath + "_light")
          g.export.toFullPng(graphPath)
          println(
            f"""graphs printed at :
               |   ${FilePath(fullPath)}
               |   ${FilePath(lightPath)}
               """.stripMargin
          )
          openGraph(lightPath, fullPath)
        }
      }
    

    From the code we can see that showGraph has three steps:

    1. Obtain the model from the HQL files
    2. Build a TableGraph from the model
    3. Export the graph

    Let's analyze them step by step.

    Obtaining the model

    Because this is the first run, model retrieval takes the else branch, so we look at Model.getIncompleteModel; its source is as follows:

    def getIncompleteModel(context: FlamyContext, items:Iterable[ItemName]): IncompleteModel = {
        val index = context.getFileIndex.strictFilter(items).get
        new IncompleteModelFactory(context).generateModel(index)
      }
    

    FlamyContext defines a FileIndex field that is used to locate the files under the configured model.dir.paths:

    private lazy val fileIndex: FileIndex = FileIndex(this)
    

    Now let's look at how FileIndex is built.

    object FileIndex {
    
      def apply(context: FlamyContext): FileIndex = {
        val index: Index = new Index()
        // the paths configured by flamy.model.dir.paths
        val dbPaths = context.modelDirs
        // all files under dbPaths
        val files: FileList = FileUtils.listItemFiles(dbPaths)
    
        val schemaFiles = new mutable.HashMap[SchemaName, SchemaFile]()
        for {
        // iterate over all .db directories
          dbDir: File <- listDatabaseDirectories(dbPaths)
        } {
          val schemaFile = new MissingSchemaFile(dbDir)
          schemaFiles += schemaFile.schemaName -> schemaFile
        }
    
        for {
          file <- files
          fileType <- FileType.getTypeFromFileName(file.getName)
        } {
          if (TableFile.VALID_FILE_TYPES.contains(fileType)) {
            val tableFile: TableFile = new ExistingTableFile(file, fileType)
            index.addFile(tableFile)
          }
          else if (SchemaFile.VALID_FILE_TYPES.contains(fileType)) {
            val schemaFile: SchemaFile = new ExistingSchemaFile(file)
            schemaFiles += schemaFile.schemaName -> schemaFile
          }
        }
        schemaFiles.values.foreach {
          index.addFile
        }
        new FileIndex(index)
      }
    
    }
    

    FileIndex thus walks the file paths under the model directories, parses them, and adds each file to the index.

    The FileTypes defined in flamy are:

    object FileType {
        // CREATE.hql
      case object CREATE extends FileType {
        override val filePrefix: String = "CREATE"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = false
      }
        // VIEW.hql
      case object VIEW extends FileType {
        override val filePrefix: String = "VIEW"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = false
      }
        //POPULATE.hql
      case object POPULATE extends FileType {
        override val filePrefix: String = "POPULATE"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = true
      }
        //TEST.hql
      case object TEST extends FileType {
        override val filePrefix: String = "TEST"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = true
      }
        //META.properties
      case object META extends FileType {
        override val filePrefix: String = "META"
        override val fileExtension: String = "properties"
        override val multipleFilesAllowed: Boolean = false
      }
        //CREATE_SCHEMA.hql
      case object CREATE_SCHEMA extends FileType {
        override val filePrefix: String = "CREATE_SCHEMA"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = false
      }
        //PRESETS.hql
      case object PRESETS extends FileType {
        override val filePrefix: String = "PRESETS"
        override val fileExtension: String = "hql"
        override val multipleFilesAllowed: Boolean = false
      }
     }
    

    Earlier we saw that the HQL files under the model folder are of the CREATE and POPULATE types, so here they are wrapped as ExistingTableFile instances.
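
    As a rough illustration of the naming convention behind this file index (this is not flamy's actual code; ModelPathDemo and describe are made up for the sketch), a path under the model directory can be mapped to its schema, table and file type like this:

    import java.io.File

    object ModelPathDemo {
      // maps <...>/<schema>.db/<table>/<TYPE>.<ext> to (schema, table, type)
      def describe(file: File): Option[(String, String, String)] = {
        val tableDir  = file.getParentFile
        val schemaDir = tableDir.getParentFile.getName
        if (schemaDir.endsWith(".db"))
          Some((schemaDir.stripSuffix(".db"), tableDir.getName, file.getName.takeWhile(_ != '.')))
        else
          None
      }

      def main(args: Array[String]): Unit = {
        val f = new File("model/nasa/nasa_access_import.db/daily_logs/CREATE.hql")
        println(describe(f)) // Some((nasa_access_import,daily_logs,CREATE))
      }
    }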

    Because getIncompleteModel's items parameter is Nil, val index = context.getFileIndex.strictFilter(items).get does not filter anything out.

    Then IncompleteModelFactory.generateModel(index) is called to generate the model.

    // generate the model
      def generateModel(fileIndex: FileIndex): IncompleteModel = {
        logger.info("Generating model")
        mergeableTableInfoSet = MergeableTableInfoCollection()
        // get all CREATE.hql files
        val creates = fileIndex.getAllTableFilesOfType(FileType.CREATE)
        val views = fileIndex.getAllTableFilesOfType(FileType.VIEW)
        // get all POPULATE.hql files
        val populates = fileIndex.getAllTableFilesOfType(FileType.POPULATE)
        // get all META.properties files
        val metas = fileIndex.getAllTableFilesOfType(FileType.META)
        // analyze the CREATE statements
        runner.run(analyzeCreate(_: TableFile), creates)
        // analyze the populate (query) statements
        runner.run(analyzePopulate(_: TableFile), populates ++ views)
        runner.run(analyzeMeta(_: TableFile), metas)
    
        val stats: FileRunner#Stats = runner.getStats
        FlamyOutput.out.info(stats.format("analyzed"))
    
        if (stats.getFailCount > 0) {
          throw new FlamyException("Interrupting command, some file were not validated.")
        }
        // add all tables that were not handled above
        mergeableTableInfoSet ++= getMissingTables(fileIndex)
    
        val result = new IncompleteModel(mergeableTableInfoSet.toTableInfoCollection, fileIndex)
        logger.info("model generated")
        result
      }
    

    generateModel first collects all the HQL files under the model folder and then analyzes each file type; let's start with analyzeCreate.

    Take model/nasa/nasa_access_import.db/daily_logs/CREATE.hql as an example:

    -- DROP TABLE IF EXISTS nasa_access_import.daily_logs ;
    CREATE TABLE nasa_access_import.daily_logs(
      source_ip STRING,
      source_url STRING,
      time TIMESTAMP ,
      action STRING,
      url STRING,
      protocol STRING,
      response_code INT,
      size INT,
      line STRING
    )
    PARTITIONED BY (day STRING)
    STORED AS ORC
    ;
    

    The analyzeCreate method is as follows:

    // this analyzes the CREATE statement
      private def analyzeCreate(tableFile: TableFile) {
        // parse the table being created
        val table: Table = CreateTableParser.parseText(tableFile.text)(context)
        checkName(table, tableFile)
        mergeableTableInfoSet += TableInfo(table)
      }
    

    It first calls CreateTableParser.parseText to parse the CREATE statement.

    The CreateTableParser object looks like this:

    object CreateTableParser {
    
      @throws(classOf[SemanticException])
      @throws(classOf[ParseException])
      @throws(classOf[IOException])
      def parseQuery(query: String)(implicit context: FlamyContext): Table = {
        val cti: CreateTableInfo = new CreateTableInfo(context)
        cti.parse(query)
      }
    
      @throws(classOf[SemanticException])
      @throws(classOf[ParseException])
      @throws(classOf[IOException])
      def parseText(text: String)(implicit context: FlamyContext): Table = {
        // extract the individual queries, using regular expressions
        val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text)
        if (queries.size != 1) {
          throw new RuntimeException("More than 1 query parsed")
        }
        parseQuery(queries.iterator.next)
      }
    }
    

    It has only two functions:

    • parseText splits the text into queries and removes comments
    • parseQuery processes a single query produced by parseText.

    parseText simply strips comments from the given text and splits it into statements on ';' using a regular expression.
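
    As a rough approximation of that behaviour (an illustrative sketch only, not flamy's QueryUtils.cleanAndSplitQuery, which is more careful about quoted strings and the like), comment stripping and splitting on ';' can look like this:

    object SplitQueries {
      def cleanAndSplit(text: String): Seq[String] =
        text.split("\n")
          .map(_.replaceAll("--.*", ""))    // drop single-line comments
          .mkString("\n")
          .split(";")                       // one statement per ';'
          .map(_.trim)
          .filter(_.nonEmpty)
          .toSeq

      def main(args: Array[String]): Unit = {
        val hql =
          """-- DROP TABLE IF EXISTS nasa_access_import.daily_logs ;
            |CREATE TABLE nasa_access_import.daily_logs(line STRING)
            |PARTITIONED BY (day STRING)
            |STORED AS ORC;
            |""".stripMargin
        cleanAndSplit(hql).foreach(q => println(s"query:\n$q\n"))
      }
    }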

    parseQuery then handles a single query; inside it, a new CreateTableInfo object is created.

    CreateTableInfo implements the org.apache.hadoop.hive.ql.lib.NodeProcessor interface, which is Hive's callback interface for processing nodes while parsing HQL.

    /**
     * Base class for processing operators which is no-op. The specific processors
     * can register their own context with the dispatcher.
     */
    public interface NodeProcessor {
    
     
      Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          Object... nodeOutputs) throws SemanticException;
    }
    

    parseQuery calls the CreateTableInfo.parse method:

    @throws(classOf[ParseException])
    @throws(classOf[SemanticException])
    @throws(classOf[IOException])
    //noinspection ScalaStyle
    def parse(query: String): Table = {
        // use Hive's ParseDriver to perform the syntactic analysis
        val pd: ParseDriver = new ParseDriver
        // get the abstract syntax tree
        val completeTree = pd.parse(query, ModelHiveContext.getLightContext(context).hiveContext)
        var tree: ASTNode = completeTree
        while ((tree.getToken == null) && (tree.getChildCount > 0)) {
          tree = tree.getChild(0)
        }
        table = null
        columns.clear()
        partitions.clear()
        val rules: util.Map[Rule, NodeProcessor] = new util.LinkedHashMap[Rule, NodeProcessor]
        val disp: Dispatcher = new DefaultRuleDispatcher(this, rules, null)
        val ogw: GraphWalker = new DefaultGraphWalker(disp)
        val topNodes: util.ArrayList[Node] = new util.ArrayList[Node]
        topNodes.add(tree)
        ogw.startWalking(topNodes, null)
        if (table == null) {
          throw new UnexpectedBehaviorException("Could not parse this AST:" + HiveParserUtils.drawTree(completeTree))
        }
        table.columns = columns
        table.partitions = partitions
        table
      }
    

    The very first line of the function instantiates org.apache.hadoop.hive.ql.parse.ParseDriver, which provides the parsing that generates the syntax tree, so calling ParseDriver.parse yields the syntax tree of the current query.

    Once the syntax tree is obtained, it can be traversed and processed.

    Afterwards, a DefaultRuleDispatcher and a DefaultGraphWalker are instantiated, and finally DefaultGraphWalker.startWalking is called to traverse the tree.

    public void startWalking(Collection<Node> startNodes,
          HashMap<Node, Object> nodeOutput) throws SemanticException {
        toWalk.addAll(startNodes);
        while (toWalk.size() > 0) {
          Node nd = toWalk.remove(0);
          walk(nd);
          if (nodeOutput != null) {
            nodeOutput.put(nd, retMap.get(nd));
          }
        }
      }
    

    The walk method is called for each node.

    public void walk(Node nd) throws SemanticException {
        if (opStack.empty() || nd != opStack.peek()) {
          opStack.push(nd);
        }
    
        if ((nd.getChildren() == null)
            || getDispatchedList().containsAll(nd.getChildren())) {
          // all children are done or no need to walk the children
          if (!getDispatchedList().contains(nd)) {
            dispatch(nd, opStack);
          }
          opStack.pop();
          return;
        }
        // add children, self to the front of the queue in that order
        getToWalk().add(0, nd);
        getToWalk().removeAll(nd.getChildren());
        getToWalk().addAll(0, nd.getChildren());
      }
    }
    

    It first checks whether the current node and its children have already been visited; if all the children have been dispatched but the node itself has not, dispatch is called on it. From this we can see that DefaultGraphWalker traverses the tree depth-first.

    public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
        dispatchAndReturn(nd, ndStack);
     }
    public <T> T dispatchAndReturn(Node nd, Stack<Node> ndStack) throws SemanticException {
        Object[] nodeOutputs = null;
        if (nd.getChildren() != null) {
          nodeOutputs = new Object[nd.getChildren().size()];
          int i = 0;
          for (Node child : nd.getChildren()) {
            nodeOutputs[i++] = retMap.get(child);
          }
        }
    
        Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
        retMap.put(nd, retVal);
        return (T) retVal;
      } 
    

    It first gathers the results already computed for the current node's children into the nodeOutputs array, then calls dispatcher.dispatch. The dispatcher here is a DefaultRuleDispatcher whose default processor is the CreateTableInfo instance, so we look at CreateTableInfo's process method.

    @throws(classOf[SemanticException])
      def process(pt: Node, stack: util.Stack[Node], procCtx: NodeProcessorCtx, nodeOutputs: AnyRef*): AnyRef = {
        pt.getToken.getType match {
          case HiveParser.TOK_CREATETABLE =>
            table = HiveParserUtils.getTable(TableType.TABLE, pt.getChild(0))
          case HiveParser.TOK_TABCOLLIST =>
            pt.getParent.getType match {
              case HiveParser.TOK_CREATETABLE =>
                columns.addAll(HiveParserUtils.getColumns(pt))
              case HiveParser.TOK_TABLEPARTCOLS =>
                partitions.addAll(HiveParserUtils.getColumns(pt).map {
                  new PartitionKey(_)
                })
              case _ =>
            }
          case _ =>
        }
        null
      }
    

    As you can see, CreateTableInfo handles only three token types:

    1. TOK_CREATETABLE, the table-creation token
    2. TOK_TABCOLLIST, the column list
    3. TOK_TABLEPARTCOLS, whose column list gives the partition columns

    Finally, the result for each dispatched node is stored in retMap.

    So during the traversal, CreateTableInfo records only the table, columns and partitions information.

    In short, analyzeCreate analyzes the table-creation statement in CREATE.hql.

    The analyzePopulate function, in turn, analyzes the query statements in POPULATE.hql.

    Let's see how it handles them.

    private def analyzePopulate(tableFile: TableFile) {
        // get the dependencies between all source and destination tables
        val tables: MergeableTableDependencyCollection = PopulatePreParser.parseText(tableFile.text)
        mergeableTableInfoSet ++= tables.map{TableInfo(_, tableFile)}
        tables.foreach{table => checkName(table, tableFile)}
      }
    

    As you can see, its main job is to obtain the dependencies between the tables of the query, which it does by calling PopulatePreParser.parseText.

    def parseText(text: String): MergeableTableDependencyCollection = {
        // get all query statements
        val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text).map{emptyStrings}
        queries.flatMap{parseQuery}.toMergeableTableDependencyCollection
      }
    
    

    Again, the queries are extracted first and parseQuery is then called on each one.

    // process a single query
      private def parseQuery(query: String): MergeableTableDependencyCollection = {
        // find the Common Table Expression (CTE) tables
        val cteTables: Set[String] = findCTEs(query).map{_.toLowerCase}.toSet
        logger.debug(s"CTE founds: $cteTables")
        // find all destination tables
        val destTables =
          getTableNamesFromRegex(destRE,query).flatMap{ nameToTableDep(_,TableType.TABLE,Set()) }++
          getTableNamesFromRegex(viewRE,query).flatMap{ nameToTableDep(_,TableType.VIEW,Set()) }
        // find all source tables
        val srcTables = findSourceTables(query).flatMap{ nameToTableDep(_,TableType.REF, cteTables) }
        // build the dependency relations
        destTables.foreach{ t => t.tableDeps = new TableDependencyCollection(srcTables)}
        destTables.toMergeableTableDependencyCollection
      }
    

    Clearly, flamy's parseQuery first extracts destTables and srcTables and then links them together.

    All of the tables are extracted by matching the query text with regular expressions; they are stored as TableDependency objects.

    A TableDependency holds all of a table's structural information as well as its dependencies.
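
    The sketch below illustrates the idea on a single query. The two patterns are simplified stand-ins for flamy's actual destRE, viewRE and findSourceTables, and the query itself is made up, so treat it only as an approximation of the approach:

    object TableDepsSketch {
      // simplified stand-ins for flamy's regular expressions
      private val destRE = """(?i)insert\s+(?:overwrite|into)\s+table\s+([\w.]+)""".r
      private val srcRE  = """(?i)(?:from|join)\s+([\w.]+)""".r

      def main(args: Array[String]): Unit = {
        val query =
          "INSERT OVERWRITE TABLE nasa_access.daily_urls " +
          "SELECT url, count(*) FROM nasa_access.daily_logs GROUP BY url"
        val destTables = destRE.findAllMatchIn(query).map(_.group(1)).toSet
        val srcTables  = srcRE.findAllMatchIn(query).map(_.group(1)).toSet
        // every destination table depends on every source table of the query
        destTables.foreach(d => println(s"$d <- ${srcTables.mkString(", ")}"))
      }
    }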

    With the tables and the dependencies between them in hand, an IncompleteModel object is instantiated and returned, which completes generateModel.

    In other words, showGraph has now obtained the Model; the next step is to build the TableGraph from it.

    TableGraph builds the dependency graph between the tables:

    def apply(model: Model): TableGraph = {
        var graph: Graph[TableName, DiEdge] = Graph[TableName, DiEdge]()
    
        model.tables.foreach{
          td: TableInfo =>
            graph += TableName(td.fullName)
            graph ++= td.tableDeps.map{t => TableName(t.fullName) ~> TableName(td.fullName)}
        }
        val set1: Set[TableName] = model.fileIndex.getTableNames
        val set2: Set[TableName] = graph.getNodeSeq.toSet
        assert(set1.forall{set2.contains}, s"Please report a bug: the following tables are in the fileIndex and not in the graph: ${set1.diff(set2)}")
    
        new TableGraph(model, graph)
      }
    

    Then comes exporting the graph: the graph is converted into the DOT language, and Graphviz turns the DOT file into PNG images.
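
    As a minimal sketch of that last step (it assumes Graphviz's dot binary is installed, and the edges below are hard-coded for illustration rather than taken from a real TableGraph):

    import java.nio.file.{Files, Paths}
    import scala.sys.process._

    object DotExport {
      def main(args: Array[String]): Unit = {
        // a hand-written DOT graph; flamy generates this text from its TableGraph
        val dot =
          """digraph tables {
            |  rankdir=LR;
            |  "nasa_access_import.raw_data"   -> "nasa_access_import.daily_logs";
            |  "nasa_access_import.daily_logs" -> "nasa_access.daily_logs";
            |  "nasa_access.daily_logs"        -> "nasa_access.daily_urls";
            |}""".stripMargin
        Files.write(Paths.get("tables.dot"), dot.getBytes("UTF-8"))
        // equivalent to running: dot -Tpng tables.dot -o tables.png
        Seq("dot", "-Tpng", "tables.dot", "-o", "tables.png").!
      }
    }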

    That completes the execution logic of flamy's show graph command.

    Getting lineage information through the Hive API

    As we saw in flamy, table information is extracted from CREATE statements by parsing them with ParseDriver; here is a simple example that does the same thing.

    /**
     * @(#)MyLineageInfo.java, 2017/8/28.
     * <p/>
     * Copyright 2017 Netease, Inc. All rights reserved.
     * NETEASE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    package org.jjzhu.hive;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.Stack;
    import java.util.TreeSet;
    
    import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
    import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.GraphWalker;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
    import org.apache.hadoop.hive.ql.parse.HiveParser;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;
    import org.apache.hadoop.hive.ql.parse.ParseException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.jjzhu.hive.util.HiveParserUtil;
    
    /**
     * @author 祝佳俊(hzzhujiajun@corp.netease.com)
     */
    public class MyLineageInfo implements NodeProcessor {
        /**
         * Stores input tables in sql.
         */
        private TreeSet<String> inputTableList = new TreeSet<>();
        /**
         * Stores output tables in sql.
         */
        private TreeSet<String> outputTableList = new TreeSet<>();
    
        private Set<String> columns = new TreeSet<>();
    
        private Set<String> partitions = new TreeSet<>();
    
        public Set<String> getColumns() {
            return columns;
        }
    
        public TreeSet getInputTableList() {
            return inputTableList;
        }
    
        public TreeSet getOutputTableList() {
            return outputTableList;
        }
    
        public Set<String> getPartitions() {
            return partitions;
        }
    
        /**
         * Implements the process method for the NodeProcessor interface.
         */
        public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
                              Object... nodeOutputs) throws SemanticException {
            ASTNode pt = (ASTNode) nd;
    
            switch (pt.getToken().getType()) {
    
                case HiveParser.TOK_CREATETABLE:
                case HiveParser.TOK_TAB:
                    outputTableList.add(BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
                    break;
                case HiveParser.TOK_TABCOLLIST:
                    switch (pt.getParent().getType()){
                        case HiveParser.TOK_CREATETABLE:
                            columns.addAll(HiveParserUtil.getColumns(pt));
                            break;
                        case HiveParser.TOK_TABLEPARTCOLS:
                            partitions.addAll(HiveParserUtil.getColumns(pt));
                            break;
                    }
                    break;
                case HiveParser.TOK_TABREF:
                    ASTNode tabTree = (ASTNode) pt.getChild(0);
                    String table_name = (tabTree.getChildCount() == 1) ?
                            BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) :
                            BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1);
                    inputTableList.add(table_name);
                    break;
            }
            return null;
        }
    
        /**
         * parses given query and gets the lineage info.
         *
     * @param query the query to parse
         * @throws ParseException exception
         */
        public void getLineageInfo(String query) throws ParseException,
                SemanticException {
    
            /*
             * Get the AST tree
             */
            ParseDriver pd = new ParseDriver();
            ASTNode tree = pd.parse(query);
    
            while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
                tree = (ASTNode) tree.getChild(0);
            }
    
            /*
             * initialize Event Processor and dispatcher.
             */
            inputTableList.clear();
            outputTableList.clear();
            partitions.clear();
            columns.clear();
            // create a walker which walks the tree in a DFS manner while maintaining
            // the operator stack. The dispatcher
            // generates the plan from the operator tree
            Map<Rule, NodeProcessor> rules = new LinkedHashMap<>();
    
            // The dispatcher fires the processor corresponding to the closest matching
            // rule and passes the context along
            Dispatcher disp = new DefaultRuleDispatcher(this, rules, null);
            GraphWalker ogw = new DefaultGraphWalker(disp);
    
            // Create a list of topop nodes
            ArrayList<Node> topNodes = new ArrayList<>();
            topNodes.add(tree);
            ogw.startWalking(topNodes, null);
        }
    
        public static void main(String[] args) throws IOException, ParseException,
                SemanticException {
            String query = "INSERT OVERWRITE TABLE hzzhujiajun.table1 SELECT a.name FROM hzzhujiajun.table2 a join hzzhujiajun.table3 b ON (a.id = b.id)";
            String query2 = "CREATE EXTERNAL TABLE facts.http_status (\n" +
                    "  code INT,\n" +
                    "  status_group STRING,\n" +
                    "  message STRING,\n" +
                    "  description STRING\n" +
                    ")\n" +
                    "ROW FORMAT DELIMITED \n" +
                    "FIELDS TERMINATED BY '\\t'\n" +
                    "LINES TERMINATED BY '\\n'\n" +
                    "STORED AS TEXTFILE\n" +
                    "LOCATION \"${EXTERNAL_DATA_LOCATION}/facts.db/http_status\"\n";
            String query3 = "CREATE TABLE nasa_access.daily_logs(\n" +
                    "  source_ip STRING,\n" +
                    "  source_url STRING,\n" +
                    "  time TIMESTAMP ,\n" +
                    "  action STRING,\n" +
                    "  url STRING,\n" +
                    "  size INT,\n" +
                    "  line STRING\n" +
                    ")\n" +
                    "PARTITIONED BY (day STRING)\n" +
                    "STORED AS ORC";
            MyLineageInfo lep = new MyLineageInfo();
            lep.getLineageInfo(query);
            HiveParserUtil.output(lep);
            System.out.println("------------------------------------------");
            lep.getLineageInfo(query2);
            HiveParserUtil.output(lep);
    
            System.out.println("------------------------------------------");
            lep.getLineageInfo(query3);
            HiveParserUtil.output(lep);
        }
    }
    

    Running it produces the following output:

    Input tables = [hzzhujiajun.table2, hzzhujiajun.table3]
    Output tables = [hzzhujiajun.table1]
    columns = []
    partitions columns:[]
    ------------------------------------------
    Input tables = []
    Output tables = [facts.http_status]
    columns = [code, description, message, status_group]
    partitions columns:[]
    ------------------------------------------
    Input tables = []
    Output tables = [nasa_access.daily_logs]
    columns = [action, line, size, source_ip, source_url, time, url]
    partitions columns:[day]
    
