该配置使用spring boot集成的纯注解方式,直接引入starter即可,涵盖了Spring Framework、Datasource以及Spring Batch。
1.job配置
Job接口有多种多样的实现类,通常我们使用configuration类来构建获取一个Job,如果想实现其他的job实现类来扩展业务可参考API官方文档:https://docs.spring.io/spring-batch/4.1.x/api/index.html
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob") //Job名称
.start(playerLoad()) //Job Step 随意取的名字,往后会讲如何定义
.next(gameLoad()) //Job Step 随意取的名字,往后会讲如何定义
.next(playerSummarization()) //Job Step 随意取的名字,往后会讲如何定义
.end()
.build();
}
上面的代码定义了一个非常简单的Job实例,并且在这个实例中包含了三个Step实例,可根据step的执行过程决定step的执行顺序
2.重启配置
批处理的一个核心问题是需要定义重启(启动)时的一些行为。当指定的JobInstance被JobExecution执行时候即认为某个Job已经重启(启动)。理想状态下,所有的任务都应该可以从它们之前中断的位置启动,但是某些情况下这样做是无法实现的。开发人员可以关闭重启机制或认为每次启动都是新的JobInstance:
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.preventRestart() //防止重启
...
.build();
}
3.监听Job执行状态
当任务执行完毕或开始执行时,需要执行一些处理工作如日志处理。这个时候可以使用实现JobExecutionListener接口方式,除此以外直接实现接口还可以用 @BeforeJob 和 @AfterJob 注解,需要注意的是afterJob方法无论批处理任务成功还是失败都会被执行,所以程序处理时需要判断任务执行状态:
@Component("backupDataListener")
@Slf4j
@SuppressWarnings("unchecked")
public class BackupDataListener implements JobExecutionListener{
private long startTime=0L;
@Autowired
BackupService backupService;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime=System.currentTimeMillis();
System.out.println("--- Job trigger start ---");
}
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())){
log.info("sucessed");
}else{
log.info("failed");
}
log.info("--- Job trigger end cost:{}ms ---",System.currentTimeMillis()-startTime);
}
}
4.Java配置
在Spring Batch 2.2.0版本之后(Spring 3.0+)支持纯Java配置。其核心是@EnableBatchProcessing注解和两个构造器。@EnableBatchProcessing的作用类似于Spring中的其他@Enable*,使用@EnableBatchProcessing之后会提供一个基本的配置用于执行批处理任务。对应的会有一系列StepScope实例被注入到Ioc容器中:JobRepository、JobLauncher、JobRegistry、PlatformTransactionManager、JobBuilderFactory以及StepBuilderFactory。
配置的核心接口是BatchConfigurer,默认情况下需要在容器中指定DataSource,该数据源用于JobRepository相关的表。开发的过程中可以使用自定义的BatchConfigurer实现来提供以上所有的Bean。通常情况下可以扩展重载DefaultBatchConfigurer类中的Getter方法用于实现部分自定义功能,Spring-batch使用的默认数据源是HSQL作为内存数据库,足以进行本地开发。但是实际生产环境中使用的都是多数据源的场景较多,后面会讲如何在Reader中切换数据源:
修改application.yml
spring:
#配置允许重构batch
main:
allow-bean-definition-overriding: true
#设置job不默认执行
batch:
job:
enabled: false
initializer:
enabled: false
基础配置
@Bean
public BatchConfigurer batchConfigurer() {
return new DefaultBatchConfigurer() {
@Override
public PlatformTransactionManager getTransactionManager() {
return new MyTransactionManager();
}
};
}
使用了@EnableBatchProcessing之后开发人员可以使用以下的方法来配置一个Job:
@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
return jobs.get("myJob").start(step1).next(step2).build();
}
@Bean
protected Step step1(ItemReader<Person> reader,
ItemProcessor<Person, Person> processor,
ItemWriter<Person> writer) {
return steps.get("step1")
.<Person, Person> chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
protected Step step2(Tasklet tasklet) {
return steps.get("step2")
.tasklet(tasklet)
.build();
}
}
5.JobRepository配置
一旦使用了@EnableBatchProcessing 注解,JobRepository即会被注入到IoCs容器中并自动使用容器中的DataSource。JobRepository用于处理批处理表的CURD,整个Spring Batch的运行都会使用到它。除了使用容器中默认的DataSoruce以及其他组件,还可以在BatchConfigurer中进行配置:
@Bean
public JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
factory.setTablePrefix("BATCH_");
factory.setMaxVarCharLength(1000);
return factory.getObject();
}
在代码中可以看到,设置JobRepository需要DataSource和TransactionManager,如果没有指定将会使用容器中的默认配置
5.1.JobRepository的事务配置
默认情况下框架为JobRepository提供了默认PlatformTransactionManager事物管理。它用于确保批处理执行过程中的元数据正确的写入到指定数据源中。如果缺乏事物,那么框架产生元数据就无法和整个处理过程完全契合。
如下图,在BatchConfigurer中的setIsolationLevelForCreate方法中可以指定事物的隔离等级:
@Bean
public JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
return factory.getObject();
}
5.2.修改表名称
默认情况下,JobRepository管理的表都以BATCH_开头。需要时可以修改前缀:
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setTablePrefix("SYSTEM.TEST_"); //修改前缀
return factory.getObject();
}
5.3.内存级存储
Spring Batch支持将运行时的状态数据(元数据)仅保存在内存中。重载JobRepository不设置DataSource即可:
@Override
protected JobRepository createJobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(transactionManager);
return factory.getObject();
}
需要注意的是,内存级存储无法满足分布式系统。
6.JobLauncher
启用了@EnableBatchProcessing之后JobLauncher会自动注入到容器中以供使用。基于JobLauncher目前有以下实现类,此外可以自行进行配置
| Class | Description |
| CommandLineJobRunner |
Basic launcher for starting jobs from the command line.
|
| JobRegistryBackgroundJobRunner |
Command line launcher for registering jobs with a JobRegistry
.
|
| JvmSystemExiter |
Implementation of the SystemExiter
interface that calls the standards System.exit method.
|
| RunIdIncrementer |
This incrementer increments a "run.id" parameter of type Long
from the given job parameters.
|
| RuntimeExceptionTranslator | |
| ScheduledJobParametersFactory | |
| SimpleJobLauncher |
Simple implementation of the JobLauncher
interface.
|
| SimpleJobOperator |
Simple implementation of the JobOperator interface.
|
| SimpleJvmExitCodeMapper |
An implementation of ExitCodeMapper
that can be configured through a map from batch exit codes (String) to integer results.
|
这里使用SimpleJobLauncher 举例:
@Beaen
public JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
JobLauncher唯一的必要依赖只有JobRepository。如下图,Job的执行通常是一个同步过程:
job执行过程
可以通过修改TaskExecutor来指定Job的执行过程:
Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
这样执行过程变为:
job执行过程
6.运行一个Job
这里以Http为例
@Controller
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/jobLauncher.html")
public void handle() throws Exception{
jobLauncher.run(job, new JobParameters());
}
}
7.JobParametersIncrementer
值得注意的是,当第一次运行的时候是不会有问题的,但是多次调用后就会产生冲突,因为每个JobInstance是基于一个ID全局标识进行区分的,JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法,可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer 。如此还需要对Job进行如下配置:
@Bean
public Job ceateJob(@Qualifier("backupDataStep") Step backupDataStep, @Qualifier("backupDataListener") BackupDataListener backupDataListener) {
return jobBuilderFactory.get("backupJob").incrementer(new RunIdIncrementer()).listener(backupDataListener).start(backupDataStep).build();
}
单单是配置好Job是肯定无法执行的,还需要对Step进行配置。后面会陆续介绍
网友评论