Spark SQL with JDBC as a Data Source

Author: 吴国友 | Published 2019-02-11 18:02

    Implementation

    1. Setup: create the SparkConf, JavaSparkContext, and SQLContext.

        SparkConf conf = new SparkConf().setAppName("JDBCDataSource"); // .setMaster("local") for local runs
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
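
    Note: this post uses the Spark 1.x Java API (SQLContext and DataFrame).
    The snippets assume the usual imports (org.apache.spark.SparkConf,
    org.apache.spark.api.java.*, org.apache.spark.sql.*, java.sql.*), and the
    MySQL JDBC driver jar must be on the driver and executor classpath, for
    example via spark-submit --jars.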
    
    2. Read from the JDBC source:

        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://spark1:3306/testdb");

        // Read the first table
        options.put("dbtable", "student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc")
                .options(options).load();

        // Read the second table
        options.put("dbtable", "student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc")
                .options(options).load();
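
    The write step below refers to a studentsDF that this excerpt never
    constructs. As a minimal sketch of one plausible derivation (the join key,
    the column names, and the 80-point cutoff are assumptions, not taken from
    the original), the two DataFrames can be joined through temporary tables:

        // Hypothetical bridge: join the two tables and keep high scorers.
        // The join key and the score threshold are assumed for illustration.
        studentInfosDF.registerTempTable("student_infos");
        studentScoresDF.registerTempTable("student_scores");

        DataFrame studentsDF = sqlContext.sql(
                "select i.name, i.age, s.score "
              + "from student_infos i "
              + "join student_scores s on i.name = s.name "
              + "where s.score >= 80");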
    
    3. Write to JDBC:

        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values("
                        + "'" + row.getString(0) + "',"
                        + Integer.valueOf(String.valueOf(row.get(1))) + ","
                        + Integer.valueOf(String.valueOf(row.get(2))) + ")";

                Class.forName("com.mysql.jdbc.Driver");

                Connection conn = null;
                Statement stmt = null;
                try {
                    // This opens a new connection for every row; the original
                    // notes the issue without addressing it. A per-partition
                    // variant is sketched below.
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://spark1:3306/testdb", "", "");
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (stmt != null) {
                        stmt.close();
                    }
                    if (conn != null) {
                        conn.close();
                    }
                }
            }
        });
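
    The per-row connection problem noted above is usually avoided by opening
    one connection per partition. Below is a minimal sketch (not from the
    original) that uses foreachPartition with a PreparedStatement; it assumes
    the second and third columns are integers.

        studentsDF.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<Row> rows) throws Exception {
                Class.forName("com.mysql.jdbc.Driver");
                // One connection and one prepared statement per partition
                // instead of one per row.
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://spark1:3306/testdb", "", "");
                PreparedStatement ps = conn.prepareStatement(
                        "insert into good_student_infos values(?, ?, ?)");
                try {
                    while (rows.hasNext()) {
                        Row row = rows.next();
                        ps.setString(1, row.getString(0));
                        ps.setInt(2, Integer.valueOf(String.valueOf(row.get(1))));
                        ps.setInt(3, Integer.valueOf(String.valueOf(row.get(2))));
                        ps.executeUpdate();
                    }
                } finally {
                    ps.close();
                    conn.close();
                }
            }
        });

    From Spark 1.4 onward, the same write can also be expressed without
    hand-written JDBC code via DataFrame.write().jdbc(url, table,
    connectionProperties).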
    
