美文网首页
Dataset DataStream Table

Dataset DataStream Table

作者: jerome徐 | 来源:发表于2019-02-17 21:54 被阅读0次

Table api案例:来自官网https://flink.sojb.cn/dev/table/common.html#structure-of-table-api-and-sql-programs

官网demo

Table对应两种api ,table Api query和table Sql query.

Stream和Banch的Table环境变量

Flink内部提供的隐式函数。

implicit def table2RowDataStream(table: Table): DataStream[Row] = {

val tableEnv =table.tableEnv.asInstanceOf[ScalaStreamTableEnv] tableEnv.toAppendStream[Row](table)

}

将DataStream注册为表:

registerDataStreamInternal方法转换表的可以追踪

将DataStream转换为表

   将dataStream转换为Table.调用函数fromDataStream.

DataStream-->Table

创建一个唯一的表名字,并参数replace确定是否替换已经存在的表。registerDataStreamInternal方法根据DataStream创建一个DataStreamTable。

将DataStreamTable注册到CataLog中,利用CatalogManager实现。

利用datastreamtable创建一个FlinkTempTable,其实就是一个CataLogTable.Catalog内部有tableschema,column.最后调用createTable将catalogtable注入内存中。这个table是一个Map 数据结构。

最后调用scan方法将之前注册的变返回,利用的catalogManager

将表转换为DataStream

调用方法toAppendStream方法.创建返回的Typeinformation,将javastream转换为scala stream,返回

table.getRelNode的分析另外篇幅分析。

将DataSet转换为表,注册表通过FlinkInMemoryCatalog注册实现的。转换表和DataStream转换为表逻辑一样,先注册表然后从CataManager中取出这个表

将dataset注册为表 利用catalogManager注册表, 用新表代替原表

将表转换为DataSet

toDataSet方法。

相关文章

网友评论

      本文标题:Dataset DataStream Table

      本文链接:https://www.haomeiwen.com/subject/slegeqtx.html