美文网首页
spring batch 纯注解学习笔记(八)--多job运行与

spring batch 纯注解学习笔记(八)--多job运行与

作者: 小偷阿辉 | 来源:发表于2021-01-02 22:17 被阅读0次

前序文章陆续已经将spring batch所有组件模块介绍完毕,并一一演示了作用,本文将对前面业务做一个补充和优化工作。

1.多Job运行

对于业务复杂的应用,往往是多个Job同时存在的运行,其实处理非常简单,通过以下示例展示如果存在多个Job运行,这里为了直接凸显作用,部分代码没有公开:

@EnableBatchProcessing
@Slf4j
@Import(ScopeConfiguration.class)
public class BatchConfig {
    private static JobContext jobContext = JobContext.getInstance();


    @Bean("synchroJob")
    public Job ceateSynchroJob(@Qualifier("getDataToCacheStep") Step getDataToCacheStep,
                               @Qualifier("camDataListener") CamDataListener camDataListener,
                               @Qualifier("backupDataStep") Step backupDataStep,
                               @Qualifier("truncateDataStep") Step truncateDataStep,
                               @Qualifier("saveDataToDestDataBaseStep") Step saveDataToDestDataBaseStep,
                               @Qualifier("synchroIsFinishedDecider") JobExecutionDecider synchroIsFinishedDecider,
                               @Qualifier("saveDataIsFinishedDecider") JobExecutionDecider saveDataIsFinishedDecider) {
        return jobBuilderFactory.get("synchroJob")
                .incrementer(new RunIdIncrementer())
                .listener(camDataListener)
                .start(getDataToCacheStep)
                .next(synchroIsFinishedDecider)
                .on("GOON")
                .to(getDataToCacheStep)
                .from(synchroIsFinishedDecider)
                .on("COMPLETED")
                .to(backupDataStep)
                .next(truncateDataStep)
                .next(saveDataToDestDataBaseStep)
                .from(saveDataIsFinishedDecider)
                .on("GOON")
                .to(saveDataToDestDataBaseStep)
                .end()
                .build();
    }
}


and

@Configuration
@EnableBatchProcessing
@Slf4j
public class BackUpConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean("backUpJob")
    public Job ceateSynchroJob(@Qualifier("backupDataStep") Step backupDataStep, @Qualifier("backupDataListener") BackupDataListener backupDataListener) {
        return jobBuilderFactory.get("backupJob").incrementer(new RunIdIncrementer()).listener(backupDataListener).start(backupDataStep).build();
    }

    @Bean("backupDataStep")
    public Step backupDataStep(@Qualifier("backUpDataTasklet") Tasklet backUpDataTasklet) {
        return stepBuilderFactory.get("backupDataStep").tasklet(backUpDataTasklet).build();
    }

    @Bean("backUpDataTasklet")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public BackUpDataTasklet createBackUpDataTasklet() {
        return new BackUpDataTasklet();
    }
}

运行第一个任务

JobParametersBuilder jobParametersBuilder = new JobParametersBuilder().addString("keepLatest", "false").addDate("date", new Date()).addLong("currentTime", new Long(System.currentTimeMillis()));
            JobExecution result = jobLauncher.run(synchroJob,
                    jobParametersBuilder.toJobParameters());
            try {
                this.wait();
                ExitStatus exitStatus = result.getExitStatus();
                if (exitStatus.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
                    log.info("任务正常完成");
                    return true;
                } else {
                    log.error("任务失败,exitCode=" + exitStatus.getExitCode());
                    return false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.error("任务失败,exitCode=" + e.getMessage());
                return false;
            }

运行第二个任务

try {
                JobContext.getInstance().setVersion(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
                synchronized (this) {


                    JobExecution result = launcher.run(job,
                            new JobParametersBuilder().addString("keepLatest", "true").addDate("date", new Date()).addLong("currentTime", new Long(System.currentTimeMillis())).toJobParameters());
                        this.wait();
                        if(!result.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())){
                            return 100;
                        }
                }
            } catch (JobExecutionAlreadyRunningException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("有任务正在同步中,任务失败" + e.getMessage());
                return 2;
            } catch (JobRestartException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务重启失败" + e.getMessage());
                return 3;
            } catch (JobInstanceAlreadyCompleteException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务实例已完成");
                return 4;
            } catch (JobParametersInvalidException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务启动失败,参数异常" + e.getMessage());
                return 5;
            }catch (InterruptedException e) {
                e.printStackTrace();
                return 7;
            }

加锁的目的是为了防止多次启动造成参数重复

2.线程池优化

往往Job数量比较多时就需要多线程处理,当然我们可以自己在业务中使用线程池处理,但是Spring batch提供了可以配置线程池的参数,将参数注入启动器jobLauncher 中即可实现多线程运行,最大线程数量配置规则为:
如果任务是计算密集型的,线程池大小建议设置为Ncpu + 1
  其中N是CPU数量,

+1 是为了在某一个线程处于暂停阶段时,有新的线程可以用来执行,减少CPU中断时间。

如果是IO密集型,则需要增大线程数大小,避免IO操作占用过多的CPU时间

Nthreads = Ncpu x Ucpu x (1 + W/C),其中

Ncpu = CPU核心数

Ucpu = CPU使用率,0~1

W/C = 等待时间与计算时间的比率

通产而言对于单机器部署的单个服务:
IO密集型配置线程数经验值是:2N,其中N代表CPU核数。
CPU密集型配置线程数经验值是:N + 1,其中N代表CPU核数。

 //配置线程池执行job
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(200);
        return taskExecutor;
    }

    @Bean("cssJobLauncher")
    public SimpleJobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, DataSource dataSource,
                                         PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

相关文章

网友评论

      本文标题:spring batch 纯注解学习笔记(八)--多job运行与

      本文链接:https://www.haomeiwen.com/subject/kagloktx.html