spring batch
介绍
批处理的核心场景
-
从某个位置读取大量的记录,位置可以是数据库、文件或者外部推送队列(MQ)。
-
根据业务需要实时处理读取的数据。
-
将处理后的数据写入某个位置,可以是数据库、文件或者推送到队列。
Spring Batch能解决的批处理场景
Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"脱机"场景,在处理的过程中不能和外部进行任何交互,也不允许有任何输入。
Spring Batch的目标
-
开发人员仅关注业务逻辑,底层框架的交互交由Spring Batch去处理。
-
能够清晰分离业务与框架,框架已经限定了批处理的业务切入点,业务开发只需关注这些切入点(Read、Process、Write)。
-
提供开箱即用的通用接口。
-
快速轻松的融入Spring 框架,基于Spring Framework能够快速扩展各种功能。
-
所有现有核心服务都应易于更换或扩展,而不会对基础架构层产生任何影响。
(Infrastructure)部分主要分为3个部分。
JobLauncher
、Job
以及Step
。每一个Step
又细分为ItemReader
、ItemProcessor
、ItemWirte
。使用Spring Batch主要就是知道每一个基础设置负责的内容,然后在对应的设施中实现对应的业务。
快速编码
可以实现功能
通过CSV文件导入数据到数据库中
配置文件
<!-- spring batch 会自动加载hsqldb驱动 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<exclusions>
<exclusion>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.2.4.Final</version>
</dependency>
spring:
batch:
# 开启初始化数据库
initialize-schema: always
# 关闭自动执行
job:
enabled: false
Person.java 与数据库映射
@Data
@Entity
@AllArgsConstructor
@NoArgsConstructor
public class Person {
@Id
@GeneratedValue
private Long id;
@Size(max = 12, min = 2)
private String name;
private Integer age;
private String address;
private String nation;
}
SQL脚本
# spring batch
create table PERSON (ID NUMBER NOT null primary key, name varchar2(20), age number, nation varchar2(20), address varchar2(20) );
create sequence person_seq minvalue 2000 maxvalue 9999999999 start with 2020 increment by 1 cache 20;
要导入的数据
汪某某,11,汉族,合肥
张某某,12,汉族,上海
李某某,13,非汉族,武汉
刘某,14,非汉族,南京
欧阳某某,115,汉族,北京
核心配置类 TriggerBatchConfig
import com.ch9.batch.CsvBeanValidator;
import com.ch9.batch.CsvItemProcessor;
import com.ch9.batch.CsvJobListener;
import com.ch9.domain.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
/**
* @description: 核心配置类
*
* @author: ShenShuaihu
* @version: 1.0
* @data: 2019-11-05 22:22
*/
@Slf4j
@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {
@Bean
@StepScope
public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception{
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new PathResource(pathToFile));
reader.setLineMapper(new DefaultLineMapper<Person>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] {"name", "age", "nation", "address"});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class);
}
});
}
});
return reader;
}
@Bean
public ItemProcessor<Person, Person> processor() {
CsvItemProcessor processor = new CsvItemProcessor();
processor.setValidator(csvBeanValidator());
return processor;
}
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
String sql = "insert into person " + "(id, name, age, nation, address)"
+ "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
@Bean
public JobRepository jobRepository1(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType("oracle");
return jobRepositoryFactoryBean.getObject();
}
/**
* JobListener定义
*
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public SimpleJobLauncher jobLauncher1(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository1(dataSource, transactionManager));
return jobLauncher;
}
/**
* JOB定义
*
* @param jobs
* @param s1
* @return
*/
@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
// 为job指定step 绑定监听器csvJobListener
return jobs.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(s1)
.end()
.listener(csvJobListener())
.build();
}
/**
* Step 定义
* 批量处理数据 每次提交6500数据
*
* @param stepBuilderFactory
* @param reader
* @param writer
* @param processor
* @return
*/
@Bean
public Step step(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
// 给step绑定reader、processor和writer
return stepBuilderFactory
.get("step")
.<Person, Person>chunk(6500)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public CsvJobListener csvJobListener() {
return new CsvJobListener();
}
@Bean
public Validator<Person> csvBeanValidator() {
return new CsvBeanValidator<Person>();
}
}
数据校验类 CsvBeanValidator
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;
/**
* @description: 数据校验类 做基础的数据校验
*
* @author: ShenShuaihu
* @version: 1.0
* @data: 2019-11-05 21:49
*/
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
private javax.validation.Validator validator;
@Override
public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
validator = validatorFactory.usingContext().getValidator();
}
@Override
public void validate(T value) throws ValidationException {
// 使用 Validator的validate方法校验
Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
if (constraintViolations.size() > 0) {
StringBuffer message = new StringBuffer();
for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message.append(constraintViolation.getMessage() + "\n");
}
throw new ValidationException(message.toString());
}
}
}
数据处理类CsvItemProcessor
import com.ch9.domain.Person;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
/**
* @description: 数据处理 对传入是数据进行处理格式变更等
*
* @author: ShenShuaihu
* @version: 1.0
* @data: 2019-10-20 22:23
*/
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
@Override
public Person process(Person item) throws ValidationException {
// 执行之后才会调用自定义检验器
super.process(item);
if (item.getNation().equals("汉族")) {
// 如果汉族则存01
item.setNation("01");
} else {
item.setNation("02");
}
return item;
}
}
JOB 监听 CsvJobListener
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
* @description: JOB 监听 目前用于时间的记录
*
* @author: ShenShuaihu
* @version: 1.0
* @data: 2019-11-05 22:12
*/
public class CsvJobListener implements JobExecutionListener {
long startTime;
long endTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
System.out.println("任务开始处理");
}
@Override
public void afterJob(JobExecution jobExecution) {
endTime = System.currentTimeMillis();
System.out.println("任务处理结束");
System.out.println("耗时:" + (endTime - startTime) + "ms");
}
}
调用入口
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @description:
*
* @author: ShenShuaihu
* @version: 1.0
* @data: 2019-10-12 10:45
*/
@RestController
public class CsvBatchController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
public JobParameters jobParameters;
/**
* 手动执行批量导入
*
* @param fileName
* @return
* @throws Exception
*/
@RequestMapping("/imp")
public String imp(String fileName) throws Exception{
String path = "D:\\MyFile\\Develop\\IDEA\\demo\\spring\\spring-boot\\src\\main\\resources\\csv\\" + fileName + ".csv";
jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", path)
.toJobParameters();
jobLauncher.run(importJob, jobParameters);
return "OK";
}
}
处理时间展示 1w数据
1574304586738.png详细解读
Spring Batch 结构
Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。 一个Job包含很多Step,step就是每个job要执行的单个步骤。
如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。 然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。
Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。
imagespring batch的一个总体的架构如下:
[图片上传失败...(image-4c2597-1574387997219)]
JobRepository
JobRepository
是所有前面介绍的对象实例的持久化机制。他为JobLauncher
、Job
、Step
的实现提供了CRUD操作。当一个Job
第一次被启动时,一个JobExecution
会从数据源中获取到,同时在执行的过程中StepExecution
、JobExecution
的实现都会记录到数据源中。使用@EnableBatchProcessing
注解后JobRepository
会进行自动化配置。
JobLauncher
JobLauncher
为Job
的启动运行提供了一个边界的入口,在启动Job
的同时还可以定制JobParameters
:
批处理任务的主要业务逻辑都是在Step
中去完成的。可以将Job
理解为运行Step
的框架,而Step
理解为业务功能。
Step配置
Step
是Job
中的工作单元,每一个Step
涵盖了单行记录的处理闭环。下图是一个Step
的简要结构:
[图片上传失败...(image-a2a4af-1574387997219)]
一个Step
通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step
都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:1)面向分片的ChunkStep
,2)面向过程的TaskletStep
。但是基本上大部分情况下都是使用面向分片的方式来解决问题。
参考内容
https://blog.csdn.net/wuzhiwei549/article/details/85392128
比较详细的几篇文章
https://my.oschina.net/chkui/blog/3068863
2019/11/21 于成都
网友评论