SpringBatch是一个轻量级的、全面的批处理框架,旨在开发对企业系统的日常运营至关重要的强大批处理应用程序。
一个典型的批处理流程如下图所示:
- 第一步,从数据库、文件或队列中读取大量数据;
- 第二步,对数据进行处理;
- 第三步,对处理后的数据进行回写。
在SpringBatch框架中,有几个概念需要理解清楚:
- Job是批处理运行的基本单位,由Step组成。
- Job执行时可通过JobExecution进行监听,Step执行时可通过StepExecution进行监听,而ExecutionContext是执行的上下文。
- JobLauncher执行指定JobParameters的Job。
- ItemReader为Step读取数据,读取完返回null。
- ItemWriter为Step写入数据,写入完返回null。
- 读取和写入可以是单数,也可以是chunk块。
- ItemProcessor处理读取的数据,处理结束时返回null。
- skipLimit异常数据超过指定数量时,Step执行失败;也可以设置skip跳过异常,或者noskip不跳过异常。
下面看简单应用:
一、pom中引入架包;
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
这里的SpringBoot,我们依旧采用2.1.4.RELEASE版本,使用了MySQL数据库以及mybatis插件,配置文件中包含mysql的配置。
二、application.properties 配置文件;
#数据库简要配置
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
mybatis.mapper-locations=classpath:mapper/*.xml
# 关闭job运行主类就启动事件,默认打开
# spring.batch.job.enabled=false
# 自动创建数据库表
spring.batch.initialize-schema=always
# 打开调试日志输出
logging.level.org.springframework.batch=DEBUG
一定要开启自动建表功能,否则在第一次执行时会报数据库相关表找不到。
三、配置文件;
通过注解@EnableBatchProcessing开启批处理操作;配置文件需要继承默认的批处理配置文件;
/**
* 批处理配置文件
* @author 程就人生
* @date 2022年7月14日
* @Description
* @EnableBatchProcessing 开启批处理操作
*/
@Configuration
@EnableBatchProcessing
public class BatchConfigurer extends DefaultBatchConfigurer{
private static final Logger log = LoggerFactory.getLogger(BatchConfigurer.class);
// Step构建工厂类
@Autowired
public StepBuilderFactory stepBuilderFactory;
// Job构建工厂类
@Autowired
public JobBuilderFactory jobBuilderFactory;
}
四、一个简单应用,读写字符串Step和Job;
/**
* 一个step,读-》处理-》写操作
* 固定字符串读写
* @return
*/
@Bean
publicStep myStep(){
return stepBuilderFactory
// 设置step名称
.get("字符串读写测试")
// 添加监听器
.listener(new StepExecutionListener(){
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("读之前,step name:{}", stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("读之后,step name:{}", stepExecution.getStepName());
return stepExecution.getExitStatus();
}
})
//这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
.<String,String>chunk(3)
// 读操作
.reader(new ItemReader<String>(){
private int num = 0;
@Override
public String read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// 只读10次
if(num<10){
num++;
return "-----MyItemReader-----";
}else{
return null;
}
}
})
// 数据处理,业务逻辑可在这里处理
.processor(new ItemProcessor<String,String>(){
@Override
public String process(String item) throws Exception {
return item + "数据处理中...";
}
})
// 写操作
.writer(new ItemWriter<String>(){
@Override
public void write(List<? extends String> items) throws Exception {
// 打印写入的数据
for (String item : items) {
log.info("itemWriter:{}", item);
}
}
})
.build();
}
/**
* 一个flow类型的job
* @return
*/
@Bean
public Job MyJob(){
return jobBuilderFactory
// job名称
.get(UUID.randomUUID().toString().replaceAll("-", ""))
// 添加监听器
.listener(new JobExecutionListenerSupport(){
//所有处理结束后调用
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED){
log.info("操作完毕~!");
}else{
log.error("操作有误~!");
}
}
})
// 定义作业参数检查器
.validator(new JobParametersValidator(){
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
// TODO Auto-generated method stub
}
})
//
.incrementer(new RunIdIncrementer())
// 创建一个job builder执行一个step or 一系列step
.flow(myStep())
// 执行完毕后结束进程
.end()
.build();
}
两个方法都放在config配置文件中,启动主类便可运行,执行结果如下图所示:
五、从文件读取处理后写入到文件;
/**
* 从文件中读取
* @return
*/
@Bean
public ItemReader<String> textReader(){
FlatFileItemReader<String> reader=new FlatFileItemReader<>();
File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/aa.txt");
reader.setResource(new FileSystemResource(file));
reader.setLineMapper(new LineMapper<String>() {
@Override
public String mapLine(String line, int lineNumber) throws Exception {
log.info("{}行,内容:{}", lineNumber, line);
return line;
}
});
return reader;
}
/**
* 写入文件
* @return
*/
@Bean
public FlatFileItemWriter<String> txtItemWriter() {
FlatFileItemWriter<String> txtItemWriter = new FlatFileItemWriter<String>();
File file = new File("C:/2020workspace/SpringBoot-Batch/src/main/java/com/example/demo/config/bb.txt");
txtItemWriter.setAppendAllowed(true);
txtItemWriter.setEncoding("UTF-8");
// new ClassPathResource("/data/sample-data.txt")
txtItemWriter.setResource(new FileSystemResource(file));
txtItemWriter.setLineAggregator(new DelimitedLineAggregator<String>() {
});
return txtItemWriter;
}
/**
* 一个step,读-》处理-》写操作
* 从一个文件读取到另一个文件,每次处理10行
* @return
*/
@Bean
public Step myStep1(){
return stepBuilderFactory
// 设置step名称
.get("文件读写")
// 添加监听器
.listener(new StepExecutionListener(){
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("读之前,step name:{}", stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("读之后,step name:{}", stepExecution.getStepName());
return stepExecution.getExitStatus();
}
})
//这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
.<String,String>chunk(2)
// 需要读的文件,纯粹读取
.reader(textReader())
// 数据处理,业务逻辑可在这里处理
.processor(new ItemProcessor<String,String>(){
@Override
public String process(String item) throws Exception {
return item + "数据处理中...";
}
})
// 写操作
.writer(txtItemWriter())
.build();
}
代码说明:把读取文件的和写入文件的功能单独拎出来,又增加了一个针对读写文件的step,把上面Job中的step替换一下,在aa.txt文件中预先输入三行文字,每次处理2行文字,bb.txt是空白文档,启动主类,查看运行结果:
再打开bb.txt文件,可以看到该文件中有三行文字,这三行文字就是从aa.txt文本中读取处理,经过ItemProcessor处理后又写入到bb.txt的文字。
最后,****让我们看看数据库发生了什么?
看,我们所执行的批处理操作已经被记录到数据库里面了。
网友评论