美文网首页
spring batch 纯注解学习笔记(二)--Job配置与运

spring batch 纯注解学习笔记(二)--Job配置与运

作者: 小偷阿辉 | 来源:发表于2020-12-30 15:13 被阅读0次

    该配置使用spring boot集成的纯注解方式,直接引入starter即可,涵盖了Spring Framework、Datasource以及Spring Batch。

    1.job配置

    Job接口有多种多样的实现类,通常我们使用configuration类来构建获取一个Job,如果想实现其他的job实现类来扩展业务可参考API官方文档:https://docs.spring.io/spring-batch/4.1.x/api/index.html

    @Autowired
        public JobBuilderFactory jobBuilderFactory;
     @Bean
    public Job footballJob() {
        return this.jobBuilderFactory.get("footballJob") //Job名称
                         .start(playerLoad()) //Job Step 随意取的名字,往后会讲如何定义
                         .next(gameLoad()) //Job Step 随意取的名字,往后会讲如何定义
                         .next(playerSummarization()) //Job Step 随意取的名字,往后会讲如何定义
                         .end()
                         .build();
    }
    
    

    上面的代码定义了一个非常简单的Job实例,并且在这个实例中包含了三个Step实例,可根据step的执行过程决定step的执行顺序

    2.重启配置

    批处理的一个核心问题是需要定义重启(启动)时的一些行为。当指定的JobInstance被JobExecution执行时候即认为某个Job已经重启(启动)。理想状态下,所有的任务都应该可以从它们之前中断的位置启动,但是某些情况下这样做是无法实现的。开发人员可以关闭重启机制或认为每次启动都是新的JobInstance:

    @Autowired
        public JobBuilderFactory jobBuilderFactory;
    @Bean
    public Job footballJob() {
        return this.jobBuilderFactory.get("footballJob")
                         .preventRestart() //防止重启
                         ...
                         .build();
    }
    

    3.监听Job执行状态

    当任务执行完毕或开始执行时,需要执行一些处理工作如日志处理。这个时候可以使用实现JobExecutionListener接口方式,除此以外直接实现接口还可以用 @BeforeJob 和 @AfterJob 注解,需要注意的是afterJob方法无论批处理任务成功还是失败都会被执行,所以程序处理时需要判断任务执行状态:

    @Component("backupDataListener")
    @Slf4j
    @SuppressWarnings("unchecked")
    public class BackupDataListener implements JobExecutionListener{
        private long startTime=0L;
        @Autowired
        BackupService backupService;
        @Override
        public void beforeJob(JobExecution jobExecution) {
            
            startTime=System.currentTimeMillis();
            System.out.println("--- Job trigger start ---");
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            if(jobExecution.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())){
                log.info("sucessed");
            }else{
                log.info("failed");
            }
            log.info("--- Job trigger end cost:{}ms ---",System.currentTimeMillis()-startTime);
        }
    
    }
    

    4.Java配置

    在Spring Batch 2.2.0版本之后(Spring 3.0+)支持纯Java配置。其核心是@EnableBatchProcessing注解和两个构造器。@EnableBatchProcessing的作用类似于Spring中的其他@Enable*,使用@EnableBatchProcessing之后会提供一个基本的配置用于执行批处理任务。对应的会有一系列StepScope实例被注入到Ioc容器中:JobRepository、JobLauncher、JobRegistry、PlatformTransactionManager、JobBuilderFactory以及StepBuilderFactory。

    配置的核心接口是BatchConfigurer,默认情况下需要在容器中指定DataSource,该数据源用于JobRepository相关的表。开发的过程中可以使用自定义的BatchConfigurer实现来提供以上所有的Bean。通常情况下可以扩展重载DefaultBatchConfigurer类中的Getter方法用于实现部分自定义功能,Spring-batch使用的默认数据源是HSQL作为内存数据库,足以进行本地开发。但是实际生产环境中使用的都是多数据源的场景较多,后面会讲如何在Reader中切换数据源:

    修改application.yml

    spring:
      #配置允许重构batch
      main:
        allow-bean-definition-overriding: true
      #设置job不默认执行
      batch:
        job:
          enabled: false
        initializer:
          enabled: false
    

    基础配置

    
    @Bean
    public BatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {
            @Override
            public PlatformTransactionManager getTransactionManager() {
                return new MyTransactionManager();
            }
        };
    }
    
    
    

    使用了@EnableBatchProcessing之后开发人员可以使用以下的方法来配置一个Job:

    @Configuration
    @EnableBatchProcessing
    @Import(DataSourceConfiguration.class)
    public class AppConfig {
    
       @Autowired
       private JobBuilderFactory jobs;
    
       @Autowired
       private StepBuilderFactory steps;
    
       @Bean
       public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
           return jobs.get("myJob").start(step1).next(step2).build();
       }
    
       @Bean
       protected Step step1(ItemReader<Person> reader,
                            ItemProcessor<Person, Person> processor,
                            ItemWriter<Person> writer) {
           return steps.get("step1")
               .<Person, Person> chunk(10)
               .reader(reader)
               .processor(processor)
               .writer(writer)
               .build();
       }
    
       @Bean
       protected Step step2(Tasklet tasklet) {
           return steps.get("step2")
               .tasklet(tasklet)
               .build();
       }
    }
    
    

    5.JobRepository配置

    一旦使用了@EnableBatchProcessing 注解,JobRepository即会被注入到IoCs容器中并自动使用容器中的DataSource。JobRepository用于处理批处理表的CURD,整个Spring Batch的运行都会使用到它。除了使用容器中默认的DataSoruce以及其他组件,还可以在BatchConfigurer中进行配置:

    @Bean
    public JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(1000);
        return factory.getObject();
    }
    
    

    在代码中可以看到,设置JobRepository需要DataSource和TransactionManager,如果没有指定将会使用容器中的默认配置

    5.1.JobRepository的事务配置

    默认情况下框架为JobRepository提供了默认PlatformTransactionManager事物管理。它用于确保批处理执行过程中的元数据正确的写入到指定数据源中。如果缺乏事物,那么框架产生元数据就无法和整个处理过程完全契合。

    如下图,在BatchConfigurer中的setIsolationLevelForCreate方法中可以指定事物的隔离等级:

    @Bean
    public  JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
        return factory.getObject();
    }
    
    

    5.2.修改表名称

    默认情况下,JobRepository管理的表都以BATCH_开头。需要时可以修改前缀:

    // This would reside in your BatchConfigurer implementation
    @Override
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setTablePrefix("SYSTEM.TEST_"); //修改前缀
        return factory.getObject();
    }
    

    5.3.内存级存储

    Spring Batch支持将运行时的状态数据(元数据)仅保存在内存中。重载JobRepository不设置DataSource即可:

    @Override
    protected JobRepository createJobRepository() throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        factory.setTransactionManager(transactionManager);
        return factory.getObject();
    }
    

    需要注意的是,内存级存储无法满足分布式系统。

    6.JobLauncher

    启用了@EnableBatchProcessing之后JobLauncher会自动注入到容器中以供使用。基于JobLauncher目前有以下实现类,此外可以自行进行配置
    | Class | Description |
    | CommandLineJobRunner |

    Basic launcher for starting jobs from the command line.

    |
    | JobRegistryBackgroundJobRunner |

    Command line launcher for registering jobs with a JobRegistry.

    |
    | JvmSystemExiter |

    Implementation of the SystemExiter interface that calls the standards System.exit method.

    |
    | RunIdIncrementer |

    This incrementer increments a "run.id" parameter of type Long from the given job parameters.

    |
    | RuntimeExceptionTranslator | |
    | ScheduledJobParametersFactory | |
    | SimpleJobLauncher |

    Simple implementation of the JobLauncher interface.

    |
    | SimpleJobOperator |

    Simple implementation of the JobOperator interface.

    |
    | SimpleJvmExitCodeMapper |

    An implementation of ExitCodeMapper that can be configured through a map from batch exit codes (String) to integer results.

    |
    这里使用SimpleJobLauncher 举例:

    @Beaen
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
    
    

    JobLauncher唯一的必要依赖只有JobRepository。如下图,Job的执行通常是一个同步过程:


    job执行过程

    可以通过修改TaskExecutor来指定Job的执行过程:

    Bean
    public JobLauncher jobLauncher() {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); //转换为异步任务
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
    

    这样执行过程变为:


    job执行过程

    6.运行一个Job

    这里以Http为例

    @Controller
    public class JobLauncherController {
    
        @Autowired
        JobLauncher jobLauncher;
    
        @Autowired
        Job job;
    
        @RequestMapping("/jobLauncher.html")
        public void handle() throws Exception{
            jobLauncher.run(job, new JobParameters());
        }
    }
    
    

    7.JobParametersIncrementer

    值得注意的是,当第一次运行的时候是不会有问题的,但是多次调用后就会产生冲突,因为每个JobInstance是基于一个ID全局标识进行区分的,JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法,可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer 。如此还需要对Job进行如下配置:

        @Bean
        public Job ceateJob(@Qualifier("backupDataStep") Step backupDataStep, @Qualifier("backupDataListener") BackupDataListener backupDataListener) {
            return jobBuilderFactory.get("backupJob").incrementer(new RunIdIncrementer()).listener(backupDataListener).start(backupDataStep).build();
        }
    

    单单是配置好Job是肯定无法执行的,还需要对Step进行配置。后面会陆续介绍

    相关文章

      网友评论

          本文标题:spring batch 纯注解学习笔记(二)--Job配置与运

          本文链接:https://www.haomeiwen.com/subject/atabpctx.html