美文网首页
(7)FlinkSQL将kafka数据写入到mysql方式二

(7)FlinkSQL将kafka数据写入到mysql方式二

作者: NBI大数据可视化分析 | 来源:发表于2022-08-11 10:57 被阅读0次
    public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            tableEnv.executeSql("CREATE TABLE WaterSensor (" +
                    "id STRING," +
                    "ts BIGINT," +
                    "vc BIGINT," +
    //                "`pt` TIMESTAMP(3),"+
    //                "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +
                    "pt as PROCTIME() " +
                    ") WITH (" +
                    "'connector' = 'kafka'," +
                    "'topic' = 'kafka_data_waterSensor'," +
                    "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
                    "'properties.group.id' =  'test'," +
                    "'scan.startup.mode' = 'earliest-offset'," +
    //                "'json.fail-on-missing-field' = 'false'," +
    //                "'json.ignore-parse-errors' = 'true'," +
                    "'format' = 'json'" +
                    ")"
            );
    
            tableEnv.executeSql("CREATE TABLE flinksink (" +
                    "componentname STRING," +
                    "componentcount BIGINT NOT NULL," +
                    "componentsum BIGINT" +
                    ") WITH (" +
                    "'connector.type' = 'jdbc'," +
                    "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                    "'connector.table' = 'flinksink'," +
                    "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                    "'connector.username' = 'root'," +
                    "'connector.password' = 'root'," +
                    "'connector.write.flush.max-rows'='3'\r\n" +
                    ")"
            );
    
            Table result = tableEnv.sqlQuery(
                    "SELECT " +
                            "id as componentname, " +                //window_start, window_end,
                            "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                            "FROM TABLE( " +
                            "TUMBLE( TABLE WaterSensor , " +
                            "DESCRIPTOR(pt), " +
                            "INTERVAL '10' SECOND)) " +
                            "GROUP BY id , window_start, window_end"
            );
    
    //        //方式一:写入数据库
    ////        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");
    //
            //方式二:写入数据库
            tableEnv.createTemporaryView("ResultTable", result);
            tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();
    
            env.execute();
        }
    

    相关文章

      网友评论

          本文标题:(7)FlinkSQL将kafka数据写入到mysql方式二

          本文链接:https://www.haomeiwen.com/subject/ihkwwrtx.html