2022-06-11-Flink-48 (Part 7)

Author: 冰菓_ | Published 2022-06-11 23:12

1. Basic Table API

Adding the dependencies
    <!-- flink-table bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- the planner, needed if we want to run Table programs inside the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>

Note: in a production environment the lib directory already ships with the planner by default, so only the bridge is needed. If you want to use user-defined functions or connect to Kafka, you also need a SQL client, which is included in flink-table-common.

Creating and querying tables

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo2 {

     public static void main(String[] args) {
          EnvironmentSettings setting = EnvironmentSettings.newInstance()
                  .inStreamingMode()
                  .useBlinkPlanner()
                  .build();
          
          // create the table environment directly, without going through a stream environment
          // stream processing based on the Blink planner
          TableEnvironment tableEnv = TableEnvironment.create(setting);
          
          /* sample contents of src/main/resources/c.txt:
          小米,./click,10000
          小红,./click,10000
          小牛,./click,10000
          小黄,./click,10000 */

          // create the input table
          String createDDL = "CREATE TABLE clickTable (" +
          " user_name STRING, "+
          " url STRING, " +
          " ts BIGINT " +
          ") WITH (" +
          " 'connector' = 'filesystem', " +
          " 'path' = 'src/main/resources/c.txt', " +
          " 'format' = 'csv'" +
          ")";

          tableEnv.executeSql(createDDL);

          // query via the Table API
          Table table1 = tableEnv.from("clickTable").where($("user_name").isEqual("小米")).select($("user_name"), $("url"));
          tableEnv.createTemporaryView("table1",table1);



          // create an output table
          String createOutDDL = "CREATE TABLE outTable (" +
                  " user_name STRING, "+
                  " url STRING " +
                  ") WITH (" +
                  " 'connector' = 'filesystem', " +
                  " 'path' = 'output/2022-06-11', " +
                  " 'format' = 'csv' " +
                  ")";

          // run a SQL query
          Table table2 = tableEnv.sqlQuery("select user_name , url from clickTable");

          Table table_view = tableEnv.sqlQuery("select user_name , url from table1");
          tableEnv.executeSql(createOutDDL);

          table_view.executeInsert("outTable");
          table2.executeInsert("outTable");

     }
}

There are two ways to query: directly through the Table API, or with SQL.

Bug: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encounte...
Fix: user is a reserved SQL keyword (other identifiers can clash too); wrap such names in backticks, as in the sketch below.
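
For instance, if the column were named user rather than user_name (a hypothetical sketch):

          Table t = tableEnv.sqlQuery("select `user`, url from clickTable");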

//alternatively, print the results to stdout via the print connector:
          String createOutDDL = "CREATE TABLE outTable (" +
                  " user_name STRING, "+
                  " url STRING " +
                  ") WITH (" +
                  " 'connector' = 'print' " +
                  ")";
Converting tables to streams

Changelog streams

import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class tableDemo3 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // create the input table
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";

        // register the table
        tableEnv.executeSql(createDDL);

        Table table = tableEnv.sqlQuery("select user_name , count(1) from clickTable where user_name = '小米' group by  user_name");


        tableEnv.toChangelogStream(table).print("agg");

        env.execute();
    }
}
Converting streams to tables

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo1 {

     public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
          env.setParallelism(1);
          SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
               @Override
               public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
               }
          }));

          // 1. create the table environment
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

          // 2. convert the stream into a Table
          Table table = tableEnv.fromDataStream(streamOperator);

          // transform with SQL
          // `table` here is a Java object, not a registered table; interpolating it into the SQL string implicitly registers it as a virtual table
          Table table1 = tableEnv.sqlQuery("select user , url , `timestamp` from " + table);

          // transform directly with the Table API
          Table table2 = table.select($("user"), $("url"), $("timestamp")).where($("url").isEqual("./click"));

          tableEnv.toDataStream(table1).print("table1");
          tableEnv.toDataStream(table2).print("table2");

          env.execute();

     }

}

2. Special concepts in stream processing

Differences between stream processing and relational algebra (tables, and SQL)
Dynamic Tables

Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time. A dynamic table can be queried just like a static batch table, but querying a dynamic table produces a Continuous Query. A continuous query never terminates and produces another dynamic table; the query keeps updating its dynamic result table to reflect the changes on its dynamic input table.

The process of a streaming continuous query:
  1. The stream is converted into a dynamic table
  2. A continuous query is evaluated on the dynamic table, producing a new dynamic table
  3. The resulting dynamic table is converted back into a stream
Converting dynamic tables into streams

Like a regular database table, a dynamic table is continuously modified by Insert, Update, and Delete changes. When converting a dynamic table into a stream, or writing it to an external system, these changes must be encoded. Flink's Table API and SQL support three encodings of a dynamic table's changes (a sketch of the retract encoding follows the list):

  1. Append-only stream
    Insert changes only; no upserts of existing data are involved
  2. Retract stream
    A stream containing two kinds of messages, add messages and retract messages; an UPDATE is encoded as a retract message for the changed (previous) row plus an add message for the updated (new) row
  3. Upsert stream
    Requires a unique key; an UPDATE is then encoded as a single upsert message on that key, which is more compact than the retract encoding
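
A rough sketch of the retract encoding in practice, reusing the clickTable DDL from section 1 (the printed lines are illustrative, not captured output):

        Table agg = tableEnv.sqlQuery(
                "select user_name, count(1) as cnt from clickTable group by user_name");
        tableEnv.toChangelogStream(agg).print();
        // as rows arrive for the same user, roughly:
        //   +I[小米, 1]   insert: first click for this user
        //   -U[小米, 1]   retract the old count
        //   +U[小米, 2]   add the updated count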

3. Time and windows

Event time

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class tableDemo4 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // define the time attribute in the DDL
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(createDDL);

        // define the time attribute when converting a stream into a table
        SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        table.printSchema();
        env.execute();

    }
}
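
For reference, printSchema() marks et as the event-time attribute; the output looks roughly like this (exact formatting varies across Flink versions):

    (
      `user` STRING,
      `url` STRING,
      `ts` BIGINT,
      `et` TIMESTAMP(3) *ROWTIME*
    )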
Processing time

proctime (a computed column)


public class tableDemo4 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // define the time attribute in the DDL
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as PROCTIME() " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(createDDL);

        tableEnv.from("clickTable").printSchema();

        // define the time attribute when converting a stream into a table
        DataStreamSource<Event> streamOperator = env.addSource(new clickSource());

        Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").proctime());

        table.printSchema();

    }
}

Wait, what??? Does processing-time semantics require watermarks? Surely not!!! (It does not: PROCTIME() is just a computed column, and Flink reads the machine clock directly, so no watermark is involved.)

Windows
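
Windows can also be declared on the Table API side. A minimal sketch, assuming the same stream-derived table and rowtime attribute et as in tableDemo4 (the SQL forms are covered in the next section):

        // requires: import org.apache.flink.table.api.Tumble;
        //           import static org.apache.flink.table.api.Expressions.lit;
        Table windowed = table
                .window(Tumble.over(lit(10).seconds()).on($("et")).as("w"))
                .groupBy($("user"), $("w"))
                .select($("user"), $("w").end().as("w_end"), $("url").count().as("cnt"));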

4. Aggregation queries

Group aggregation (and legacy group windows)

public class tableDemo5 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        Table table1 = tableEnv.sqlQuery("select user_name, count(1) from clickTable group by user_name");

        Table table2 = tableEnv.sqlQuery("select user_name, count(1), TUMBLE_END(et, INTERVAL '10' SECOND )  from clickTable group by user_name , TUMBLE(et, INTERVAL '10' SECOND )");

        tableEnv.toChangelogStream(table1).print("agg: ");
        tableEnv.toDataStream(table2).print("window: ");

        env.execute();


    }
}

Window aggregation

Implemented with window TVFs (table-valued functions)


public class tableDemo6 {


    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, "+
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/d.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        // tumbling window: window start is inclusive, window end is exclusive
        Table table3 = tableEnv.sqlQuery("select user_name, count(1),window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        // sliding (hop) window
        Table table4 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(HOP(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        // cumulate window: contrast its output with the sliding window's
        Table table5 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(CUMULATE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");

        //tableEnv.toDataStream(table3).print("TUMBLE: ");
        tableEnv.toDataStream(table4).print("HOP: ");
        tableEnv.toDataStream(table5).print("CUMULATE: ");
        env.execute();


    }
}

Understanding cumulate windows

Input (src/main/resources/d.txt):

小米,./click,1000
小米,./click,2000
小米,./click,5000
小米,./click,6000
小米,./click,9000
小米,./click,10000
小米,./click,10001
小米,./click,14001
小米,./click,15001
小米,./click,19001


HOP: > +I[小米, 2, 1970-01-01T07:59:55, 1970-01-01T08:00:05]
CUMULATE: > +I[小米, 2, 1970-01-01T08:00, 1970-01-01T08:00:05]
HOP: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
CUMULATE: > +I[小米, 3, 1970-01-01T08:00:10, 1970-01-01T08:00:15]
CUMULATE: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 6, 1970-01-01T08:00:05, 1970-01-01T08:00:15]
HOP: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
HOP: > +I[小米, 2, 1970-01-01T08:00:15, 1970-01-01T08:00:25]

Reading the output: every HOP window has the fixed 10-second length and slides by 5 seconds, so consecutive windows overlap. CUMULATE instead grows within each max-size cycle: with step 5s and size 10s it fires [08:00:00, 08:00:05) and [08:00:00, 08:00:10), then resets and fires [08:00:10, 08:00:15) and [08:00:10, 08:00:20). Inside one cycle the window_start stays fixed while the window_end advances by the step.
OVER windows

Error: OVER windows' ordering in stream mode must be defined on a time attribute. Fix: ORDER BY the time attribute (here et):

        // OVER window: running count per user over all rows up to the current row
        Table table6 =  tableEnv.sqlQuery("select user_name, COUNT(1) OVER w as cnt from clickTable WINDOW w as (PARTITION BY user_name order by et rows between unbounded preceding and current row) ");
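
The window can also be written inline, and the frame can be a time range rather than a row count; a sketch:

        // inline OVER with a time-range frame: this user's clicks in the last 10 seconds
        Table table7 = tableEnv.sqlQuery("select user_name, COUNT(1) OVER (" +
                "PARTITION BY user_name ORDER BY et " +
                "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as cnt from clickTable");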
Worked example: TOP-N

Plain TOP-N

During execution, Flink SQL sorts the input stream by the sort key. Whenever the top N records of a partition change, the changed rows are sent downstream as an update (changelog) stream.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

public class tableDemo1 {

    public static void main(String[] args) throws Exception {


        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);


        Table query = tableEnv.sqlQuery("select user_name, rn , cnt from " +
                "(select user_name,cnt, row_number() OVER(PARTITION BY user_name ORDER BY  cnt DESC ) as rn from " +
                "(select user_name , count(1) as cnt from clickTable group by user_name) ) " +
                "where rn <= 2 ");

        tableEnv.toChangelogStream(query).print();

        env.execute();
    }
}

Window TOP-N


public class tableDemo2 {


    public static void main(String[] args) throws Exception {


        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(ddlTable);

        String sqlQuery = "select user_name , count(1) as cnt , window_start,window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND)) group by user_name,window_start,window_end";

        Table query = tableEnv.sqlQuery("select user_name, rn , cnt,window_start,window_end from " +
                "(select user_name,cnt,window_start,window_end, row_number() OVER(PARTITION BY window_start,window_end ORDER BY  cnt DESC ) as rn from " +
                "("+ sqlQuery +") ) " +
                "where rn <= 2 ");

        // window TOP-N results are append-only, so toDataStream works here (the plain TOP-N above needed toChangelogStream)
        tableEnv.toDataStream(query).print();

        env.execute();
    }
}

5. Join queries

Regular joins
Interval joins
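
A hedged sketch of both forms, assuming a hypothetical second table orderTable with columns user_name, order_id and its own rowtime attribute ot:

        // regular join: Flink must keep the state of both sides indefinitely
        Table joined = tableEnv.sqlQuery("select c.user_name, o.order_id " +
                "from clickTable c join orderTable o on c.user_name = o.user_name");

        // interval join: only rows inside the time bound are kept in state
        Table intervalJoined = tableEnv.sqlQuery("select c.user_name, o.order_id " +
                "from clickTable c, orderTable o " +
                "where c.user_name = o.user_name " +
                "and o.ot between c.et - INTERVAL '10' SECOND and c.et");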

6. Functions

Built-in (system) functions
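
A quick sketch of a few built-in functions run against clickTable:

        // system functions: string and temporal built-ins
        Table builtins = tableEnv.sqlQuery("select UPPER(user_name), CHAR_LENGTH(url), " +
                "DATE_FORMAT(et, 'yyyy-MM-dd HH:mm:ss') from clickTable");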
UDFs (user-defined functions)
public class tableDemo4 {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        // 2. register the custom scalar function (legacy registerFunction API)
        tableEnv.registerFunction("MyHash",new MyHashFunction());
        // how is createTemporarySystemFunction used? -- see the bug hit below
        //tableEnv.createTemporarySystemFunction("MyHash", MyHashFunction.class);
        // 3. call the UDF in a query (select each user plus the user's hash code)
        Table resultTable = tableEnv.sqlQuery("select user_name,MyHash(user_name) from clickTable");

        // 4. convert to a stream and print
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }

    // custom scalar function: extend ScalarFunction and define a public eval method
    public static class MyHashFunction extends ScalarFunction{
        public int eval(String str){
            return str.hashCode();
        }
    }
}

Bug: Unsupported class file major version 58 ("class file has wrong version")
Fix: adjust the Java version. Class file major version 58 is Java 14, which is not supported here; it took a long time to track down. One way to pin the compiler level is sketched below.
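
A sketch of pinning the compiler level in the pom (Java 8 works with this Flink version):

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

The table-function (UDTF) version of the demo: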

public class tableDemo4 {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'src/main/resources/c.txt', " +
                " 'format' = 'csv'" +
                ")";


        tableEnv.executeSql(ddlTable);

        // 2. register the custom table function (this time via createTemporarySystemFunction)
        tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
        // 3. call the UDTF in a query: LATERAL TABLE expands each url into (word, length) rows
        Table resultTable = tableEnv.sqlQuery("select user_name,url, word,length from clickTable ,LATERAL TABLE(MySplit(url)) AS T(word,length)");

        // 4. convert to a stream and print
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }

    // custom table function: extend TableFunction and emit rows via collect()
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class MyFunction extends TableFunction<Row> {

        public void eval(String str){
            String[] split = str.split(",");
            for (String s : split) {
                 collect(Row.of(s,s.length()));
            }
        }
    }
}

Bug: List of column aliases must have same degree as table; the returned table of function...
Fix: with the old API, a custom class extending TableFunction that emits Row must override getResultType; here the output is Row, so:

@Override
public TypeInformation<Row> getResultType() {
    return Types.ROW(Types.STRING, Types.INT);
}

(With the new createTemporarySystemFunction API, the @FunctionHint annotation above supplies the type instead.)

7. SQL client

8. Connecting to external systems

Kafka (the DDL template below uses the legacy, pre-1.11 connector option style; the runnable demo after it uses the current 'connector' = 'kafka' options)

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',       

  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
                                                   -- "latest-offset", "group-offsets", 
                                                   -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
                                         -- into Kafka's partitions valid are "fixed" 
                                         -- (each Flink partition ends up in at most one Kafka partition),
                                         -- "round-robin" (a Flink partition is distributed to 
                                         -- Kafka partitions round-robin)
                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  
  'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
                                         -- Please refer to Table Formats section for more details.
)
public class tableDemo6 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String ddlTable = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " ts BIGINT, " +
                " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                " WATERMARK FOR et as et - INTERVAL '4' SECOND " +
                ") WITH (" +
                " 'connector' = 'kafka', " +
                " 'topic' = 'topic03', " +
                " 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(ddlTable);

        Table table = tableEnv.sqlQuery("select * from clickTable");

        tableEnv.toDataStream(table).print();
        env.execute();

    }
}
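
Writing back to Kafka works the same way through a sink table. A sketch using the upsert-kafka connector for an updating aggregate (topic name and server address are placeholders):

        // upsert-kafka sink: requires a primary key; updates are encoded as upserts on it
        String sinkDDL = "CREATE TABLE kafkaSink (" +
                " user_name STRING, " +
                " cnt BIGINT, " +
                " PRIMARY KEY (user_name) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'topic_out', " +
                " 'properties.bootstrap.servers' = 'localhost:9092', " +
                " 'key.format' = 'csv', " +
                " 'value.format' = 'csv' " +
                ")";
        tableEnv.executeSql(sinkDDL);
        tableEnv.sqlQuery("select user_name, count(1) as cnt from clickTable group by user_name")
                .executeInsert("kafkaSink");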
