美文网首页
聊聊flink TableEnvironment的scan操作

聊聊flink TableEnvironment的scan操作

作者: go4it | 来源:发表于2019-01-22 10:28 被阅读19次

    本文主要研究一下flink TableEnvironment的scan操作

    实例

    //Scanning a directly registered table
    val tab: Table = tableEnv.scan("tableName")
    
    //Scanning a table from a registered catalog
    val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
    
    • scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取

    TableEnvironment.scan

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

    abstract class TableEnvironment(val config: TableConfig) {
    
      private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
      private val rootSchema: SchemaPlus = internalSchema.plus()
    
      //......
    
      @throws[TableException]
      @varargs
      def scan(tablePath: String*): Table = {
        scanInternal(tablePath.toArray) match {
          case Some(table) => table
          case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
        }
      }
    
      private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
        require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
        val schemaPaths = tablePath.slice(0, tablePath.length - 1)
        val schema = getSchema(schemaPaths)
        if (schema != null) {
          val tableName = tablePath(tablePath.length - 1)
          val table = schema.getTable(tableName)
          if (table != null) {
            return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
          }
        }
        None
      }
    
      private def getSchema(schemaPath: Array[String]): SchemaPlus = {
        var schema = rootSchema
        for (schemaName <- schemaPath) {
          schema = schema.getSubSchema(schemaName)
          if (schema == null) {
            return schema
          }
        }
        schema
      }
    
      //......
    }
    
    • scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
    • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
    • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

    小结

    • TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
    • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
    • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

    doc

    相关文章

      网友评论

          本文标题:聊聊flink TableEnvironment的scan操作

          本文链接:https://www.haomeiwen.com/subject/ujfljqtx.html