实现代码
1、准备工作:
SparkConf conf = new SparkConf().setAppName("JDBCDataSource");//.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
2、从JDBC数据源读取:
// Connection options shared by both reads; only "dbtable" changes between them.
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://spark1:3306/testdb");

// Read the first table. The JDBC source requires "dbtable", so it must be set
// as a real statement before load() is called.
options.put("dbtable", "student_infos");
DataFrame studentInfosDF = sqlContext.read().format("jdbc")
    .options(options).load();

// Read the second table, reusing the same options with a different "dbtable".
options.put("dbtable", "student_scores");
DataFrame studentScoresDF = sqlContext.read().format("jdbc")
    .options(options).load();
3、写入数据到JDBC:
// Insert every row of studentsDF into the MySQL table good_student_infos.
// NOTE(review): studentsDF is defined outside this snippet; assumes column 0 is a
// string and columns 1 and 2 hold int-convertible values — TODO confirm the schema.
studentsDF.javaRDD().foreach(new VoidFunction<Row>() {

    private static final long serialVersionUID = 1L;

    /**
     * Inserts one row using a parameterized statement.
     *
     * @param row the row to insert; column 0 is bound as a string, columns 1 and 2
     *            are converted to int via their string form
     * @throws Exception if the driver cannot be loaded, a column cannot be converted,
     *                   or the insert fails; propagating it fails the Spark task
     *                   instead of silently dropping the row
     */
    @Override
    public void call(Row row) throws Exception {
        // Placeholders instead of string concatenation: values containing quotes can
        // neither break nor inject into the SQL text.
        String sql = "insert into good_student_infos values(?, ?, ?)";
        Class.forName("com.mysql.jdbc.Driver");
        // One connection per row is expensive; foreachPartition would allow reuse.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://spark1:3306/testdb", "", "");
             java.sql.PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, String.valueOf(row.getString(0)));
            stmt.setInt(2, Integer.parseInt(String.valueOf(row.get(1))));
            stmt.setInt(3, Integer.parseInt(String.valueOf(row.get(2))));
            stmt.executeUpdate();
        }
    }
});
网友评论