美文网首页功能整合
SpringBoot 快速整合 SpringBatch

SpringBoot 快速整合 SpringBatch

作者: 程就人生 | 来源:发表于2022-07-21 21:26 被阅读0次

    SpringBatch是一个轻量级的、全面的批处理框架,旨在开发对企业系统的日常运营至关重要的强大批处理应用程序。

    一个典型的批处理流程如下图所示:

    • 第一步,从数据库、文件或队列中读取大量数据;
    • 第二步,对数据进行处理;
    • 第三步,对处理后的数据进行回写。

    在SpringBatch框架中,有几个概念需要理解清楚:

    1. Job是批处理运行的基本单位,由Step组成。
    2. Job执行时可通过JobExecution进行监听,Step执行时可通过StepExecution进行监听,而ExecutionContext是执行的上下文。
    3. JobLauncher执行指定JobParameters的Job。
    4. ItemReader为Step读取数据,读取完返回null。
    5. ItemWriter为Step写入数据,写入完返回null。
    6. 读取和写入可以是单数,也可以是chunk块。
    7. ItemProcessor处理读取的数据,处理结束时返回null。
    8. skipLimit异常数据超过指定数量时,Step执行失败;也可以设置skip跳过异常,或者noskip不跳过异常。

    下面看简单应用:
    一、pom中引入架包;

    <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    

    这里的SpringBoot,我们依旧采用2.1.4.RELEASE版本,使用了MySQL数据库以及mybatis插件,配置文件中包含mysql的配置。

    二、application.properties 配置文件;

    #数据库简要配置
    spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    mybatis.mapper-locations=classpath:mapper/*.xml
    
    # 关闭job运行主类就启动事件,默认打开
    # spring.batch.job.enabled=false
    # 自动创建数据库表
    spring.batch.initialize-schema=always
    # 打开调试日志输出
    logging.level.org.springframework.batch=DEBUG
    

    一定要开启自动建表功能,否则在第一次执行时会报数据库相关表找不到。

    三、配置文件;
    通过注解@EnableBatchProcessing开启批处理操作;配置文件需要继承默认的批处理配置文件;

    /**
     * 批处理配置文件
     * @author 程就人生
     * @date 2022年7月14日
     * @Description 
     * @EnableBatchProcessing 开启批处理操作
     */
    @Configuration
    @EnableBatchProcessing
    public class BatchConfigurer extends DefaultBatchConfigurer{
     
        private static final Logger log = LoggerFactory.getLogger(BatchConfigurer.class);
      
        // Step构建工厂类
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
        
        // Job构建工厂类
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    }
    

    四、一个简单应用,读写字符串Step和Job;

    /**
    * 一个step,读-》处理-》写操作
    * 固定字符串读写
    * @return
    */
    @Bean
    publicStep myStep(){
       return stepBuilderFactory
               // 设置step名称
               .get("字符串读写测试")
               // 添加监听器
               .listener(new StepExecutionListener(){
                @Override
                public void beforeStep(StepExecution stepExecution) {
                  log.info("读之前,step name:{}", stepExecution.getStepName());
                }
          
                @Override
                public ExitStatus afterStep(StepExecution stepExecution) {
                  log.info("读之后,step name:{}", stepExecution.getStepName());
                  return stepExecution.getExitStatus();
                }             
               })
               //这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
               .<String,String>chunk(3)
               // 读操作
               .reader(new ItemReader<String>(){
                   private int num = 0;
                @Override
                public String read()
                    throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                  // 只读10次
                  if(num<10){
                      num++;
                      return "-----MyItemReader-----";
                    }else{
                      return null;
                    }
                }             
               })
               // 数据处理,业务逻辑可在这里处理
               .processor(new ItemProcessor<String,String>(){
                @Override
                public String process(String item) throws Exception {
                  return item + "数据处理中...";
                }
               })
               // 写操作
               .writer(new ItemWriter<String>(){
                @Override
                public void write(List<? extends String> items) throws Exception {
                  // 打印写入的数据
                  for (String item : items) {
                        log.info("itemWriter:{}", item);
                      }
                }             
               })
               .build();
    
    }
    
    /**
    * 一个flow类型的job
    * @return
    */
    @Bean
    public Job MyJob(){
      return jobBuilderFactory
      // job名称
      .get(UUID.randomUUID().toString().replaceAll("-", ""))
      // 添加监听器
      .listener(new JobExecutionListenerSupport(){
          //所有处理结束后调用
          @Override
          public void afterJob(JobExecution jobExecution) {
              if(jobExecution.getStatus() == BatchStatus.COMPLETED){
                log.info("操作完毕~!");
              }else{
                log.error("操作有误~!");
              }
          }
      })
      // 定义作业参数检查器
      .validator(new JobParametersValidator(){
      @Override
      public void validate(JobParameters parameters) throws JobParametersInvalidException {
      // TODO Auto-generated method stub  
      }                  
      })
      // 
      .incrementer(new RunIdIncrementer())
      // 创建一个job builder执行一个step or 一系列step
      .flow(myStep())
      // 执行完毕后结束进程
      .end()
      .build();
      }
    

    两个方法都放在config配置文件中,启动主类便可运行,执行结果如下图所示:


    五、从文件读取处理后写入到文件;

    /**
      * 从文件中读取
      * @return
      */
     @Bean
     public ItemReader<String> textReader(){
         FlatFileItemReader<String> reader=new FlatFileItemReader<>();
         File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/aa.txt");
         reader.setResource(new FileSystemResource(file));
         reader.setLineMapper(new LineMapper<String>() {
             @Override
             public String mapLine(String line, int lineNumber) throws Exception {
               log.info("{}行,内容:{}", lineNumber, line);
                 return line;
             }
         });
         return reader;
     }
       
     /**
      * 写入文件
      * @return
      */
     @Bean
     public FlatFileItemWriter<String> txtItemWriter() {
         FlatFileItemWriter<String> txtItemWriter = new FlatFileItemWriter<String>();
         File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/bb.txt");
         txtItemWriter.setAppendAllowed(true);
         txtItemWriter.setEncoding("UTF-8");
         // new ClassPathResource("/data/sample-data.txt")
         txtItemWriter.setResource(new FileSystemResource(file));   
         txtItemWriter.setLineAggregator(new DelimitedLineAggregator<String>() {
           
         });
         return txtItemWriter;
     }
     
    /**
     * 一个step,读-》处理-》写操作
     * 从一个文件读取到另一个文件,每次处理10行
     * @return
     */
    @Bean
    public Step myStep1(){
        return stepBuilderFactory
            // 设置step名称
            .get("文件读写")
            // 添加监听器
            .listener(new StepExecutionListener(){
          @Override
          public void beforeStep(StepExecution stepExecution) {
            log.info("读之前,step name:{}", stepExecution.getStepName());
          }
    
          @Override
          public ExitStatus afterStep(StepExecution stepExecution) {
            log.info("读之后,step name:{}", stepExecution.getStepName());
            return stepExecution.getExitStatus();
        }
        })
            //这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
            .<String,String>chunk(2)
            // 需要读的文件,纯粹读取
            .reader(textReader())
            // 数据处理,业务逻辑可在这里处理
            .processor(new ItemProcessor<String,String>(){
          @Override
          public String process(String item) throws Exception {
            return item + "数据处理中...";
          }
          })
          // 写操作
          .writer(txtItemWriter())
          .build();
    }
    

    代码说明:把读取文件的和写入文件的功能单独拎出来,又增加了一个针对读写文件的step,把上面Job中的step替换一下,在aa.txt文件中预先输入三行文字,每次处理2行文字,bb.txt是空白文档,启动主类,查看运行结果:


    再打开bb.txt文件,可以看到该文件中有三行文字,这三行文字就是从aa.txt文本中读取处理,经过ItemProcessor处理后又写入到bb.txt的文字。


    最后,****让我们看看数据库发生了什么?

    看,我们所执行的批处理操作已经被记录到数据库里面了。

    官网:https://spring.io/projects/spring-batch

    相关文章

      网友评论

        本文标题:SpringBoot 快速整合 SpringBatch

        本文链接:https://www.haomeiwen.com/subject/zcjpirtx.html