美文网首页功能整合
SpringBoot 快速整合 SpringBatch

SpringBoot 快速整合 SpringBatch

作者: 程就人生 | 来源:发表于2022-07-21 21:26 被阅读0次

SpringBatch是一个轻量级的、全面的批处理框架,旨在开发对企业系统的日常运营至关重要的强大批处理应用程序。

一个典型的批处理流程如下图所示:

  • 第一步,从数据库、文件或队列中读取大量数据;
  • 第二步,对数据进行处理;
  • 第三步,对处理后的数据进行回写。

在SpringBatch框架中,有几个概念需要理解清楚:

  1. Job是批处理运行的基本单位,由Step组成。
  2. Job执行时可通过JobExecution进行监听,Step执行时可通过StepExecution进行监听,而ExecutionContext是执行的上下文。
  3. JobLauncher执行指定JobParameters的Job。
  4. ItemReader为Step读取数据,读取完返回null。
  5. ItemWriter为Step写入数据,写入完返回null。
  6. 读取和写入可以是单数,也可以是chunk块。
  7. ItemProcessor处理读取的数据,处理结束时返回null。
  8. skipLimit异常数据超过指定数量时,Step执行失败;也可以设置skip跳过异常,或者noskip不跳过异常。

下面看简单应用:
一、pom中引入架包;

<dependency>
  <groupId>org.mybatis.spring.boot</groupId>
  <artifactId>mybatis-spring-boot-starter</artifactId>
  <version>2.2.2</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
</dependency>

这里的SpringBoot,我们依旧采用2.1.4.RELEASE版本,使用了MySQL数据库以及mybatis插件,配置文件中包含mysql的配置。

二、application.properties 配置文件;

#数据库简要配置
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
mybatis.mapper-locations=classpath:mapper/*.xml

# 关闭job运行主类就启动事件,默认打开
# spring.batch.job.enabled=false
# 自动创建数据库表
spring.batch.initialize-schema=always
# 打开调试日志输出
logging.level.org.springframework.batch=DEBUG

一定要开启自动建表功能,否则在第一次执行时会报数据库相关表找不到。

三、配置文件;
通过注解@EnableBatchProcessing开启批处理操作;配置文件需要继承默认的批处理配置文件;

/**
 * 批处理配置文件
 * @author 程就人生
 * @date 2022年7月14日
 * @Description 
 * @EnableBatchProcessing 开启批处理操作
 */
@Configuration
@EnableBatchProcessing
public class BatchConfigurer extends DefaultBatchConfigurer{
 
    private static final Logger log = LoggerFactory.getLogger(BatchConfigurer.class);
  
    // Step构建工厂类
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    // Job构建工厂类
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
}

四、一个简单应用,读写字符串Step和Job;

/**
* 一个step,读-》处理-》写操作
* 固定字符串读写
* @return
*/
@Bean
publicStep myStep(){
   return stepBuilderFactory
           // 设置step名称
           .get("字符串读写测试")
           // 添加监听器
           .listener(new StepExecutionListener(){
            @Override
            public void beforeStep(StepExecution stepExecution) {
              log.info("读之前,step name:{}", stepExecution.getStepName());
            }
      
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
              log.info("读之后,step name:{}", stepExecution.getStepName());
              return stepExecution.getExitStatus();
            }             
           })
           //这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
           .<String,String>chunk(3)
           // 读操作
           .reader(new ItemReader<String>(){
               private int num = 0;
            @Override
            public String read()
                throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
              // 只读10次
              if(num<10){
                  num++;
                  return "-----MyItemReader-----";
                }else{
                  return null;
                }
            }             
           })
           // 数据处理,业务逻辑可在这里处理
           .processor(new ItemProcessor<String,String>(){
            @Override
            public String process(String item) throws Exception {
              return item + "数据处理中...";
            }
           })
           // 写操作
           .writer(new ItemWriter<String>(){
            @Override
            public void write(List<? extends String> items) throws Exception {
              // 打印写入的数据
              for (String item : items) {
                    log.info("itemWriter:{}", item);
                  }
            }             
           })
           .build();

}

/**
* 一个flow类型的job
* @return
*/
@Bean
public Job MyJob(){
  return jobBuilderFactory
  // job名称
  .get(UUID.randomUUID().toString().replaceAll("-", ""))
  // 添加监听器
  .listener(new JobExecutionListenerSupport(){
      //所有处理结束后调用
      @Override
      public void afterJob(JobExecution jobExecution) {
          if(jobExecution.getStatus() == BatchStatus.COMPLETED){
            log.info("操作完毕~!");
          }else{
            log.error("操作有误~!");
          }
      }
  })
  // 定义作业参数检查器
  .validator(new JobParametersValidator(){
  @Override
  public void validate(JobParameters parameters) throws JobParametersInvalidException {
  // TODO Auto-generated method stub  
  }                  
  })
  // 
  .incrementer(new RunIdIncrementer())
  // 创建一个job builder执行一个step or 一系列step
  .flow(myStep())
  // 执行完毕后结束进程
  .end()
  .build();
  }

两个方法都放在config配置文件中,启动主类便可运行,执行结果如下图所示:


五、从文件读取处理后写入到文件;

/**
  * 从文件中读取
  * @return
  */
 @Bean
 public ItemReader<String> textReader(){
     FlatFileItemReader<String> reader=new FlatFileItemReader<>();
     File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/aa.txt");
     reader.setResource(new FileSystemResource(file));
     reader.setLineMapper(new LineMapper<String>() {
         @Override
         public String mapLine(String line, int lineNumber) throws Exception {
           log.info("{}行,内容:{}", lineNumber, line);
             return line;
         }
     });
     return reader;
 }
   
 /**
  * 写入文件
  * @return
  */
 @Bean
 public FlatFileItemWriter<String> txtItemWriter() {
     FlatFileItemWriter<String> txtItemWriter = new FlatFileItemWriter<String>();
     File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/bb.txt");
     txtItemWriter.setAppendAllowed(true);
     txtItemWriter.setEncoding("UTF-8");
     // new ClassPathResource("/data/sample-data.txt")
     txtItemWriter.setResource(new FileSystemResource(file));   
     txtItemWriter.setLineAggregator(new DelimitedLineAggregator<String>() {
       
     });
     return txtItemWriter;
 }
 
/**
 * 一个step,读-》处理-》写操作
 * 从一个文件读取到另一个文件,每次处理10行
 * @return
 */
@Bean
public Step myStep1(){
    return stepBuilderFactory
        // 设置step名称
        .get("文件读写")
        // 添加监听器
        .listener(new StepExecutionListener(){
      @Override
      public void beforeStep(StepExecution stepExecution) {
        log.info("读之前,step name:{}", stepExecution.getStepName());
      }

      @Override
      public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("读之后,step name:{}", stepExecution.getStepName());
        return stepExecution.getExitStatus();
    }
    })
        //这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
        .<String,String>chunk(2)
        // 需要读的文件,纯粹读取
        .reader(textReader())
        // 数据处理,业务逻辑可在这里处理
        .processor(new ItemProcessor<String,String>(){
      @Override
      public String process(String item) throws Exception {
        return item + "数据处理中...";
      }
      })
      // 写操作
      .writer(txtItemWriter())
      .build();
}

代码说明:把读取文件的和写入文件的功能单独拎出来,又增加了一个针对读写文件的step,把上面Job中的step替换一下,在aa.txt文件中预先输入三行文字,每次处理2行文字,bb.txt是空白文档,启动主类,查看运行结果:


再打开bb.txt文件,可以看到该文件中有三行文字,这三行文字就是从aa.txt文本中读取处理,经过ItemProcessor处理后又写入到bb.txt的文字。


最后,****让我们看看数据库发生了什么?

看,我们所执行的批处理操作已经被记录到数据库里面了。

官网:https://spring.io/projects/spring-batch

相关文章

网友评论

    本文标题:SpringBoot 快速整合 SpringBatch

    本文链接:https://www.haomeiwen.com/subject/zcjpirtx.html