A Look at Flink's Table API and SQL Programs


Author: go4it | Published 2019-01-21 12:51

    This article takes a look at Flink's Table API and SQL programs.

    Example

    // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // create a TableEnvironment
    // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // register a Table
    tableEnv.registerTable("table1", ...);           // or
    tableEnv.registerTableSource("table2", ...);     // or
    tableEnv.registerExternalCatalog("extCat", ...);
    // register an output Table
    tableEnv.registerTableSink("outputTable", ...);
    
    // create a Table from a Table API query
    Table tapiResult = tableEnv.scan("table1").select(...);
    // create a Table from a SQL query
    Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
    
    // emit a Table API result Table to a TableSink, same for SQL result
    tapiResult.insertInto("outputTable");
    
    // execute
    env.execute();
    
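    • This example shows the basic structure of a Flink Table API and SQL program. You create a TableEnvironment, register input and output tables, and build a query with the Table API or SQL. You then emit the result to a TableSink and execute the job.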

    Table API example

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // register Orders table
    
    // scan registered Orders table
    Table orders = tableEnv.scan("Orders");
    // compute revenue for all customers from France
    Table revenue = orders
      .filter("cCountry === 'FRANCE'")
      .groupBy("cID, cName")
      .select("cID, cName, revenue.sum AS revSum");
    
    // emit or convert Table
    // execute query
    
    • tableEnv.scan creates a Table from a registered table. You can then chain Table's query methods (filter, groupBy, select, and so on) on it.
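    To make the semantics of the query above concrete, here is a plain-Java sketch of what the filter/groupBy/select chain computes over an in-memory list. It does not use Flink. The Order and Customer records and the sample data are hypothetical stand-ins for the Orders table. Flink itself evaluates the real query over a (possibly unbounded) stream rather than a list like this.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RevenueFrance {
    // Hypothetical row type holding the Orders columns the query uses
    record Order(long cID, String cName, String cCountry, long revenue) {}

    // Grouping key, corresponding to groupBy("cID, cName")
    record Customer(long cID, String cName) {}

    static Map<Customer, Long> revenueFrance(List<Order> orders) {
        return orders.stream()
            // filter("cCountry === 'FRANCE'")
            .filter(o -> "FRANCE".equals(o.cCountry()))
            // groupBy("cID, cName") + select("cID, cName, revenue.sum AS revSum")
            .collect(Collectors.groupingBy(
                o -> new Customer(o.cID(), o.cName()),
                Collectors.summingLong(Order::revenue)));
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(1, "Alice", "FRANCE", 10),
            new Order(1, "Alice", "FRANCE", 5),
            new Order(2, "Bob", "GERMANY", 7),
            new Order(3, "Chloe", "FRANCE", 3));
        // Print one "cID, cName, revSum" line per French customer
        revenueFrance(orders).forEach((c, revSum) ->
            System.out.println(c.cID() + ", " + c.cName() + ", " + revSum));
    }
}
```

    With the sample data, Alice (cID 1) gets revSum 15 and Chloe (cID 3) gets 3, while Bob is filtered out. The print order is not guaranteed because groupingBy returns an unordered map.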

    sqlQuery example

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // register Orders table
    
    // compute revenue for all customers from France
    Table revenue = tableEnv.sqlQuery(
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );
    
    // emit or convert Table
    // execute query
    
    • Internally, sqlQuery is implemented with Apache Calcite.

    sqlUpdate example (TableSink)

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // register "Orders" table
    // register "RevenueFrance" output table
    
    // compute revenue for all customers from France and emit to "RevenueFrance"
    tableEnv.sqlUpdate(
        "INSERT INTO RevenueFrance " +
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );
    
    // execute query
    
    • First register the output table with a TableSink. TableEnvironment.sqlUpdate can then write query results into it with an INSERT INTO statement.

    insertInto example (TableSink)

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // create a TableSink
    TableSink sink = new CsvTableSink("/path/to/file", "|");  // second argument: field delimiter
    
    // register the TableSink with a specific schema
    String[] fieldNames = {"a", "b", "c"};
    TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
    
    // compute a result Table using Table API operators and/or SQL queries
    Table result = ...
    // emit the result Table to the registered TableSink
    result.insertInto("CsvSinkTable");
    
    // execute the program
    
    • Table.insertInto writes the result Table to the registered output table.
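    For intuition about what ends up in the output file, the sketch below formats rows that match the (a INT, b STRING, c LONG) schema registered above as "|"-delimited lines. This is roughly the shape of what a CSV sink with field delimiter "|" writes. It is plain Java, not Flink's CsvTableSink code. Writing null as an empty field is an assumption of this sketch, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class PipeDelimitedRows {
    // Joins one row's field values with the delimiter; null becomes "" (assumption)
    static String formatRow(Object[] fields, String fieldDelim) {
        StringJoiner line = new StringJoiner(fieldDelim);
        for (Object field : fields) {
            line.add(field == null ? "" : field.toString());
        }
        return line.toString();
    }

    // Formats every row, producing one output line per row
    static List<String> formatAll(List<Object[]> rows, String fieldDelim) {
        List<String> lines = new ArrayList<>();
        for (Object[] row : rows) {
            lines.add(formatRow(row, fieldDelim));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical rows for the schema a INT, b STRING, c LONG
        List<Object[]> rows = List.of(
            new Object[] {1, "foo", 100L},
            new Object[] {2, null, 200L});
        formatAll(rows, "|").forEach(System.out::println);
    }
}
```

    Running main prints 1|foo|100 and 2||200.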

    Converting between DataStream (or DataSet) and Table

    Registering a DataStream as a Table

    // get StreamTableEnvironment
    // registration of a DataSet in a BatchTableEnvironment is equivalent
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    DataStream<Tuple2<Long, String>> stream = ...
    
    // register the DataStream as Table "myTable" with fields "f0", "f1"
    tableEnv.registerDataStream("myTable", stream);
    
    // register the DataStream as table "myTable2" with fields "myLong", "myString"
    tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
    
    • StreamTableEnvironment.registerDataStream registers a DataStream as a Table.

    Converting a DataStream to a Table

    // get StreamTableEnvironment
    // registration of a DataSet in a BatchTableEnvironment is equivalent
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    DataStream<Tuple2<Long, String>> stream = ...
    
    // Convert the DataStream into a Table with default fields "f0", "f1"
    Table table1 = tableEnv.fromDataStream(stream);
    
    // Convert the DataStream into a Table with fields "myLong", "myString"
    Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
    
    • Here StreamTableEnvironment.fromDataStream converts the DataStream to a Table.

    Converting a Table to a DataStream

    
    // get StreamTableEnvironment. 
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // Table with two fields (String name, Integer age)
    Table table = ...
    
    // convert the Table into an append DataStream of Row by specifying the class
    DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
    
    // convert the Table into an append DataStream of Tuple2<String, Integer> 
    //   via a TypeInformation
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
      Types.STRING(),
      Types.INT());
    DataStream<Tuple2<String, Integer>> dsTuple = 
      tableEnv.toAppendStream(table, tupleType);
    
    // convert the Table into a retract DataStream of Row.
    //   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
    //   The boolean field indicates the type of the change. 
    //   True is INSERT, false is DELETE.
    DataStream<Tuple2<Boolean, Row>> retractStream = 
      tableEnv.toRetractStream(table, Row.class);
    
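    • Here StreamTableEnvironment.toAppendStream and StreamTableEnvironment.toRetractStream convert the Table into a DataStream. Append mode only works if the table is modified by INSERT changes alone. Retract mode works for any table: it encodes each change as a Boolean flag (true for INSERT, false for DELETE) paired with the row.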
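    The retract encoding is easiest to see on an aggregation. The plain-Java sketch below is not Flink code. It maintains a running COUNT per key and, like a retract stream, emits a (false, oldRow) retraction before each (true, newRow) update. The Change record is a hypothetical stand-in for Tuple2<Boolean, Row>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {
    // Stand-in for Tuple2<Boolean, Row>: isInsert == true is INSERT, false is DELETE
    record Change(boolean isInsert, String key, long count) {}

    // Emits retract-style changes for a running COUNT(*) grouped by key
    static List<Change> countWithRetractions(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                // Retract the result row emitted earlier for this key
                out.add(new Change(false, key, old));
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(key, updated);
            // Insert the updated result row
            out.add(new Change(true, key, updated));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Change c : countWithRetractions(List.of("a", "b", "a"))) {
            System.out.println(c);
        }
    }
}
```

    For the input a, b, a, the sketch emits (true, a, 1), (true, b, 1), (false, a, 1), (true, a, 2). An append stream has no way to express the third change. That is why an aggregated table like this one has to be converted with toRetractStream rather than toAppendStream.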

    Converting a Table to a DataSet

    // get BatchTableEnvironment
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // Table with two fields (String name, Integer age)
    Table table = ...
    
    // convert the Table into a DataSet of Row by specifying a class
    DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
    
    // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
    TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
      Types.STRING(),
      Types.INT());
    DataSet<Tuple2<String, Integer>> dsTuple = 
      tableEnv.toDataSet(table, tupleType);
    
    • Here BatchTableEnvironment.toDataSet converts the Table to a DataSet.

    Mapping data types to a table schema

    Position-based Mapping (Tuple types)

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    //---Tuple type---
    
    DataStream<Tuple2<Long, Integer>> stream = ...
    
    // convert DataStream into Table with default field names "f0" and "f1"
    Table table1 = tableEnv.fromDataStream(stream);
    
    // convert DataStream into Table with field names "myLong" and "myInt"
    Table table2 = tableEnv.fromDataStream(stream, "myLong, myInt");
    
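    • Position-based mapping requires that the newly specified field names do not clash with the field names of the input data type. If they do, the names are interpreted as a name-based mapping instead. If no names are given, the default field names are used, starting at f0 for Tuples. This mode applies to Tuple and Row types; POJO types cannot use it.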
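    The position-based rule can be modeled in a few lines of plain Java. This sketch uses hypothetical helpers, not Flink's implementation. A Tuple's default names are f0, f1, ..., and new names replace input fields by position. A new name that collides with an existing input field name is simply rejected here, whereas Flink would switch to name-based interpretation in that case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class PositionMapping {
    // Default field names of a Tuple with the given arity: f0, f1, ...
    static List<String> defaultTupleNames(int arity) {
        return IntStream.range(0, arity).mapToObj(i -> "f" + i).toList();
    }

    // Splits a field list such as "myLong, myInt" into trimmed names
    static List<String> parseNames(String expr) {
        return Arrays.stream(expr.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .toList();
    }

    // Position-based renaming: the i-th new name replaces the i-th input field.
    // Simplification: exactly one new name per input field is required.
    static List<String> renameByPosition(List<String> inputNames, List<String> newNames) {
        if (newNames.isEmpty()) {
            return inputNames; // no names given: keep the defaults
        }
        if (newNames.size() != inputNames.size()) {
            throw new IllegalArgumentException("expected " + inputNames.size() + " names");
        }
        for (String name : newNames) {
            if (inputNames.contains(name)) {
                // Flink would treat a clashing name as name-based mapping; this sketch rejects it
                throw new IllegalArgumentException("name clashes with input field: " + name);
            }
        }
        return List.copyOf(newNames);
    }

    public static void main(String[] args) {
        List<String> input = defaultTupleNames(2);
        System.out.println(input);
        System.out.println(renameByPosition(input, parseNames("myLong, myInt")));
    }
}
```

    Running main prints [f0, f1] followed by [myLong, myInt], mirroring the two fromDataStream calls in the Tuple example.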

    Name-based Mapping (POJO types)

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    //---Tuple type---
    
    DataStream<Tuple2<Long, Integer>> stream = ...
    
    // convert DataStream into Table with default field names "f0" and "f1"
    Table table1 = tableEnv.fromDataStream(stream);
    
    // convert DataStream into Table with field "f1" only
    Table table2 = tableEnv.fromDataStream(stream, "f1");
    
    // convert DataStream into Table with swapped fields
    Table table3 = tableEnv.fromDataStream(stream, "f1, f0");
    
    // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
    Table table4 = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
    
    //---POJO type---
    
    // Person is a POJO with fields "name" and "age"
    DataStream<Person> personStream = ...
    
    // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
    Table table5 = tableEnv.fromDataStream(personStream);
    
    // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
    Table table6 = tableEnv.fromDataStream(personStream, "age as myAge, name as myName");
    
    // convert DataStream into Table with projected field "name" (name-based)
    Table table7 = tableEnv.fromDataStream(personStream, "name");
    
    // convert DataStream into Table with projected and renamed field "myName" (name-based)
    Table table8 = tableEnv.fromDataStream(personStream, "name as myName");
    
    • Both Tuple and POJO types support this mode. Fields are selected by name, can be reordered or projected, and can be renamed with as.
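    Name-based mapping, by contrast, looks fields up by name. The plain-Java sketch below uses hypothetical helpers, not Flink's expression parser. It resolves expressions such as "f1 as myInt, f0 as myLong" into an ordered map from output name to input field. It also orders a POJO's default fields alphabetically, as the comment in the POJO example states.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NameMapping {
    // Default schema of a POJO: its fields ordered by name
    static List<String> defaultPojoNames(List<String> declaredFields) {
        List<String> sorted = new ArrayList<>(declaredFields);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    // Resolves "f1 as myInt, f0 as myLong": each item selects an input field by name
    // and optionally renames it. Returns output name -> input name, in output order.
    static Map<String, String> resolve(List<String> inputNames, String expr) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String item : expr.split(",")) {
            String[] parts = item.trim().split("\\s+[aA][sS]\\s+");
            String source = parts[0].trim();
            String target = parts.length > 1 ? parts[1].trim() : source;
            if (!inputNames.contains(source)) {
                throw new IllegalArgumentException("unknown field: " + source);
            }
            out.put(target, source);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of("f0", "f1"), "f1 as myInt, f0 as myLong"));
        System.out.println(defaultPojoNames(List.of("name", "age")));
    }
}
```

    Running main prints {myInt=f1, myLong=f0} and [age, name]. A lookup by name, rather than by position, is what allows swapping and projecting fields in the examples above.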

    Atomic types

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    DataStream<Long> stream = ...
    
    // convert DataStream into Table with default field name "f0"
    Table table1 = tableEnv.fromDataStream(stream);
    
    // convert DataStream into Table with field name "myLong"
    Table table2 = tableEnv.fromDataStream(stream, "myLong");
    
    • An atomic type is converted into a table with a single field.

    Row type

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
    DataStream<Row> stream = ...
    
    // convert DataStream into Table with default field names "name", "age"
    Table table1 = tableEnv.fromDataStream(stream);
    
    // convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
    Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
    
    // convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
    Table table3 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
    
    // convert DataStream into Table with projected field "name" (name-based)
    Table table4 = tableEnv.fromDataStream(stream, "name");
    
    // convert DataStream into Table with projected and renamed field "myName" (name-based)
    Table table5 = tableEnv.fromDataStream(stream, "name as myName");
    
    • The Row type supports an arbitrary number of fields and allows null field values. It supports both position-based and name-based mapping.

    Summary

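    The basic usage of Flink's Table API and SQL programs is as follows:
    
    • First create a TableEnvironment (BatchTableEnvironment or StreamTableEnvironment). Then create Tables or TableSources and register them in a catalog. The internal catalog is used by default, but you can also register an external catalog. Next, query the tables, and finally apply any further conversions.
    • A Table can be created by converting a DataSet or DataStream. It can be queried with the Table API (starting from the scan method), with SQL (the sqlQuery method), or with a mix of both.
    • A query result Table can also be converted into a DataSet or DataStream for further processing. To write the output to a table instead, register a TableSink. Then emit to it with either TableEnvironment's sqlUpdate method or Table's insertInto method.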


    Article link: https://www.haomeiwen.com/subject/niccjqtx.html