美文网首页flink
11-flink读写MySQL

11-flink读写MySQL

作者: 当当一丢丢 | 来源:发表于2020-02-28 12:04 被阅读0次

    一、读MySQL

    1、通过JDBC方式定义MySQLDataSource类

    1.1首先加入JDBC依赖
    1.2定义JDBCInputFormat
    1.3获取Row类型的DataStreamSource
    1.4转化DataStream<Row>为DataStream<Student>

    public class MysqlDataSource {
    
        private static final Logger log = LoggerFactory.getLogger(MySQLDataSource.class);
    
        public static DataStream<Student> readFromDb(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
            //final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //1.定义field 类型
            TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
            //2.定义field name
            String[] fieldNames = new String[]{"id", "name", "password", "age"};
            //3.定义Row类型
            RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
            
            String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
            log.info(jdbcUrl);
    
            //4.定义JDBCInputFormat
            JDBCInputFormat jdbcInputFormat = JDBCInputFormat
                    .buildJDBCInputFormat()
                    .setDrivername("com.mysql.jdbc.Driver")
                    .setDBUrl(jdbcUrl)
                    .setUsername(parameterTool.get(PropertiesConstants.MYSQL_USERNAME))
                    .setPassword(parameterTool.get(PropertiesConstants.MYSQL_PASSWORD))
                    .setQuery("select id, name, password, age from student")
                    .setRowTypeInfo(rowTypeInfo)
                    .finish();
    
            //5.以JDBCInputFormat形式读取MySQL DB数据
            DataStreamSource<Row> dataStreamSourceRow = streamExecutionEnvironment.createInput(jdbcInputFormat);
    
            //阶段性验证可以正确读取
            dataStreamSourceRow.print();
    
            //6.将Row类型Stream转化为Entity类型
            DataStream<Student> dataStream = dataStreamSourceRow.map(new RichMapFunction<Row, Student>() {
                @Override
                public Student map(Row value) throws Exception {
                    Student s = new Student();
                    s.setId((Integer) value.getField(0));
                    s.setName((String) value.getField(1));
                    s.setPassword((String) value.getField(2));
                    s.setAge((Integer) value.getField(3));
                    return s;
                }
            });
    
            log.info("read datasource end");
            return dataStream;
    }
    

    2、通过自定义DataSource方式

    • 实现RichSourceFunction<T>接口,T设置DataStream数据类型
    • 使用模板
      • open()方法初始化全局使用的数据(比如PreparedStatement等,可类比构造函数或者JUnit的@Before)
      • run()方法
        • 一般使用while循环不断获取数据
        • while获取的数据需要以流的形式发送出去,使用SourceContext.collect(yourData)就好
        • 这里sourceContext收集(collect)的数据可以是单条(一条Student)也可是List<Student>集合,使用集合要把RichSourceFunction<T>泛型设为List<Student>
      • cancel()方法用于停止while循环,即停止获取数据
    /**
     * Source built on {@code RichSourceFunction} that emits {@link Student} records read from
     * MySQL. The {@code student} table is queried again every 5 seconds and every row whose
     * {@code flag} parses as {@code true} is emitted individually.
     */
    public class JdbcReader2 extends RichSourceFunction<Student> {
        private static final Logger logger = LoggerFactory.getLogger(JdbcReader2.class);

        /** Pause between two queries of the table, in milliseconds. */
        private static final long POLL_INTERVAL_MS = 1000 * 5;

        // JDBC handles are created in open(); they are not part of the serialized function state.
        private transient Connection connection = null;
        private transient PreparedStatement ps = null;
        // Written by cancel() and read by the polling loop, hence volatile.
        private volatile boolean isRunning = true;

        /**
         * Opens the database connection and prepares the polling query. The JDBC URL, user name
         * and password are all read from {@code ExecutionEnvUtil.PARAMETER_TOOL}.
         *
         * @param parameters configuration passed on to the parent {@code open}
         * @throws Exception if the driver cannot be registered or the connection cannot be opened
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            DriverManager.registerDriver(new Driver());
            ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
            String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
            connection = DriverManager.getConnection(
                    jdbcUrl,
                    parameterTool.get(PropertiesConstants.MYSQL_USERNAME),
                    parameterTool.get(PropertiesConstants.MYSQL_PASSWORD));
            ps = connection.prepareStatement("select id, name, password, age, flag from student where flag='true'");
        }

        /**
         * Runs the query in a loop until {@link #cancel()} is called, emitting one
         * {@link Student} per matching row and sleeping between rounds.
         *
         * @param ctx context used to emit records
         * @throws Exception if the query or the emission fails; the failure is logged and rethrown
         *                   instead of ending the source silently
         */
        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            try {
                while (isRunning) {
                    // A new ResultSet is produced every round; close it before sleeping.
                    try (ResultSet resultSet = ps.executeQuery()) {
                        while (isRunning && resultSet.next()) {
                            Student student = new Student();
                            student.setId(resultSet.getInt(1));
                            student.setName(resultSet.getString(2));
                            student.setPassword(resultSet.getString(3));
                            student.setAge(resultSet.getInt(4));

                            String flag = resultSet.getString(5);
                            student.setFlag(flag);

                            if (Boolean.parseBoolean(flag)) {
                                // Records are emitted one Student at a time.
                                ctx.collect(student);
                                logger.info("student >>>>>>" + student);
                            }
                        }
                    }

                    Thread.sleep(POLL_INTERVAL_MS);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the calling thread and stop polling.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error("runException", e);
                throw e;
            }
        }

        /**
         * Asks the polling loop in {@link #run} to stop. The JDBC resources are released in
         * {@link #close()}, so a query that is still in flight does not lose its connection.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }

        /**
         * Releases the prepared statement and then the connection.
         *
         * @throws Exception if closing a JDBC resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                } finally {
                    super.close();
                }
            }
        }
    }
    

    3、两种方式对比

    • JDBC需要引入单独的依赖,自定义DataSource方式无特殊依赖
    • JDBC不论读还是写只能处理批数据,而RichSourceFunction及其父接口SourceFunction都是流式接口

    二、写MySQL

    1、通过JDBC方式

    Table API提供通过JDBC写MySQL的方式

    • 获取Table(可以通过DataStream转化而来)-table
    • 将table注册到Environment(作为临时view)-tempView
    • 创建inner-dest-table->out-dest-table映射(inner-dest-table是flink内部表,通过insert数据到inner-dest-table,将数据插入到out-dest-table中)
           // Job fragment: stream Student records from JdbcReader2 into a sink table using SQL.
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 1. Add the data source.
            DataStream<Student> studentDataStream = env.addSource(new JdbcReader2());
    
    
            // Streaming-mode table settings; the Blink planner line is left disabled.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    //.useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, settings);
    
            // 2. Turn the DataStream into a Table.
            Table table = streamTableEnvironment.fromDataStream(studentDataStream);
    
            // Register the table under the name "temp_table" so SQL statements can refer to it.
            streamTableEnvironment.createTemporaryView("temp_table", table);
    
            // 3. Create the internal sink table from the statement stored in destination.sql.
            String destSql = FileUtil.readSourceFile("destination.sql");
            streamTableEnvironment.sqlUpdate(destSql);
    
            // 4. Insert from the internal table into the outer system using insert.sql.
            String insertSql = FileUtil.readSourceFile("insert.sql");
    
            streamTableEnvironment.sqlUpdate(insertSql);
    
    
            env.execute("sort-streaming-data");
    
            log.info("end");
    
    • sql文件,保存到resources目录,并用FileUtil读取(仅仅是把SQL外置而已,也可直接写到代码中)
    #destination.sql
    -- Declares the table student_dest, bound through the jdbc connector to the MySQL table student_2.
    CREATE TABLE student_dest (
                                  id INT,
                                  name VARCHAR,
                                  password VARCHAR,
                                  age INT
    ) WITH (
        'connector.type' = 'jdbc', -- use the jdbc connector
        'connector.url' = 'jdbc:mysql://localhost:3306/flink_demo', -- jdbc url
        'connector.table' = 'student_2', -- table name
        'connector.username' = 'root', -- user name
        'connector.password' = 'abc123456', -- password
        'connector.write.flush.max-rows' = '1' -- default is 5000 rows; set to 1 for this demo
    )
    
    #insert.sql, temp_table为代码临时table
    -- Copies id, name, password and age from temp_table (the temporary view registered in the
    -- job code) into student_dest.
    INSERT INTO student_dest
    SELECT
        id,
        name,
        password,
        age
    FROM temp_table
    
    

    2、通过自定义Sink方式

    • 通过继承RichSinkFunction实现
    • 步骤(同RichSourceFunction是一致的)
      • open() 初始化数据
      • invoke() 每获取一次数据将其处理存储到outer system
      • close() 清理及关闭资源
    /**
     * Sink that writes every incoming {@link Student} into the MySQL table {@code student_2}.
     * The connection and the insert statement are created once in {@link #open} and released in
     * {@link #close}.
     */
    public class MySQLSink extends RichSinkFunction<Student> {

        private static final Logger log = LoggerFactory.getLogger(MySQLSink.class);

        // JDBC handles are created in open(); they are not part of the serialized function state.
        transient PreparedStatement ps;
        transient BasicDataSource dataSource;
        private transient Connection connection;

        /**
         * Creates the connection and prepares the insert statement once, so that
         * {@link #invoke} does not open and close a connection for every record.
         *
         * @param parameters configuration passed on to the parent {@code open}
         * @throws Exception if no connection can be obtained or the statement cannot be prepared;
         *                   failing here prevents the sink from silently discarding records later
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            dataSource = new BasicDataSource();
            connection = getConnection(dataSource);
            String sql = "insert into student_2(id, name, password, age) values(?, ?, ?, ?);";
            ps = this.connection.prepareStatement(sql);
        }

        /**
         * Releases the statement, then the connection, then the pool that produced the
         * connection. Each step runs even if the previous one fails.
         *
         * @throws Exception if closing any of the resources fails
         */
        @Override
        public void close() throws Exception {
            try {
                super.close();
                if (ps != null) {
                    ps.close();
                }
            } finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                } finally {
                    // The pool holds connections of its own; close it as well.
                    if (dataSource != null) {
                        dataSource.close();
                    }
                }
            }
        }

        /**
         * Inserts one record; called once for every element that reaches the sink.
         *
         * @param value   record to insert
         * @param context sink context (not used)
         * @throws Exception if the insert fails or the sink has not been opened
         */
        @Override
        public void invoke(Student value, Context context) throws Exception {
            if (ps == null) {
                // Dropping the record here would lose data without any trace.
                throw new IllegalStateException("MySQLSink has no prepared statement; open() did not complete");
            }
            Student student = value;
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
            // The batch holds exactly the one record added above.
            int[] count = ps.executeBatch();
            log.info("成功了插入了 {} 行数据", count.length);
        }

        /**
         * Configures the given pool and borrows one connection from it.
         *
         * @param dataSource pool to configure; the caller keeps ownership and must close it
         * @return an open connection taken from the pool
         * @throws java.sql.SQLException if the pool cannot supply a connection
         */
        private static Connection getConnection(BasicDataSource dataSource) throws java.sql.SQLException {
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            // NOTE: replace with your own MySQL address, user name and password.
            dataSource.setUrl("jdbc:mysql://localhost:3306/flink_demo");
            dataSource.setUsername("root");
            dataSource.setPassword("abc123456");
            // Connection pool sizing.
            dataSource.setInitialSize(10);
            dataSource.setMaxTotal(50);
            dataSource.setMinIdle(2);

            Connection con = dataSource.getConnection();
            log.info("创建连接池:{}", con);
            return con;
        }
    }
    

    3、jdbc和Sink方式对比

    • JDBC方式是用sqlQuery()和sqlUpdate()来执行所有查询和insert/update更新操作,只需要将创建table、insert的语句用SQL整理到一起即可,通过insert语句将数据插入outer system

    相关文章

      网友评论

        本文标题:11-flink读写MySQL

        本文链接:https://www.haomeiwen.com/subject/rvjbchtx.html