美文网首页
《Flink入门》读取MySQL中的数据并写入到MySQL

《Flink入门》读取MySQL中的数据并写入到MySQL

作者: 饲养员壹号 | 来源:发表于2019-06-15 03:17 被阅读0次

1、

package db;

/**

* Created by admin on 2019/6/8.

*/

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

public class JdbcReaderextends RichSourceFunction> {

private static final Loggerlogger = LoggerFactory.getLogger(JdbcReader.class);

    private Connectionconnection =null;

    private PreparedStatementps =null;

    Stringusername ="root";

    Stringpassword ="root";

    Stringurl ="jdbc:mysql://192.168.191.1:3306/springboot?autoReconnect=true&useSSL=false&user="+username+"&password="+password;

    Stringdriver ="com.mysql.jdbc.Driver";

    Stringsql ="select *  from user_info";

    @Override

    public void open(Configuration parameters)throws Exception {

super.open(parameters);

        Class.forName(driver);

        connection = DriverManager.getConnection(url);

        ps =connection.prepareStatement(sql);

    }

//执行查询并获取结果

    @Override

    public void run(SourceContext> ctx)throws Exception {

try {

ResultSet resultSet =ps.executeQuery();

            while (resultSet.next()) {

String username = resultSet.getString("username");

                String password = resultSet.getString("password");

                String id = resultSet.getString("Id");

                logger.error("readJDBC name:{}", username +"==" +id+"==" + resultSet);

                Tuple3 tuple3 =new Tuple3();

                tuple3.setFields(id,username,password);

                ctx.collect(tuple3);

            }

}catch (Exception e) {

logger.error("runException:{}", e);

        }

}

//关闭数据库连接

    @Override

    public void cancel() {

try {

super.close();

            if (connection !=null) {

connection.close();

            }

if (ps !=null) {

ps.close();

            }

}catch (Exception e) {

logger.error("runException:{}", e);

        }

}

}

相关文章

网友评论

      本文标题:《Flink入门》读取MySQL中的数据并写入到MySQL

      本文链接:https://www.haomeiwen.com/subject/fcvefctx.html