Spark runs in Standalone mode here; other deploy modes have not been tested.
一、The Spark statistics job
1.1 jar
hdfs:/home/mr/example/spark-example-1.0.jar
1.2 main class
org.learn.example.jobs.SparkJob
import java.io.Serializable;
import java.lang.reflect.Method;

import org.apache.spark.sql.SparkSession;

public class SparkJob implements Serializable {

    public static void main(String[] args) {
        // args[0] is the fully qualified name of the statistics job class.
        String fullClassName = args[0];
        String[] strArr = fullClassName.split("\\.");
        String appName = strArr[strArr.length - 1];
        SparkSession session = SparkSession.builder().appName("SparkJob_" + appName).getOrCreate();
        try {
            // Instantiate the job class reflectively and hand it the shared SparkSession.
            Class<?> clazz = Class.forName(fullClassName);
            Method method = clazz.getDeclaredMethod("run", SparkSession.class);
            method.invoke(clazz.newInstance(), session);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
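SparkJob only assumes that the target class has a public no-arg constructor and a run(SparkSession) method. The ISparkJob interface implemented by the job below is not shown in the original post; a minimal definition consistent with that reflective call would look like the sketch here (the Serializable bound is my assumption, so that anonymous function classes capturing the enclosing job instance can be shipped to executors):

import java.io.Serializable;

import org.apache.spark.sql.SparkSession;

// Assumed contract for statistics jobs dispatched by SparkJob.
public interface ISparkJob extends Serializable {
    void run(SparkSession session);
}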
1.3 Arguments passed to the main class
The single argument names the statistics job class that SparkJob dispatches to via reflection:
org.learn.example.jobs.WordCount
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class WordCount implements ISparkJob {

    @Override
    public void run(SparkSession session) {
        // Build a small in-memory dataset and switch down to the RDD API.
        JavaRDD<String> javaRDD = session.createDataset(
                Arrays.asList("aaa bbb", "bbb ccc", "aaa"), Encoders.STRING()).javaRDD();
        javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // Split each line into words.
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // Pair each word with an initial count of 1.
                return new Tuple2<>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).saveAsTextFile("/home/example/result");
    }
}
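For reference, with Java 8 lambdas the same pipeline can be written more compactly. This variant is equivalent and, for the three input lines above, writes (aaa,2), (bbb,2) and (ccc,1) to the part files under /home/example/result (in some order):

// Equivalent lambda form of the pipeline in run(SparkSession).
javaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((v1, v2) -> v1 + v2)
        .saveAsTextFile("/home/example/result");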
二、Submitting the statistics job above to the Spark cluster
2.1 Using SparkSubmit.main
Submit in-process by passing spark-submit style arguments directly to SparkSubmit.main:
import org.apache.spark.deploy.SparkSubmit;

String[] args = {
        "--master", "spark://11.11.11.11:6066,22.22.22.22:6066",
        "--deploy-mode", "cluster",
        "--executor-memory", "1G",
        "--total-executor-cores", "2",
        "--class", "org.learn.example.jobs.SparkJob",
        "hdfs:/home/mr/example/spark-example-1.0.jar",   // application jar
        "org.learn.example.jobs.WordCount"               // argument forwarded to SparkJob.main
};
SparkSubmit.main(args);
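The args array mirrors, flag for flag, what the spark-submit shell script accepts. One caveat: SparkSubmit.main runs inside the calling JVM and, at least in the Spark versions I have seen, may terminate it via System.exit on submission errors, so this approach is risky when the submitting process (for example a long-running service) must stay alive. The next two approaches avoid that by isolating the submission.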
2.2 Using SparkLauncher
Submit by having the launcher library spawn a spark-submit child process:
import java.io.File;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

try {
    SparkLauncher launcher = new SparkLauncher()
            .setSparkHome("/home/spark")
            .setConf("spark.driver.memory", "1G")
            .setConf("spark.executor.memory", "1G")
            .setConf("spark.executor.cores", "1")
            .setConf("spark.cores.max", "2")
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .setDeployMode("cluster")
            .redirectOutput(new File("/home/example/logs/launch.log"))
            .setAppResource("hdfs:/home/mr/example/spark-example-1.0.jar")
            .setMainClass("org.learn.example.jobs.SparkJob")
            .addAppArgs("org.learn.example.jobs.WordCount");
    SparkAppHandle appHandle = launcher.startApplication(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            // Called on every state transition, e.g. SUBMITTED -> RUNNING -> FINISHED.
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            // Called when application info such as the app id becomes available.
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}
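startApplication returns immediately rather than waiting for the job to finish. A minimal sketch for blocking until completion, assuming it is placed inside the same try block (so the checked InterruptedException is covered by the existing catch):

// Poll the launcher handle until the application reaches a terminal state
// (FINISHED, FAILED, KILLED or LOST).
while (!appHandle.getState().isFinal()) {
    Thread.sleep(1000L);
}
System.out.println("======> final state: " + appHandle.getState());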
2.3 Using RestSubmissionClient
Submit through Spark's standalone REST submission gateway.
Unlike the previous two approaches, the submission result can be retrieved programmatically:
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.rest.CreateSubmissionResponse;
import org.apache.spark.deploy.rest.RestSubmissionClient;

import scala.collection.immutable.HashMap;

try {
    String appResource = "hdfs:/home/mr/example/spark-example-1.0.jar";
    String mainClass = "org.learn.example.jobs.SparkJob";
    String[] args = {
            "org.learn.example.jobs.WordCount"
    };
    SparkConf sparkConf = new SparkConf()
            .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
            .set("spark.executor.cores", "1")
            .set("spark.submit.deployMode", "cluster")
            .set("spark.executor.memory", "1G")
            .set("spark.cores.max", "2")
            .set("spark.app.name", ""); // left empty; set later by the statistics job itself
    // Note: the environment map is scala.collection.immutable.HashMap, not java.util.HashMap.
    CreateSubmissionResponse response = (CreateSubmissionResponse) RestSubmissionClient.run(
            appResource, mainClass, args, sparkConf, new HashMap<String, String>());
    logger.info("======> response: " + response.toJson()); // logger: any logger available in the class
} catch (Exception e) {
    e.printStackTrace();
}
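CreateSubmissionResponse also exposes submissionId and success. The same client can then be used to track (or kill) the driver. The following is a sketch only, assuming a Spark version whose RestSubmissionClient exposes requestSubmissionStatus; note the explicit quiet flag when calling from Java, since the Scala default parameter does not apply:

import org.apache.spark.deploy.rest.SubmissionStatusResponse;

// Hedged sketch: query the driver state for the submission created above.
RestSubmissionClient client =
        new RestSubmissionClient("spark://11.11.11.11:6066,22.22.22.22:6066");
SubmissionStatusResponse status = (SubmissionStatusResponse)
        client.requestSubmissionStatus(response.submissionId(), false);
logger.info("======> driverState: " + status.driverState());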