Using Dataset to abstract data
Spark DataFrame has been removed from the Java API in Spark 2.0 (in the Scala API it is just a type alias for Dataset[Row]). It should be replaced with Dataset<Row>.
This requires a SparkSession, and may also require converting an RDD into a Dataset<Row>.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Build the SparkSession (and a JavaSparkContext for RDD operations)
SparkConf conf = new SparkConf().setMaster("local").setAppName("sample");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

// Sample input: each line is "time||name"
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
        "1111111||asdf",
        "222222||zxcv"
));
// Prepend a default id so each line becomes "id||time||name"
rdd = rdd.map(line -> "0||" + line);

// Define the schema for the resulting Dataset<Row>
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("time", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(structFields);

// Convert each line into a Row matching the schema
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        String[] attributes = record.split("\\|\\|");
        return RowFactory.create(
                Integer.parseInt(attributes[0]),
                Integer.parseInt(attributes[1]),
                attributes[2]
        );
    }
});

// Apply the schema to the RDD
Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
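A quick way to confirm that the schema was applied as intended (a minimal check added here, not part of the original example) is to print the schema and a few rows:

// Inspect the result: schema should be (id: int, time: int, name: string)
dataset.printSchema();
dataset.show();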
Reading and writing MySQL data over JDBC with Spark SQL
import java.util.Properties;

Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "123456");
// For mysql-connector-java 8.x the driver class is com.mysql.cj.jdbc.Driver;
// com.mysql.jdbc.Driver is the legacy (deprecated) name.
prop.put("driver", "com.mysql.cj.jdbc.Driver");

// Append the Dataset's rows to TABLE_NAME in DATABASE_NAME
dataset.write().mode("append").jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
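Reading works the same way and returns a Dataset<Row>. A minimal sketch, reusing the connection properties and URL above (DATABASE_NAME and TABLE_NAME remain placeholders):

// Read TABLE_NAME back into a Dataset<Row> over the same JDBC connection
Dataset<Row> fromMysql = spark.read()
        .jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
fromMysql.show();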
Submitting the job with spark-submit
When submitting the job to a cluster with spark-submit, the MySQL connector JAR must be specified via --driver-class-path. The connector version needs to match the MySQL server version and can be looked up in the Maven Central repository.
spark-submit \
  --name applicationName \
  --master yarn --deploy-mode cluster \
  --jars /xxx/libs/spark-sql_2.11-2.4.0.jar,/xxx/libs/mysql-connector-java-8.0.19.jar \
  --driver-class-path /xxx/libs/mysql-connector-java-8.0.19.jar \
  --class packageName.fileName.className \
  /xxx/yyy/ZZZ.jar arg1 arg2