// 1.
package db;
/**
* Created by admin on 2019/6/8.
*/
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class JdbcReaderextends RichSourceFunction> {
private static final Loggerlogger = LoggerFactory.getLogger(JdbcReader.class);
private Connectionconnection =null;
private PreparedStatementps =null;
Stringusername ="root";
Stringpassword ="root";
Stringurl ="jdbc:mysql://192.168.191.1:3306/springboot?autoReconnect=true&useSSL=false&user="+username+"&password="+password;
Stringdriver ="com.mysql.jdbc.Driver";
Stringsql ="select * from user_info";
@Override
public void open(Configuration parameters)throws Exception {
super.open(parameters);
Class.forName(driver);
connection = DriverManager.getConnection(url);
ps =connection.prepareStatement(sql);
}
//执行查询并获取结果
@Override
public void run(SourceContext> ctx)throws Exception {
try {
ResultSet resultSet =ps.executeQuery();
while (resultSet.next()) {
String username = resultSet.getString("username");
String password = resultSet.getString("password");
String id = resultSet.getString("Id");
logger.error("readJDBC name:{}", username +"==" +id+"==" + resultSet);
Tuple3 tuple3 =new Tuple3();
tuple3.setFields(id,username,password);
ctx.collect(tuple3);
}
}catch (Exception e) {
logger.error("runException:{}", e);
}
}
//关闭数据库连接
@Override
public void cancel() {
try {
super.close();
if (connection !=null) {
connection.close();
}
if (ps !=null) {
ps.close();
}
}catch (Exception e) {
logger.error("runException:{}", e);
}
}
}
// Reader comments