美文网首页
十一、Flink Table

十一、Flink Table

作者: 木戎 | 来源:发表于2019-05-22 17:17 被阅读0次

    简介

    Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。

    Flink SQL的编程模型

    创建一个TableEnvironment

    TableEnvironment是Table API和SQL集成的核心概念,它主要负责:

    1. 在内部目录中注册一个Table
    2. 注册一个外部目录
    3. 执行SQL查询
    4. 注册一个用户自定义函数(标量、表及聚合)
    5. 将DataStream或者DataSet转换成Table
    6. 持有ExecutionEnvironment或者StreamExecutionEnvironment的引用 一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。 TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。

    在目录中注册表

    TableEnvironment允许通过各种源来注册一个表:

    1. 一个已存在的Table对象,通常是Table API或者SQL查询的结果 Table projTable = tableEnv.scan("X").select(...);
    2. TableSource,可以访问外部数据如文件、数据库或者消息系统 TableSource csvSource = new CsvTableSource("/path/to/file", ...);
    3. DataStream或者DataSet程序中的DataStream或者DataSet //将DataSet转换为 Table Table table= tableEnv.fromDataSet(tableset);

    注册TableSink

    注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],......):

    TableSink csvSink = new CsvTableSink("/path/to/file", ...); 
    
    String[] fieldNames = {"a", "b", "c"}; 
                    TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 
                    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
    

    示例

    public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    
            List list  =  new ArrayList();
            String wordsStr = "Hello Flink Hello TOM";
            String[] words = wordsStr.split("\\W+");
            for(String word : words){
                WC wc = new WC(word, 1);
                list.add(wc);
            }
            DataSet<WC> input = env.fromCollection(list);
            tEnv.registerDataSet("WordCount", input, "word, frequency");
            Table table = tEnv.sqlQuery(
                    "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
            DataSet<WC> result = tEnv.toDataSet(table, WC.class);
            result.print();
        }
    
        public static class WC {
            public String word;//hello
            public long frequency;//1
    
            // public constructor to make it a Flink POJO
            public WC() {}
    
            public WC(String word, long frequency) {
                this.word = word;
                this.frequency = frequency;
            }
    
            @Override
            public String toString() {
                return "WC " + word + " " + frequency;
            }
        }
    

    由于Table API是Scala和Java的语言集成查询API,所以maven需要把scala的pom依赖加进来

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.12</artifactId>
          <version>1.8.0</version>
        </dependency>
    
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.12</artifactId>
          <version>1.8.0</version>
        </dependency>
    

    相关文章

      网友评论

          本文标题:十一、Flink Table

          本文链接:https://www.haomeiwen.com/subject/jyiuzqtx.html