The previous articles in this series have introduced all of the Spring Batch component modules one by one and demonstrated what each of them does. This article supplements and optimizes the business logic built up so far.
1. Running Multiple Jobs
In applications with complex business logic, several Jobs often coexist and run side by side. Handling this is actually very simple; the following example shows how multiple Jobs can be defined. To keep the focus on the point being illustrated, part of the code is not shown:
@Configuration
@EnableBatchProcessing
@Slf4j
@Import(ScopeConfiguration.class)
public class BatchConfig {

    private static JobContext jobContext = JobContext.getInstance();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean("synchroJob")
    public Job createSynchroJob(@Qualifier("getDataToCacheStep") Step getDataToCacheStep,
            @Qualifier("camDataListener") CamDataListener camDataListener,
            @Qualifier("backupDataStep") Step backupDataStep,
            @Qualifier("truncateDataStep") Step truncateDataStep,
            @Qualifier("saveDataToDestDataBaseStep") Step saveDataToDestDataBaseStep,
            @Qualifier("synchroIsFinishedDecider") JobExecutionDecider synchroIsFinishedDecider,
            @Qualifier("saveDataIsFinishedDecider") JobExecutionDecider saveDataIsFinishedDecider) {
        return jobBuilderFactory.get("synchroJob")
                .incrementer(new RunIdIncrementer())
                .listener(camDataListener)
                .start(getDataToCacheStep)
                .next(synchroIsFinishedDecider)
                .on("GOON")
                .to(getDataToCacheStep)           // loop back while the decider keeps returning GOON
                .from(synchroIsFinishedDecider)
                .on("COMPLETED")
                .to(backupDataStep)
                .next(truncateDataStep)
                .next(saveDataToDestDataBaseStep)
                .next(saveDataIsFinishedDecider)  // route into the second decider so the GOON loop below can fire
                .from(saveDataIsFinishedDecider)
                .on("GOON")
                .to(saveDataToDestDataBaseStep)   // the transition that ends this loop is part of the code not shown
                .end()
                .build();
    }
}
and the backup Job gets its own configuration class:
@Configuration
@EnableBatchProcessing
@Slf4j
public class BackUpConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean("backUpJob")
    public Job createBackUpJob(@Qualifier("backupDataStep") Step backupDataStep,
            @Qualifier("backupDataListener") BackupDataListener backupDataListener) {
        return jobBuilderFactory.get("backupJob")
                .incrementer(new RunIdIncrementer())
                .listener(backupDataListener)
                .start(backupDataStep)
                .build();
    }

    @Bean("backupDataStep")
    public Step backupDataStep(@Qualifier("backUpDataTasklet") Tasklet backUpDataTasklet) {
        return stepBuilderFactory.get("backupDataStep").tasklet(backUpDataTasklet).build();
    }

    // step-scoped so a fresh tasklet is created for each step execution (allows late binding of job parameters)
    @Bean("backUpDataTasklet")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public BackUpDataTasklet createBackUpDataTasklet() {
        return new BackUpDataTasklet();
    }
}
Running the first job:
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
        .addString("keepLatest", "false")
        .addDate("date", new Date())
        .addLong("currentTime", System.currentTimeMillis());
// the enclosing method declares the launcher's checked exceptions (JobExecutionAlreadyRunningException, etc.)
JobExecution result = jobLauncher.run(synchroJob, jobParametersBuilder.toJobParameters());
try {
    synchronized (this) {
        // the launcher runs the job asynchronously; block until the job listener calls notify()
        this.wait();
    }
    ExitStatus exitStatus = result.getExitStatus();
    if (exitStatus.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
        log.info("Job completed normally");
        return true;
    } else {
        log.error("Job failed, exitCode=" + exitStatus.getExitCode());
        return false;
    }
} catch (InterruptedException e) {
    log.error("Job failed: " + e.getMessage(), e);
    return false;
}
Running the second job:
try {
    JobContext.getInstance().setVersion(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
    synchronized (this) {
        JobExecution result = launcher.run(job,
                new JobParametersBuilder()
                        .addString("keepLatest", "true")
                        .addDate("date", new Date())
                        .addLong("currentTime", System.currentTimeMillis())
                        .toJobParameters());
        // block until the job listener calls notify() on this object
        this.wait();
        if (!result.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
            return 100;
        }
    }
} catch (JobExecutionAlreadyRunningException e) {
    log.info("A job is already running, launch failed: " + e.getMessage());
    return 2;
} catch (JobRestartException e) {
    log.info("Job restart failed: " + e.getMessage());
    return 3;
} catch (JobInstanceAlreadyCompleteException e) {
    log.info("Job instance has already completed");
    return 4;
} catch (JobParametersInvalidException e) {
    log.info("Job launch failed, invalid parameters: " + e.getMessage());
    return 5;
} catch (InterruptedException e) {
    log.info("Interrupted while waiting for the job: " + e.getMessage());
    return 7;
}
The synchronized block prevents two launches fired in quick succession from producing duplicate job parameters. It is also what makes this.wait() legal, since a thread may only wait on an object whose monitor it holds. The waiting thread is presumably woken up by the job listener calling notify() once the job has finished; that listener code is not shown in the article.
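For completeness, here is a minimal sketch of what such a listener could look like. It is an assumption, not the article's actual CamDataListener or BackupDataListener: the object used for wait()/notifyAll() (called monitor here) must be the same object the launching code synchronizes on.
// Hypothetical sketch: wakes up the thread that launched the job and is waiting on "monitor".
public class NotifyingJobListener implements JobExecutionListener {

    private final Object monitor;   // the same object the launching code synchronizes on

    public NotifyingJobListener(Object monitor) {
        this.monitor = monitor;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // nothing to do before the job starts
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        synchronized (monitor) {
            monitor.notifyAll();    // releases the this.wait() in the launching code
        }
    }
}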
2. Thread Pool Optimization
When the number of Jobs grows, multi-threaded execution becomes necessary. We could of course manage a thread pool ourselves in business code, but Spring Batch lets us configure a thread pool and inject it into the jobLauncher, which is enough to make jobs run on multiple threads. The usual rules for sizing the maximum number of threads are:
For compute-intensive (CPU-bound) work, a pool size of Ncpu + 1 is recommended,
where N is the number of CPU cores;
the +1 ensures that when one thread is briefly stalled, another thread is ready to run, reducing CPU idle time.
For IO-bound work the pool needs to be larger, so that threads blocked on IO do not leave the CPU idle:
Nthreads = Ncpu x Ucpu x (1 + W/C), where
Ncpu = number of CPU cores
Ucpu = target CPU utilization, between 0 and 1
W/C = ratio of wait time to compute time
Generally speaking, for a single service deployed on a single machine:
IO-bound: the rule of thumb is 2N threads, where N is the number of CPU cores.
CPU-bound: the rule of thumb is N + 1 threads, where N is the number of CPU cores, as the short sketch below illustrates.
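As a quick illustration (not part of the article's code), the rules above could be computed like this; the utilization and wait/compute values are purely illustrative assumptions:
// Sketch only: deriving pool sizes from the rules of thumb above.
int nCpu = Runtime.getRuntime().availableProcessors();

// CPU-bound rule: Ncpu + 1
int cpuBoundThreads = nCpu + 1;

// IO-bound rule: Nthreads = Ncpu * Ucpu * (1 + W/C)
double uCpu = 0.8;           // assumed target CPU utilization
double waitToCompute = 1.5;  // assumed wait-time / compute-time ratio
int ioBoundThreads = (int) Math.ceil(nCpu * uCpu * (1 + waitToCompute));

// the resulting values could be fed into setCorePoolSize()/setMaxPoolSize() in the configuration below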
// configure the thread pool that executes the jobs
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);    // threads kept alive even when idle
    taskExecutor.setMaxPoolSize(10);    // upper bound, used once the queue is full
    taskExecutor.setQueueCapacity(200); // tasks queued before extra threads are created
    return taskExecutor;
}
@Bean("cssJobLauncher")
public SimpleJobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, DataSource dataSource,
PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}
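One consequence of handing the launcher a TaskExecutor is that run() submits the job to the pool and returns before the job has finished, which is exactly why the wait()/notify() pattern in section 1 is needed. A hypothetical caller (names assumed, not from the article) would look roughly like this:
@Autowired
@Qualifier("cssJobLauncher")
private JobLauncher jobLauncher;   // the launcher configured above

public void launch(Job synchroJob, JobParameters params) throws Exception {
    // with an async TaskExecutor the job runs in the background,
    // so the returned execution is normally still in progress at this point
    JobExecution execution = jobLauncher.run(synchroJob, params);
    log.info("submitted, current status = " + execution.getStatus());
}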