美文网首页
Flink基础系列23-Sink之JDBC

Flink基础系列23-Sink之JDBC

作者: 只是甲 | 来源:发表于2021-10-28 15:11 被阅读0次

    一.Sink之JDBC概述

    Flink的Sink支持的数据库:


    image.png

    Bahir中支持的数据库:


    image.png

    从上两图可以看到,Flink的Sink并支持类似MySQL的这种关系型数据库,那么如果我需要通过Flink连接MySQL,该如何操作呢?

    这个时候我们可以使用Flink Sink的JDBC连接。

    二.pom文件配置

    此处,我本地MySQL版本是 8.0.19

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    

    三.MySQL配置

    新建数据库及表

    CREATE DATABASE `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
    
    CREATE TABLE `sensor_temp` (
      `id` varchar(32) NOT NULL,
      `temp` double NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    四.编写Java代码

    package org.flink.sink;
    
    import org.flink.beans.SensorReading;
    import org.example.SourceTest4_UDF;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /**
     * @author 只是甲
     * @date   2021-09-15
     * @remark Sink之JDBC
     */
    
    public class SinkTest4_Jdbc {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从文件读取数据
    //        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
    //
    //        // 转换成SensorReading类型
    //        DataStream<SensorReading> dataStream = inputStream.map(line -> {
    //            String[] fields = line.split(",");
    //            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    //        });
    
            DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());
    
            dataStream.addSink(new MyJdbcSink());
    
            env.execute();
        }
    
        // 实现自定义的SinkFunction
        public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
            // 声明连接和预编译语句
            Connection connection = null;
            PreparedStatement insertStmt = null;
            PreparedStatement updateStmt = null;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
                insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
                updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
            }
    
            // 每来一条数据,调用连接,执行sql
            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
                // 直接执行更新语句,如果没有更新那么就插入
                updateStmt.setDouble(1, value.getTemperature());
                updateStmt.setString(2, value.getId());
                updateStmt.execute();
                if( updateStmt.getUpdateCount() == 0 ){
                    insertStmt.setString(1, value.getId());
                    insertStmt.setDouble(2, value.getTemperature());
                    insertStmt.execute();
                }
            }
    
            @Override
            public void close() throws Exception {
                insertStmt.close();
                updateStmt.close();
                connection.close();
            }
        }
    }
    

    五.运行Flink程序查看数据

    mysql> select * from sensor_temp;
    +-----------+-------------------+
    | id        | temp              |
    +-----------+-------------------+
    | sensor_3  | 65.31089123002162 |
    | sensor_10 | 20.23454807781744 |
    | sensor_4  | 79.87349739590283 |
    | sensor_1  | 68.79742249825429 |
    | sensor_2  |  44.1766638371653 |
    | sensor_7  | 99.47000620947128 |
    | sensor_8  |  68.7360059804266 |
    | sensor_5  |  69.9135258264366 |
    | sensor_6  | 38.85722751176939 |
    | sensor_9  | 69.97758295030204 |
    +-----------+-------------------+
    10 rows in set (0.00 sec)
    
    mysql>
    

    参考:

    1. https://www.bilibili.com/video/BV1qy4y1q728
    2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae

    相关文章

      网友评论

          本文标题:Flink基础系列23-Sink之JDBC

          本文链接:https://www.haomeiwen.com/subject/pwnailtx.html