美文网首页
使用java代码定时提交spark任务

使用java代码定时提交spark任务

作者: zxydut | 来源:发表于2018-10-22 14:01 被阅读0次

在spark做批处理时,有时需要定时提交spark任务,可以采用写shell脚本和java代码两种方式。本文介绍使用java代码的方式。

  • 定时器
@Scheduled(cron = "0 20 0 * * *")
    public void tnvDSumSchd(){
        try{
            List<String> taskType_files_list = new ArrayList<>();
            taskType_files_list.add("TESTIN_VIDEO_D_SUMMAY");
            String legalDir = tools.getLegalDir(testinHdfsDir);
            taskType_files_list.addAll(tools.fileLists(legalDir, new TestInFilterPath(FileType.VIDEO.getType() + getDay())));

            String[] sparkParam = new String[4];
            sparkParam[0] = "1g";
            sparkParam[1] = "2g";
            sparkParam[2] = "2";
            sparkParam[3] = "1";
            if (taskType_files_list.size() > 1){
                submitLancher.submit(taskType_files_list,sparkParam,"TESTIN_VIDEO_D_SUMMAY");
            }else {
                log.info("{} does not have any testin video file",legalDir);
            }
        } catch (InterruptedException e) {
            log.error(e.getMessage(),e);
        } catch (IOException e) {
            log.error(e.getMessage(),e);
        }
    }
  • 提交spark任务的类
@Component
@Slf4j
public class SubmitLancher {
    @Value("${spark.home}")
    private String sparkHome;//sparkhome的路径

    @Value("${spark.submit.appResource}")
    private String submitAppResource;//提交到spark上的jar包

    public void submit(List<String> taskType_files_list, String[] sparkParam,String appName) throws IOException, InterruptedException {
        long start = System.currentTimeMillis();
        String[] taskType_files = taskType_files_list.toArray(new String[taskType_files_list.size()]);

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        new SparkLauncher()
                .setSparkHome(sparkHome)
                .setMaster("yarn")
                .setAppName(appName)
                .setConf("spark.driver.memory", sparkParam[0])
                .setConf("spark.executor.memory", sparkParam[1])
                .setConf("spark.executor.cores", sparkParam[2])
                .setConf("spark.driver.cores",sparkParam[3])
                .setAppResource(submitAppResource)
                .setMainClass("submit.SubmitMain")//程序主入口
                .setDeployMode("cluster")
                .addAppArgs(taskType_files)
                .startApplication(new SparkAppHandle.Listener(){
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        if (handle.getState().isFinal()){
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                    }
                });
        countDownLatch.await();
        long end = System.currentTimeMillis();
        log.info("{} summary finished,and used time:{} ms",taskType_files_list.get(0),end - start);
    }
}
  • 注意
  1. countDownLatch.await(); 这行代码会一直阻塞代码,直到countDownLatch 的值减到0结束阻塞。这样做是为了防止spark任务在exector上未运行结束,但是driver代码已经停止导致的任务异常结束。
  2. 使用java代码调度时尽量不用使用springboot框架,因为spark任务是从master上发送任务到executor上,在executor上不能创建spring容器。
  3. 由于spark依赖都是使用线上的spark环境,所以在启动程序时需要加上 SparkLauncher 类,因此需要将spark-launcher_2.10-1.6.2.jar 一起作为依赖包提交执行。

相关文章

  • 使用java代码定时提交spark任务

    在spark做批处理时,有时需要定时提交spark任务,可以采用写shell脚本和java代码两种方式。本文介绍使...

  • spark wordcount

    wordcount java编写spark执行 maven pom 项目结构 java代码 bash提交任务到Spark

  • 代码提交Spark任务

    Spark以Standalone模式运行,其他模式未测试 一、Spark统计任务 1.1 jar 1.2 main...

  • Spark jar包问题

    通常我们将spark任务编写后打包成jar包,使用spark-submit进行提交,因为spark是分布式任务,如...

  • 定时任务ScheduledThreadPoolExecutor

    定时任务示例代码: 先提交的任务500ms再执行,后提交的任务100ms后执行,打印结果是后提交的任务先执行,本文...

  • Spark作业基本运行原理

    当我们使用spark-submit提交一个Spark任务后,这个任务就会启动一个对应的Driver进程,然后根据你...

  • Spark学习笔记4

    任务的提交以及Standalone集群模式的部署 spark-submit 首先需要打包代码,如果你的代码需要依赖...

  • crontab 定时写法整理

    前言 目前 Hadoop 、Hive 及 Spark 脚本需要用定时脚本,任务调度使用的是 crontab 的任务...

  • Spark1.3.1源码分析 Spark job 提交流程

    spark提交 例如WordCount代码 提交脚本 所以,想要分析程序提交的流程,必须从spark-submit...

  • Spark-submit执行流程,了解一下

    摘要:本文主要是通过Spark代码走读来了解spark-submit的流程。 1.任务命令提交 我们在进行Spar...

网友评论

      本文标题:使用java代码定时提交spark任务

      本文链接:https://www.haomeiwen.com/subject/uqcgzftx.html