spring batch

作者: elijah777 | 来源:发表于2019-11-22 10:12 被阅读0次

    spring batch

    介绍

    批处理的核心场景

    • 从某个位置读取大量的记录,位置可以是数据库、文件或者外部推送队列(MQ)。

    • 根据业务需要实时处理读取的数据。

    • 将处理后的数据写入某个位置,可以是数据库、文件或者推送到队列。

    Spring Batch能解决的批处理场景

    Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"脱机"场景,在处理的过程中不能和外部进行任何交互,也不允许有任何输入。

    Spring Batch的目标

    • 开发人员仅关注业务逻辑,底层框架的交互交由Spring Batch去处理。

    • 能够清晰分离业务与框架,框架已经限定了批处理的业务切入点,业务开发只需关注这些切入点(Read、Process、Write)。

    • 提供开箱即用的通用接口。

    • 快速轻松的融入Spring 框架,基于Spring Framework能够快速扩展各种功能。

    • 所有现有核心服务都应易于更换或扩展,而不会对基础架构层产生任何影响。

    (Infrastructure)部分主要分为3个部分。JobLauncherJob以及Step。每一个Step又细分为ItemReaderItemProcessorItemWirte。使用Spring Batch主要就是知道每一个基础设置负责的内容,然后在对应的设施中实现对应的业务。

    快速编码

    可以实现功能

    通过CSV文件导入数据到数据库中

    配置文件
     <!-- spring batch 会自动加载hsqldb驱动  -->
     <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-batch</artifactId>
     <exclusions>
     <exclusion>
     <groupId>org.hsqldb</groupId>
     <artifactId>hsqldb</artifactId>
     </exclusion>
     </exclusions>
     </dependency>
    ​
     <dependency>
     <groupId>org.hibernate</groupId>
     <artifactId>hibernate-validator</artifactId>
     <version>5.2.4.Final</version>
     </dependency>
    
    spring:
     batch:
     # 开启初始化数据库
     initialize-schema: always
     # 关闭自动执行
     job:
     enabled: false
    
    Person.java 与数据库映射
    @Data
    @Entity
    @AllArgsConstructor
    @NoArgsConstructor
    public class Person {
    ​
     @Id
     @GeneratedValue
     private Long id;
     @Size(max = 12, min = 2)
     private String name;
     private Integer age;
     private String address;
     private String nation;
    }
    
    SQL脚本
    # spring batch
    create table PERSON (ID NUMBER NOT null primary key, name varchar2(20), age number, nation varchar2(20), address varchar2(20) );
    create sequence person_seq  minvalue 2000  maxvalue 9999999999  start with 2020  increment by 1  cache 20;
    

    要导入的数据

    汪某某,11,汉族,合肥
    张某某,12,汉族,上海
    李某某,13,非汉族,武汉
    刘某,14,非汉族,南京
    欧阳某某,115,汉族,北京
    
    核心配置类 TriggerBatchConfig
    ​
    import com.ch9.batch.CsvBeanValidator;
    import com.ch9.batch.CsvItemProcessor;
    import com.ch9.batch.CsvJobListener;
    import com.ch9.domain.Person;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.launch.support.SimpleJobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.PathResource;
    import org.springframework.transaction.PlatformTransactionManager;
    ​
    import javax.sql.DataSource;
    ​
    /**
     * @description: 核心配置类
     *
     * @author: ShenShuaihu
     * @version: 1.0
     * @data: 2019-11-05 22:22
     */
    @Slf4j
    @Configuration
    @EnableBatchProcessing
    public class TriggerBatchConfig {
    ​
     @Bean
     @StepScope
     public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception{
    ​
     FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
     reader.setResource(new PathResource(pathToFile));
     reader.setLineMapper(new DefaultLineMapper<Person>() {
     {
     setLineTokenizer(new DelimitedLineTokenizer() {
     {
     setNames(new String[] {"name", "age", "nation", "address"});
     }
     });
     setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
     {
     setTargetType(Person.class);
     }
     });
     }
     });
     return reader;
     }
    
     @Bean
     public ItemProcessor<Person, Person> processor() {
     CsvItemProcessor processor = new CsvItemProcessor();
     processor.setValidator(csvBeanValidator());
     return processor;
     }
    ​
     @Bean
     public ItemWriter<Person> writer(DataSource dataSource) {
     JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
     writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
     String sql = "insert into person " + "(id, name, age, nation, address)"
     + "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
     writer.setSql(sql);
     writer.setDataSource(dataSource);
     return writer;
     }
    ​
     @Bean
     public JobRepository jobRepository1(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
     JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
     jobRepositoryFactoryBean.setDataSource(dataSource);
     jobRepositoryFactoryBean.setTransactionManager(transactionManager);
     jobRepositoryFactoryBean.setDatabaseType("oracle");
     return jobRepositoryFactoryBean.getObject();
     }
    ​
     /**
     * JobListener定义
     *
     * @param dataSource
     * @param transactionManager
     * @return
     * @throws Exception
     */
     @Bean
     public SimpleJobLauncher jobLauncher1(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
     SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
     jobLauncher.setJobRepository(jobRepository1(dataSource, transactionManager));
     return jobLauncher;
     }
    ​
     /**
     * JOB定义
     *
     * @param jobs
     * @param s1
     * @return
     */
     @Bean
     public Job importJob(JobBuilderFactory jobs, Step s1) {
     // 为job指定step 绑定监听器csvJobListener
     return jobs.get("importJob")
     .incrementer(new RunIdIncrementer())
     .flow(s1)
     .end()
     .listener(csvJobListener())
     .build();
     }
    ​
     /**
     * Step 定义
     * 批量处理数据  每次提交6500数据
     *
     * @param stepBuilderFactory
     * @param reader
     * @param writer
     * @param processor
     * @return
     */
     @Bean
     public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
     // 给step绑定reader、processor和writer
     return stepBuilderFactory
     .get("step")
     .<Person, Person>chunk(6500)
     .reader(reader)
     .processor(processor)
     .writer(writer)
     .build();
     }
    ​
     @Bean
     public CsvJobListener csvJobListener() {
     return new CsvJobListener();
     }
    ​
     @Bean
     public Validator<Person> csvBeanValidator() {
     return new CsvBeanValidator<Person>();
     }
    }
    
    数据校验类 CsvBeanValidator
    import org.springframework.batch.item.validator.ValidationException;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.InitializingBean;
    ​
    import javax.validation.ConstraintViolation;
    import javax.validation.Validation;
    import javax.validation.ValidatorFactory;
    import java.util.Set;
    ​
    /**
     * @description:  数据校验类 做基础的数据校验
     *
     * @author: ShenShuaihu
     * @version: 1.0
     * @data: 2019-11-05 21:49
     */
    public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    ​
     private javax.validation.Validator validator;
    ​
    ​
     @Override
     public void afterPropertiesSet() throws Exception {
    ​
     ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
     validator = validatorFactory.usingContext().getValidator();
     }
     @Override
     public void validate(T value) throws ValidationException {
    ​
     // 使用 Validator的validate方法校验
     Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
    ​
     if (constraintViolations.size() > 0) {
     StringBuffer message = new StringBuffer();
     for (ConstraintViolation<T> constraintViolation : constraintViolations) {
     message.append(constraintViolation.getMessage() + "\n");
     }
     throw new ValidationException(message.toString());
     }
     }
    ​
    }
    
    数据处理类CsvItemProcessor
    import com.ch9.domain.Person;
    import org.springframework.batch.item.validator.ValidatingItemProcessor;
    import org.springframework.batch.item.validator.ValidationException;
    ​
    /**
     * @description: 数据处理  对传入是数据进行处理格式变更等
     *
     * @author: ShenShuaihu
     * @version: 1.0
     * @data: 2019-10-20 22:23
     */
    public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
    ​
     @Override
     public Person process(Person item) throws ValidationException {
    ​
     // 执行之后才会调用自定义检验器
     super.process(item);
    ​
     if (item.getNation().equals("汉族")) {
     // 如果汉族则存01
     item.setNation("01");
     } else {
     item.setNation("02");
     }
     return item;
     }
    }
    
    JOB 监听 CsvJobListener
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    ​
    /**
     * @description: JOB 监听  目前用于时间的记录
     *
     * @author: ShenShuaihu
     * @version: 1.0
     * @data: 2019-11-05 22:12
     */
    public class CsvJobListener implements JobExecutionListener {
    ​
     long startTime;
     long endTime;
    ​
     @Override
     public void beforeJob(JobExecution jobExecution) {
     startTime = System.currentTimeMillis();
     System.out.println("任务开始处理");
     }
    ​
     @Override
     public void afterJob(JobExecution jobExecution) {
    ​
     endTime = System.currentTimeMillis();
     System.out.println("任务处理结束");
     System.out.println("耗时:" + (endTime - startTime) + "ms");
     }
    }
    

    调用入口

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    ​
    /**
     * @description:
     *
     * @author: ShenShuaihu
     * @version: 1.0
     * @data: 2019-10-12 10:45
     */
    @RestController
    public class CsvBatchController {
    ​
     @Autowired
     private JobLauncher jobLauncher;
    ​
     @Autowired
     private Job importJob;
     public JobParameters jobParameters;
    ​
     /**
     * 手动执行批量导入
     *
     * @param fileName
     * @return
     * @throws Exception
     */
     @RequestMapping("/imp")
     public String imp(String fileName) throws Exception{
     String path = "D:\\MyFile\\Develop\\IDEA\\demo\\spring\\spring-boot\\src\\main\\resources\\csv\\" + fileName + ".csv";
     jobParameters = new JobParametersBuilder()
     .addLong("time", System.currentTimeMillis())
     .addString("input.file.name", path)
     .toJobParameters();
     jobLauncher.run(importJob, jobParameters);
    ​
     return "OK";
     }
    ​
    ​
    }
    
    处理时间展示 1w数据
    1574304586738.png

    详细解读

    Spring Batch 结构

    Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。 一个Job包含很多Step,step就是每个job要执行的单个步骤。

    如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。 然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。

    Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。

    image

    spring batch的一个总体的架构如下:

    [图片上传失败...(image-4c2597-1574387997219)]

    JobRepository

    JobRepository是所有前面介绍的对象实例的持久化机制。他为JobLauncherJobStep的实现提供了CRUD操作。当一个Job第一次被启动时,一个JobExecution会从数据源中获取到,同时在执行的过程中StepExecutionJobExecution的实现都会记录到数据源中。使用@EnableBatchProcessing注解后JobRepository会进行自动化配置。

    JobLauncher

    JobLauncherJob的启动运行提供了一个边界的入口,在启动Job的同时还可以定制JobParameters

    批处理任务的主要业务逻辑都是在Step中去完成的。可以将Job理解为运行Step的框架,而Step理解为业务功能。

    Step配置

    StepJob中的工作单元,每一个Step涵盖了单行记录的处理闭环。下图是一个Step的简要结构:

    [图片上传失败...(image-a2a4af-1574387997219)]

    一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:1)面向分片的ChunkStep,2)面向过程的TaskletStep。但是基本上大部分情况下都是使用面向分片的方式来解决问题。

    参考内容

    https://blog.csdn.net/wuzhiwei549/article/details/85392128

    比较详细的几篇文章

    https://my.oschina.net/chkui/blog/3068863

    2019/11/21 于成都

    相关文章

      网友评论

        本文标题:spring batch

        本文链接:https://www.haomeiwen.com/subject/hljfwctx.html