SparkSQL connect MySQL with JDBC


By RoyTien | Published 2021-07-22


Abstracting data with Dataset

As of Spark 2.0, the Java API no longer has a separate DataFrame class (in the Scala API it survives only as a type alias for Dataset[Row]). We should use Dataset&lt;Row&gt; instead.

We need a SparkSession, and we may also need to convert an RDD into a Dataset&lt;Row&gt;:


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SparkConf conf = new SparkConf().setMaster("local").setAppName("sample");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
    "1111111||asdf",
    "222222||zxcv"
));

rdd = rdd.map(line -> "0||" + line);  // prepend a default id field to each record

List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("time", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(structFields);

JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        String[] attributes = record.split("\\|\\|");
        return RowFactory.create(
                Integer.parseInt(attributes[0]),
                Integer.parseInt(attributes[1]),
                attributes[2]
        );
    }
});

// Apply the schema to the RDD
Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
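One detail worth calling out in the parsing step above: String.split takes a regular expression, and | is the regex alternation operator, so the delimiter must be escaped as \\|\\|. A standalone check of that logic (plain Java, no Spark needed):

```java
import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        // After the map step prepends "0||", each record has three fields: id||time||name
        String record = "0||1111111||asdf";
        // "\\|\\|" matches the literal "||"; an unescaped "||" would match
        // the empty string and split the line into individual characters
        String[] attributes = record.split("\\|\\|");
        System.out.println(Arrays.toString(attributes));  // [0, 1111111, asdf]
    }
}
```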

Reading and writing MySQL data over JDBC with SparkSQL

import java.util.Properties;

Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "123456");
prop.put("driver", "com.mysql.jdbc.Driver");

dataset.write().mode("append").jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
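The read side mirrors the write call. A minimal sketch, reusing the same placeholder URL, table name, and prop object as above:

```java
// Read the table back into a Dataset<Row> with the same connection properties
Dataset<Row> fromDb = spark.read()
        .jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
fromDb.printSchema();
fromDb.show();
```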

Submitting the job with spark-submit

When submitting a job to the cluster with spark-submit, you must point --driver-class-path at the MySQL connector JAR. The connector JAR version should match your MySQL server version; available versions can be looked up on Maven Central.

spark-submit \
--name applicationName \
--master yarn --deploy-mode cluster \
--jars /xxx/libs/spark-sql_2.11-2.4.0.jar,/xxx/libs/mysql-connector-java-8.0.19.jar \
--driver-class-path /xxx/libs/mysql-connector-java-8.0.19.jar \
--class packageName.fileName.className \
/xxx/yyy/ZZZ.jar arg1 arg2 

Original link: https://www.haomeiwen.com/subject/pwwsmltx.html