1、任务顺序执行
按照先step1再step2的顺序执行。以下是按照step为单位直接按序注册到jobBuilder中。
@Configuration
public class TestConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public Tasklet1 tasklet1;
@Autowired
public Tasklet1 tasklet2;
private static final String JOB1_NAME = "job1";
private static final String STEP1_NAME = "step1";
private static final String STEP2_NAME = "step2";
@Bean
public Job job1() {
return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
.next(step2()).end().build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get(STEP1_NAME).tasklet(tasklet1).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get(STEP2_NAME).tasklet(tasklet2).build();
}
}
也可以先把step整合到一个flow中,统一注册到jobBuilder中。
@Bean
public Job job1() {
Flow flow = new FlowBuilder<Flow>(FLOW1_NAME)
.from(step1())
.next(step2())
.build();
return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow)
.end().build();
}
2、任务条件执行
通过在条件执行的tasklet中设置该step执行状态来决定下一步需要执行的step是什么。
@Component
@StepScope
public class Tasklet1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
throws Exception {
// 不同的业务分支设置step执行状态
if (check()) {
contribution.setExitStatus(ExitStatus.COMPLETED);
} else {
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}
}
job中通过on方法来设定条件执行逻辑。
@Bean
public Job job1() {
return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
.on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step1())
.on(ExitStatus.FAILED.getExitCode()).to(step3()).end().build();
}
3、任务并行执行
并行执行是通过多线程方式实现的。并行执行可分为不同step的并行及针对同一step处理数据量过大的分区执行。
不同step的实现
@Bean
public Job job1() {
Flow flow1 = new FlowBuilder<Flow>(FLOW1_NAME)
.start(new FlowBuilder<Flow>(STEP1_NAME).from(step1()).end())
.split(new SimpleAsyncTaskExecutor())
.add(new FlowBuilder<Flow>(STEP2_NAME).from(step2()).end()).build();
return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow1)
.end().build();
}
对于上面使用到的SimpleAsyncTaskExecutor执行器来讲,首先该执行器不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。
step分区的实现
首先需要为开启的各线程定义分区逻辑,通过此类可以向多线程执行的同一step传递不用的参数,已明确分区范围。
@Component
public class TestPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
for (int i = 1; i <= gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("param1", i);
map.put("partition" + i, context);
}
return map;
}
}
tasklet中接收到分区逻辑中设定的参数,并将参数传入对应的业务逻辑方法中实现分区处理。
@Component
public class Tasklet1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
int param1 = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("param1");
// testService.handle(param1);
return RepeatStatus.FINISHED;
}
}
jobConfig中如下
@Configuration
public class TestConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public Tasklet1 tasklet1;
@Autowired
private TestPartitioner testPartitioner;
private static final String JOB1_NAME = "job1";
private static final String MASTER_STEP_NAME = "mainStep";
private static final String SLAVE_STEP_NAME = "slaveStep";
@Bean
public Job job1() {
return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer())
.start(masterStep()).build();
}
@Bean
public Step masterStep() {
return stepBuilderFactory.get(MASTER_STEP_NAME)
.partitioner(slaveStep().getName(), testPartitioner)
.partitionHandler(handler()).build();
;
}
@Bean
public Step slaveStep() {
return stepBuilderFactory.get(SLAVE_STEP_NAME).tasklet(tasklet1).build();
}
@Bean
public PartitionHandler handler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
// 根据实际需要设置开启step句柄数量(分区数)
handler.setGridSize(10);
handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
handler.setStep(slaveStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
}
网友评论