美文网首页
(1)通过FlinkSQL将数据写入mysql demo

(1)通过FlinkSQL将数据写入mysql demo

作者: NBI大数据可视化分析 | 来源:发表于2022-08-06 16:55 被阅读0次
    1.png

    FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。
    (1)首先引入POM依赖:

    <properties>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.0</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.66</version>
        </dependency>
    </dependencies>
    

    (2)编写代码

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);
    
    
        String ddl = "CREATE TABLE flinksinksds(\r\n" +
                "componentname STRING,\r\n" +
                "componentcount INT,\r\n" +
                "componentsum INT\r\n" +
                ") WITH(\r\n" +
                "'connector.type'='jdbc',\r\n" +
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" +
                "'connector.table'='flinksink',\r\n" +
                "'connector.username'='root',\r\n" +
                "'connector.password'='root',\r\n" +
                "'connector.write.flush.max-rows'='1'\r\n" +
                ")";
        System.err.println(ddl);
        ste.executeSql(ddl);
    
        String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +
                "values('1024', 1 , 2 )";
        ste.executeSql(insert);
        env.execute();
        System.exit(0);
    }
    

    (3)执行结果:


    2.png

    相关文章

      网友评论

          本文标题:(1)通过FlinkSQL将数据写入mysql demo

          本文链接:https://www.haomeiwen.com/subject/aykvwrtx.html