
SparkSQL connect MySQL with JDBC

Author: RoyTien | Published 2021-07-22 00:11


Using Dataset as the data abstraction

In Spark 2.0, the DataFrame type was removed from the Java API (in the Scala API it survives only as an alias for Dataset[Row]), so Java code should use Dataset&lt;Row&gt; instead.

We need a SparkSession, and possibly also a conversion from an RDD to a Dataset&lt;Row&gt;:

    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    SparkConf conf = new SparkConf().setMaster("local").setAppName("sample");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
        "1111111||asdf",
        "222222||zxcv"
    ));

    // Prepend a constant id column so each record becomes "id||time||name"
    rdd = rdd.map(line -> "0||" + line);

    // Define the schema: (id INT, time INT, name STRING)
    List<StructField> structFields = new ArrayList<>();
    structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
    structFields.add(DataTypes.createStructField("time", DataTypes.IntegerType, true));
    structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(structFields);

    // Parse each "||"-delimited line into a Row that matches the schema
    JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            String[] attributes = record.split("\\|\\|");
            return RowFactory.create(
                    Integer.parseInt(attributes[0]),
                    Integer.parseInt(attributes[1]),
                    attributes[2]
            );
        }
    });

    // Apply the schema to the RDD of Rows
    Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
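
Before writing anywhere, it is easy to sanity-check the Dataset with Spark SQL. A minimal sketch (the temporary view name "sample" is an illustrative choice, not from the original post):

    // Register the Dataset as a temporary SQL view and query it
    dataset.createOrReplaceTempView("sample");
    spark.sql("SELECT id, time, name FROM sample").show();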
    

Reading and writing MySQL data from SparkSQL via JDBC

    import java.util.Properties;

    Properties prop = new Properties();
    prop.put("user", "root");
    prop.put("password", "123456");
    // mysql-connector-java 8.x renamed the driver class; the old
    // com.mysql.jdbc.Driver still loads but is deprecated
    prop.put("driver", "com.mysql.cj.jdbc.Driver");

    // Append the Dataset's rows to an existing MySQL table
    dataset.write().mode("append").jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
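
The snippet above only writes; reading a table back uses the same URL and properties through read().jdbc(...). A minimal sketch reusing the same prop (DATABASE_NAME and TABLE_NAME remain placeholders):

    // Load the MySQL table into a Dataset<Row>; Spark infers the schema from JDBC metadata
    Dataset<Row> fromMysql = spark.read()
            .jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
    fromMysql.show();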
    

Submitting the job with spark-submit

When submitting the job to a cluster with spark-submit, the MySQL connector JAR must be specified via --driver-class-path so the driver can load it (and listed in --jars so it also reaches the executors). The connector version must be compatible with your MySQL server version; available versions can be looked up in the Maven Central repository.

    spark-submit \
        --name applicationName \
        --master yarn --deploy-mode cluster \
        --jars /xxx/libs/spark-sql_2.11-2.4.0.jar,/xxx/libs/mysql-connector-java-8.0.19.jar \
        --driver-class-path /xxx/libs/mysql-connector-java-8.0.19.jar \
        --class packageName.fileName.className \
        /xxx/yyy/ZZZ.jar arg1 arg2
    
