背景
在后台服务开发中, 经常要用到多线程技术进行加速执行, 每家公司都有内部多线程的框架, 这些框架不是文档不规范, 就是只能适用特定场景.
基于这些原因, spring batch带来了更易用, 性能更好的解决方案.
基本概念
JobRepository
job仓库, 提供了JobLauncher, Job, Setp的CRUD实现
JobLauncher
job的启动器, 可以传入job所需参数
Job
一个任务概念, 可以包含多个step, 且对step的执行顺序进行编排
Step
具体步骤, 基本包含reader, writer, reader后可选processor, 或者使用tesklet
下面用一个图来说明他们之间的关系
spring-batch-reference-model.png概念还是挺简单的, 就是框架有点复杂, 用起来坑不少
实践代码
我这里使用java config形式使用spring batch, 需要额外注意的是, 所有带有@Bean的方法名不要重复
- build.gradle
buildscript {
ext {
springBootVersion = '1.5.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'war'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
}
这里引用了spring boot starter batch, 只是为了解决jar包依赖问题, 实际使用时没有使用spring boot.
创建任务使用的obj, TestObj.java
public class TestObj {
private String id;
private int index;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
}
主逻辑BatchConfiguration.java
@EnableBatchProcessing
public class BatchConfiguration {
Object lock = new Object();
Logger logger = Logger.getRootLogger();
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.afterPropertiesSet();
return stepBuilderFactory.get("step1")
.<TestObj, TestObj> chunk(10)
.reader(new ItemReader<TestObj>() {
private List<TestObj> list = null;
@Override
public synchronized TestObj read() throws Exception {
if (list == null) {
list = new ArrayList<TestObj>();
for (int i = 0; i < 10000; i++) {
TestObj obj = new TestObj();
obj.setId(UUID.randomUUID().toString());
obj.setIndex(i);
list.add(obj);
}
System.out.println("----------------"+list.size());
}
if (!this.list.isEmpty()) {
TestObj t = this.list.remove(0);
logger.info("step1==========read data:" + t.getIndex()));
return t;
}
return null;
}
})
.processor(new ItemProcessor<TestObj, TestObj>() {
public TestObj process(TestObj item) {
logger.debug("step1==============process: " + item.getIndex());
return item;
}
})
.writer(new ItemWriter<TestObj>() {
@Override
public void write(List<? extends TestObj> items) throws Exception {
logger.debug("step1=============write batch start: " + items.size());
for (TestObj item : items) {
logger.debug("step1=============write: " + item.getIndex());
}
logger.info("step1=============write batch end: " + items.size());
}
})
.taskExecutor(taskExecutor)
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
logger.debug("step2========================Tasklet");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job job1(Step step1, Step step2) throws Exception {
return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).next(step2).build();
}
}
最后是启动类Main.java
public class Main {
public static void main(String[] args) {
Logger logger = Logger.getRootLogger();
logger.setLevel(Level.INFO);
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(BatchConfiguration.class);
ctx.refresh();
JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
Job job = (Job)ctx.getBean("job1");
try {
jobLauncher.run(job, new JobParameters());
logger.debug("------job1 finished");
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中需要注意以下几点
- @EnableBatchProcessing 会默认给出一些基础配置
- JobRepository - bean name "jobRepository"
- JobLauncher - bean name "jobLauncher"
- JobRegistry - bean name "jobRegistry"
- PlatformTransactionManager - bean name "transactionManager"
- JobBuilderFactory - bean name "jobBuilders"
StepBuilderFactory - bean name "stepBuilders"
- 此处使用了多线程进行执行任务, 其中taskExecutor.setAllowCoreThreadTimeOut(true);表示当没有任务时(默认为60s), 线程池中线程会自动销毁
- 自定义的ItemReader实现类中的read()方法需要加synchronized(多线程环境一定要加), 在官方文档上有提过一嘴, 如果不是使用多线程可以不加, 在官方很多默认实现中, 有一些是线程安全的, 有一些则不是, 如果非线程安全, 使用时都需要加上synchronized关键字
- 如果read()方法, 返回null, 则整个任务结束.
- chunk(10)表示当每次传入write的list的个数为10时, write执行一次, 为主要的调优方法
- 实际使用中, processor可以去掉
一些说明
spring batch本身有很多功能以及高级特性(比如监听, 任务流, spring batch admin), 本文中不做展开, 这里只针对最常用情况给出一个可用版本, 在我实际使用过程中, 发现大多数文章的例子基本都无法使用或者是使用xml或者不能单独执行.
很多时候还是要多看官方文档, 只是官方文档有点太平铺直叙了
网友评论