Submitting a Spark Job from Code

Author: Jorvi | Published 2019-12-04 17:22

    Spark runs in Standalone mode here; other deploy modes have not been tested.


    1. The Spark statistics job

    1.1 jar
    hdfs:/home/mr/example/spark-example-1.0.jar
    
    1.2 main class
    org.learn.example.jobs.SparkJob
    
    import java.io.Serializable;
    import java.lang.reflect.Method;

    import org.apache.spark.sql.SparkSession;

    public class SparkJob implements Serializable {
        public static void main(String[] args) {
            // args[0] is the fully qualified class name of the actual statistics job.
            String fullClassName = args[0];
            String[] strArr = fullClassName.split("\\.");
            String appName = strArr[strArr.length - 1];

            SparkSession session = SparkSession.builder().appName("SparkJob_" + appName).getOrCreate();

            try {
                // Instantiate the job class by reflection and invoke its run(SparkSession) method.
                Class<?> clazz = Class.forName(fullClassName);
                Method method = clazz.getDeclaredMethod("run", SparkSession.class);
                method.invoke(clazz.newInstance(), session);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                session.stop();
            }
        }
    }
    
    1.3 Args passed to the main class

    The argument is the class name of the statistics job, which SparkJob resolves and runs by reflection:

    org.learn.example.jobs.WordCount
    
    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class WordCount implements ISparkJob {
        @Override
        public void run(SparkSession session) {
            // Build a small in-memory dataset and drop down to the RDD API.
            JavaRDD<String> javaRDD = session.createDataset(Arrays.asList("aaa bbb", "bbb ccc", "aaa"), Encoders.STRING()).javaRDD();

            javaRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    // Split each line into words.
                    return Arrays.asList(line.split(" ")).iterator();
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    // Pair each word with an initial count of 1.
                    return new Tuple2<>(word, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    // Sum the counts per word.
                    return v1 + v2;
                }
            }).saveAsTextFile("/home/example/result");
        }
    }
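
    The ISparkJob interface itself is not shown in the post. A minimal sketch consistent with the reflective call in SparkJob (a public no-arg constructor plus a run(SparkSession) method); the package name is an assumption:

    package org.learn.example.jobs;  // assumed package, matching the job classes above

    import org.apache.spark.sql.SparkSession;

    // The contract SparkJob invokes by reflection; implementations also need
    // a public no-arg constructor for clazz.newInstance().
    public interface ISparkJob {
        void run(SparkSession session);
    }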
    

    2. Submitting the statistics job above to the Spark cluster

    2.1 Submitting with SparkSubmit.main
    // Programmatic equivalent of the spark-submit command line; requires
    // org.apache.spark.deploy.SparkSubmit (the Spark distribution) on the classpath.
    String[] args = {
            "--master", "spark://11.11.11.11:6066,22.22.22.22:6066",
            "--deploy-mode", "cluster",
            "--executor-memory", "1G",
            "--total-executor-cores", "2",
            "--class", "org.learn.example.jobs.SparkJob",
            "hdfs:/home/mr/example/spark-example-1.0.jar",  // application jar
            "org.learn.example.jobs.WordCount"              // argument for SparkJob.main
    };
    SparkSubmit.main(args);
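
    One caveat: SparkSubmit.main is Spark's command-line entry point and may terminate the calling JVM via System.exit when submission fails, so it is risky inside a long-lived service. A minimal sketch of isolating it in a child process instead (the script path is an assumption; SparkLauncher in 2.2 handles this more robustly):

    // Launch the spark-submit script in a separate JVM so a System.exit
    // cannot take down this process. Assumes SPARK_HOME=/home/spark and an
    // enclosing method declared "throws Exception".
    Process process = new ProcessBuilder(
            "/home/spark/bin/spark-submit",
            "--master", "spark://11.11.11.11:6066,22.22.22.22:6066",
            "--deploy-mode", "cluster",
            "--executor-memory", "1G",
            "--total-executor-cores", "2",
            "--class", "org.learn.example.jobs.SparkJob",
            "hdfs:/home/mr/example/spark-example-1.0.jar",
            "org.learn.example.jobs.WordCount")
            .inheritIO()
            .start();
    int exitCode = process.waitFor();  // 0 normally means the submission succeeded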
    
    2.2 Submitting with SparkLauncher
    try {
        // SparkLauncher spawns a spark-submit process under the given SPARK_HOME.
        SparkLauncher launcher = new SparkLauncher()
                .setSparkHome("/home/spark")
                .setConf("spark.driver.memory", "1G")
                .setConf("spark.executor.memory", "1G")
                .setConf("spark.executor.cores", "1")
                .setConf("spark.cores.max", "2")
                .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
                .setDeployMode("cluster")
                .redirectOutput(new File("/home/example/logs/launch.log"))
                .setAppResource("hdfs:/home/mr/example/spark-example-1.0.jar")
                .setMainClass("org.learn.example.jobs.SparkJob")
                .addAppArgs("org.learn.example.jobs.WordCount");

        // startApplication returns a handle whose listeners receive state/info updates.
        SparkAppHandle appHandle = launcher.startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
    
    2.3 Submitting with RestSubmissionClient

    This approach returns the result of the submission.

    try {
        String appResource = "hdfs:/home/mr/example/spark-example-1.0.jar";
        String mainClass = "org.learn.example.jobs.SparkJob";
        String[] args = {
                "org.learn.example.jobs.WordCount"
        };
        SparkConf sparkConf = new SparkConf()
                .setMaster("spark://11.11.11.11:6066,22.22.22.22:6066")
                .set("spark.executor.cores", "1")
                .set("spark.submit.deployMode", "cluster")
                .set("spark.executor.memory", "1G")
                .set("spark.cores.max", "2")
                .set("spark.app.name", ""); // set later inside the statistics job itself

        // Note: the env parameter is a scala.collection.immutable.HashMap,
        // not java.util.HashMap.
        CreateSubmissionResponse response = (CreateSubmissionResponse) RestSubmissionClient.run(appResource, mainClass,
                        args, sparkConf, new HashMap<String, String>());

        logger.info("======> response: " + response.toJson());
    } catch (Exception e) {
        e.printStackTrace();
    }
    
