2022-06-11-Flink-48(七)

    1. Basic Table API

    Add the dependencies:
       <!-- flink-table bridge -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- If we want to run Table programs inside the IDE, we also need the planner dependency below -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
    

    Note: in a production environment the planner is already present in the lib directory by default, so only the bridge is needed. Of course, if you want to use user-defined functions or connect to Kafka, you also need a SQL client module, which is contained in flink-table-common.
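
    The dependency snippets assume flink.version and scala.binary.version are defined as Maven properties; a minimal sketch (the exact versions are assumptions, matching the Blink-planner era of this post):

    <properties>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>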

    Creating and querying tables
    
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class tableDemo2 {
    
         public static void main(String[] args) {
              EnvironmentSettings setting = EnvironmentSettings.newInstance()
                      .inStreamingMode()
                      .useBlinkPlanner()
                      .build();
              
          //Create the table environment directly, without going through a stream execution environment
          //Stream processing based on the Blink planner
              TableEnvironment tableEnv = TableEnvironment.create(setting);
              
          /* Sample contents of src/main/resources/c.txt:
          小米,./click,10000
              小红,./click,10000
              小牛,./click,10000
              小黄,./click,10000*/
    
          //Create the input table
              String createDDL = "CREATE TABLE clickTable (" +
              " user_name STRING, "+
              " url STRING, " +
              " ts BIGINT " +
              ") WITH (" +
              " 'connector' = 'filesystem', " +
              " 'path' = 'src/main/resources/c.txt', " +
              " 'format' = 'csv'" +
              ")";
    
              tableEnv.executeSql(createDDL);
    
          //Query via the Table API
              Table table1 = tableEnv.from("clickTable").where($("user_name").isEqual("小米")).select($("user_name"), $("url"));
              tableEnv.createTemporaryView("table1",table1);
    
    
    
          //Create the output table
              String createOutDDL = "CREATE TABLE outTable (" +
                      " user_name STRING, "+
                      " url STRING " +
                      ") WITH (" +
                      " 'connector' = 'filesystem', " +
                      " 'path' = 'output/2022-06-11', " +
                      " 'format' = 'csv' " +
                      ")";
    
          //Run a SQL query
              Table table2 = tableEnv.sqlQuery("select user_name , url from clickTable");
    
              Table table_view = tableEnv.sqlQuery("select user_name , url from table1");
              tableEnv.executeSql(createOutDDL);
    
              table_view.executeInsert("outTable");
              table2.executeInsert("outTable");
    
         }
    }
    
    

    There are two kinds of query: direct Table API calls, and SQL queries.

    Bug: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encounte
    Fix: user (or some other identifier) is a reserved keyword; wrap it in backticks ``.
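
    For example, quoting reserved words the same way tableDemo1 further down quotes `timestamp` (clicks is just a placeholder table name):

    -- user and timestamp are reserved words in Flink SQL; backticks turn them into plain identifiers
    SELECT `user`, url, `timestamp` FROM clicks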

    //Print connector: write results to stdout
              String createOutDDL = "CREATE TABLE outTable (" +
                      " user_name STRING, "+
                      " url STRING " +
                      ") WITH (" +
                      " 'connector' = 'print' " +
                      ")";
    
    Converting a table to a stream

    Changelog stream

    import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    public class tableDemo3 {
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        //Create the input table
            String createDDL = "CREATE TABLE clickTable (" +
                    " user_name STRING, "+
                    " url STRING, " +
                    " ts BIGINT " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
        //Register the table
            tableEnv.executeSql(createDDL);
    
            Table table = tableEnv.sqlQuery("select user_name , count(1) from clickTable where user_name = '小米' group by  user_name");
    
    
            tableEnv.toChangelogStream(table).print("agg");
    
            env.execute();
        }
    }
    
    Converting a stream to a table
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.time.Duration;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class tableDemo1 {
    
         public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
              env.setParallelism(1);
              SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                   @Override
                   public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                   }
              }));
    
          //1. Create the table environment
              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
          //2. Convert the stream into a Table
              Table table = tableEnv.fromDataStream(streamOperator);
    
          //Query with SQL
          //`table` here is not a registered table but a Java object; concatenating it into the SQL string registers it as an anonymous virtual table
              Table table1 = tableEnv.sqlQuery("select user , url , `timestamp` from " + table);
    
          //Query directly on the Table object
              Table table2 = table.select($("user"), $("url"), $("timestamp")).where($("url").isEqual("./click"));
    
              tableEnv.toDataStream(table1).print("table1");
              tableEnv.toDataStream(table2).print("table2");
    
              env.execute();
    
         }
    
    }
    
    

    2. Special Concepts in Stream Processing

    Differences between stream processing and relational algebra (tables and SQL)
    Dynamic Tables

    Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time. A dynamic table can be queried like a static batch table, but querying it produces a Continuous Query. A continuous query never terminates and generates another dynamic table; the query keeps updating its dynamic result table to reflect the changes on its dynamic input table.

    The continuous query process:
    1. A stream is converted into a dynamic table
    2. A continuous query is evaluated on the dynamic table, producing a new dynamic table
    3. The resulting dynamic table is converted back into a stream

    Converting a dynamic table to a stream

    Like a regular database table, a dynamic table is continuously modified by Insert, Update, and Delete changes. When converting a dynamic table into a stream or writing it to an external system, these changes must be encoded. Flink's Table API and SQL support three ways of encoding the changes to a dynamic table:

    1. Append-only stream
      Insert operations only; existing rows are never updated
    2. Retract stream
      A retract stream carries two kinds of messages: add messages and retract messages; an UPDATE is encoded as a retract message for the old row plus an add message for the new row
    3. Upsert stream
      Requires a unique key; an UPDATE is encoded as a single upsert message on that key (and a DELETE as a delete message), rather than the retract/add pair a retract stream uses
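
    As a concrete illustration, the grouped count in tableDemo3 above is retract-encoded; printed via toChangelogStream it looks roughly like this (a sketch; the counts depend on c.txt):

    agg> +I[小米, 1]
    agg> -U[小米, 1]
    agg> +U[小米, 2]
    agg> -U[小米, 2]
    agg> +U[小米, 3]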

    3. Time and Windows

    Event time
    
    public class tableDemo4 {
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        //Define the time attribute in the DDL
            String createDDL = "CREATE TABLE clickTable (" +
                    " user_name STRING, "+
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(createDDL);
    
        //Define the time attribute when converting a stream into a table
            SingleOutputStreamOperator<Event> streamOperator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
            Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
    
            table.printSchema();
            env.execute();
    
        }
    }
    
    Processing time

    proctime (computed column)

    
    public class tableDemo4 {
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        //Define the time attribute in the DDL
            String createDDL = "CREATE TABLE clickTable (" +
                    " user_name STRING, "+
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as PROCTIME() " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(createDDL);
    
            tableEnv.from("clickTable").printSchema();
    
        //Define the time attribute when converting a stream into a table
            DataStreamSource<Event> streamOperator = env.addSource(new clickSource());
    
            Table table = tableEnv.fromDataStream(streamOperator, $("user"), $("url"), $("timestamp").as("ts"), $("et").proctime());
    
            table.printSchema();
    
        }
    }
    
    

    I got confused here??? Does processing-time semantics need watermarks? No need; watermarks only matter under event-time semantics!

    Windows

    4. Aggregation Queries

    Group aggregation (legacy syntax)
    
    public class tableDemo5 {
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, "+
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(ddlTable);
    
            Table table1 = tableEnv.sqlQuery("select user_name, count(1) from clickTable group by user_name");
    
            Table table2 = tableEnv.sqlQuery("select user_name, count(1), TUMBLE_END(et, INTERVAL '10' SECOND )  from clickTable group by user_name , TUMBLE(et, INTERVAL '10' SECOND )");
    
            tableEnv.toChangelogStream(table1).print("agg: ");
            tableEnv.toDataStream(table2).print("window: ");
    
            env.execute();
    
    
        }
    }
    
    
    Window aggregation

    Implemented with window TVFs

    
    public class tableDemo6 {
    
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv= StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, "+
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/d.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(ddlTable);
    
        //Tumbling window: start inclusive, end exclusive
            Table table3 = tableEnv.sqlQuery("select user_name, count(1),window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");
    
        //Sliding (hop) window
            Table table4 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(HOP(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");
    
        //Cumulate window: note the difference from the hop window
            Table table5 = tableEnv.sqlQuery("select user_name, count(1),window_start, window_end from TABLE(CUMULATE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '5' SECOND ,INTERVAL '10' SECOND  )) group by user_name ,window_start ,window_end");
    
            //tableEnv.toDataStream(table3).print("TUMBLE: ");
            tableEnv.toDataStream(table4).print("HOP: ");
            tableEnv.toDataStream(table5).print("CUMULATE: ");
            env.execute();
    
    
        }
    }
    

    Understanding the cumulate window: with a 5-second step and a 10-second max size, windows share a start and grow until the max size is reached ([00:00, 00:05], then [00:00, 00:10]), after which the start jumps forward ([00:10, 00:15], [00:10, 00:20]); a hop window instead slides its start by the step every time.

    小米,./click,1000
    小米,./click,2000
    小米,./click,5000
    小米,./click,6000
    小米,./click,9000
    小米,./click,10000
    小米,./click,10001
    小米,./click,14001
    小米,./click,15001
    小米,./click,19001
    
    
    HOP: > +I[小米, 2, 1970-01-01T07:59:55, 1970-01-01T08:00:05]
    CUMULATE: > +I[小米, 2, 1970-01-01T08:00, 1970-01-01T08:00:05]
    HOP: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
    CUMULATE: > +I[小米, 5, 1970-01-01T08:00, 1970-01-01T08:00:10]
    CUMULATE: > +I[小米, 3, 1970-01-01T08:00:10, 1970-01-01T08:00:15]
    CUMULATE: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
    HOP: > +I[小米, 6, 1970-01-01T08:00:05, 1970-01-01T08:00:15]
    HOP: > +I[小米, 5, 1970-01-01T08:00:10, 1970-01-01T08:00:20]
    HOP: > +I[小米, 2, 1970-01-01T08:00:15, 1970-01-01T08:00:25]
    
    OVER window functions

    Fix for "OVER windows' ordering in stream mode must be defined on a time attribute": the ORDER BY of a streaming OVER window must use a time attribute (et here).

        //OVER window
            Table table6 =  tableEnv.sqlQuery("select user_name, COUNT(1) OVER w as cnt from clickTable WINDOW w as (PARTITION BY user_name order by et rows between unbounded preceding and current row) ");
    
    Example: TOP-N

    Plain TOP-N

    During execution, Flink SQL sorts the input stream by the sort key. Whenever the top N records of a partition change, the changed rows are sent downstream as an update stream.

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
        ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
      FROM table_name)
    WHERE rownum <= N [AND conditions]
    
    
    public class tableDemo1 {
    
        public static void main(String[] args) throws Exception {
    
    
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, " +
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(ddlTable);
    
    
            Table query = tableEnv.sqlQuery("select user_name, rn , cnt from " +
                    "(select user_name,cnt, row_number() OVER(PARTITION BY user_name ORDER BY  cnt DESC ) as rn from " +
                    "(select user_name , count(1) as cnt from clickTable group by user_name) ) " +
                    "where rn <= 2 ");
    
            tableEnv.toChangelogStream(query).print();
    
            env.execute();
        }
    }
    

    Window TOP-N

    
    public class tableDemo2 {
    
    
        public static void main(String[] args) throws Exception {
    
    
            LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, " +
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
            tableEnv.executeSql(ddlTable);
    
            String sqlQuery = "select user_name , count(1) as cnt , window_start,window_end from TABLE(TUMBLE(TABLE clickTable ,DESCRIPTOR(et) ,INTERVAL '10' SECOND)) group by user_name,window_start,window_end";
    
            Table query = tableEnv.sqlQuery("select user_name, rn , cnt,window_start,window_end from " +
                    "(select user_name,cnt,window_start,window_end, row_number() OVER(PARTITION BY window_start,window_end ORDER BY  cnt DESC ) as rn from " +
                    "("+ sqlQuery +") ) " +
                    "where rn <= 2 ");
    
            tableEnv.toDataStream(query).print();
    
            env.execute();
        }
    }
    
    

    5. Join Queries

    Regular join queries
    Interval join queries (minimal sketches of both below)
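
    A minimal sketch of the two, assuming a second table orderTable with an order_id column and its own event-time attribute ot (all hypothetical names):

    -- Regular join: state may grow without bound, since any future row can still match
    SELECT c.user_name, o.order_id
    FROM clickTable c
    JOIN orderTable o ON c.user_name = o.user_name

    -- Interval join: matches only within a bounded time range, so old state can be dropped
    SELECT c.user_name, o.order_id
    FROM clickTable c, orderTable o
    WHERE c.user_name = o.user_name
      AND c.et BETWEEN o.ot - INTERVAL '5' SECOND AND o.ot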

    6. Functions

    System functions
    UDF functions
    public class tableDemo4 {
    
        public static void main(String[] args) throws Exception {
    
            LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, " +
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(ddlTable);
    
            //2. Register the custom scalar function (registerFunction is the old, now-deprecated API)
            tableEnv.registerFunction("MyHash",new MyHashFunction());
            //How is createTemporarySystemFunction used? See the bug below
            //tableEnv.createTemporarySystemFunction("MyHash",MyHashFunction.class);
            //3. Call the UDF in the query (select each user_name together with its hash code)
            Table resultTable = tableEnv.sqlQuery("select user_name,MyHash(user_name) from clickTable");
    
            //4. Convert to a stream and print
            tableEnv.toDataStream(resultTable).print();
    
            env.execute();
        }
    
        //Custom ScalarFunction implementation
        public static class MyHashFunction extends ScalarFunction{
            public int eval(String str){
                return str.hashCode();
            }
        }
    }
    
    

    Bug: Unsupported class file major version 58
    Fix: adjust the Java version ("class file has the wrong version"). Class file major version 58 is Java 14, which is not supported; this took a long time to track down.
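
    One way to pin the Maven build to a supported JDK (a sketch; Flink releases of this era run on Java 8 or 11):

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>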
    
    public class tableDemo4 {
    
        public static void main(String[] args) throws Exception {
    
            LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, " +
                    " url STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '1' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'filesystem', " +
                    " 'path' = 'src/main/resources/c.txt', " +
                    " 'format' = 'csv'" +
                    ")";
    
    
            tableEnv.executeSql(ddlTable);
    
            //2. Register the custom table function with the new API
            tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
            //3. Call the UDF via a lateral join: split each url into words and their lengths
            Table resultTable = tableEnv.sqlQuery("select user_name,url, word,length from clickTable ,LATERAL TABLE(MySplit(url)) AS T(word,length)");
    
            //4. Convert to a stream and print
            tableEnv.toDataStream(resultTable).print();
    
            env.execute();
        }
    
        //Custom TableFunction implementation
        @FunctionHint (output = @DataTypeHint("ROW<word STRING, length INT>"))
        public static class MyFunction extends TableFunction<Row> {
    
            public void eval(String str){
                String[] split = str.split(",");
                for (String s : split) {
                     collect(Row.of(s,s.length()));
                }
            }
        }
    }
    
    

    Bug: List of column aliases must have same degree as table; the returned table of function
    Cause: a custom class extending TableFunction did not implement getResultType. When the output type is Row, the old API requires overriding this method (the new API infers it from the @FunctionHint annotation instead):

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }
    

    7. SQL Client

    8. Connecting to External Systems

    Kafka (the descriptor-style options below are the legacy connector; the demo after it uses the newer 'connector' = 'kafka' options)

    CREATE TABLE MyUserTable (
      ...
    ) WITH (
      'connector.type' = 'kafka',       
    
      'connector.version' = '0.11',     -- required: valid connector versions are
                                        -- "0.8", "0.9", "0.10", "0.11", and "universal"
    
      'connector.topic' = 'topic_name', -- required: topic name from which the table is read
    
      'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
      'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
      'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
      'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
                                                       -- "latest-offset", "group-offsets", 
                                                       -- or "specific-offsets"
    
      -- optional: used in case of startup mode with specific offsets
      'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
    
      'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
                                             -- into Kafka's partitions valid are "fixed" 
                                             -- (each Flink partition ends up in at most one Kafka partition),
                                             -- "round-robin" (a Flink partition is distributed to 
                                             -- Kafka partitions round-robin)
                                             -- "custom" (use a custom FlinkKafkaPartitioner subclass)
      -- optional: used in case of sink partitioner custom
      'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
      
      'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
      ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
                                             -- Please refer to Table Formats section for more details.
    )
    
    public class tableDemo6 {
    
        public static void main(String[] args) throws Exception {
            LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            String ddlTable = "CREATE TABLE clickTable (" +
                    " user_name STRING, " +
                    " ts BIGINT, " +
                    " et as TO_TIMESTAMP( FROM_UNIXTIME(ts /1000 ) ), " +
                    " WATERMARK FOR et as et - INTERVAL '4' SECOND " +
                    ") WITH (" +
                    " 'connector' = 'kafka', " +
                    " 'topic' = 'topic03', " +
                    " 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
                    " 'scan.startup.mode' = 'earliest-offset', " +
                    " 'format' = 'csv'" +
                    ")";
    
            tableEnv.executeSql(ddlTable);
    
            Table table = tableEnv.sqlQuery("select * from clickTable");
    
            tableEnv.toDataStream(table).print();
            env.execute();
    
        }
    }
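
    Running the Kafka demo assumes the Kafka SQL connector is on the classpath; a sketch of the Maven dependency for builds of this era (verify the artifact name against your Flink version):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>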
    
