美文网首页
使用javaConfig方式来控制spring batch执行流

使用javaConfig方式来控制spring batch执行流

作者: 七玄之主 | 来源:发表于2018-10-31 16:25 被阅读0次

    1、任务顺序执行
    按照先step1再step2的顺序执行。以下是按照step为单位直接按序注册到jobBuilder中。

    @Configuration
    public class TestConfiguration {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        public Tasklet1 tasklet1;
    
        @Autowired
        public Tasklet1 tasklet2;
    
        private static final String JOB1_NAME = "job1";
        private static final String STEP1_NAME = "step1";
        private static final String STEP2_NAME = "step2";
    
        @Bean
        public Job job1() {
            return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
                            .next(step2()).end().build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get(STEP1_NAME).tasklet(tasklet1).build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get(STEP2_NAME).tasklet(tasklet2).build();
        }
    }
    
    

    也可以先把step整合到一个flow中,统一注册到jobBuilder中。

    
    @Bean
    public Job job1() {
        Flow flow = new FlowBuilder<Flow>(FLOW1_NAME)
                        .from(step1())
                        .next(step2())
                        .build();
        return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow)
                        .end().build();
    }
    
    

    2、任务条件执行
    通过在条件执行的tasklet中设置该step执行状态来决定下一步需要执行的step是什么。

    @Component
    @StepScope
    public class Tasklet1 implements Tasklet {
    
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                        throws Exception {
            // 不同的业务分支设置step执行状态
            if (check()) {
                contribution.setExitStatus(ExitStatus.COMPLETED);
            } else {
                contribution.setExitStatus(ExitStatus.FAILED);
            }
    
            return RepeatStatus.FINISHED;
        }
    }
    

    job中通过on方法来设定条件执行逻辑。

    @Bean
    public Job job1() {
        return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
                        .on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step1())
                        .on(ExitStatus.FAILED.getExitCode()).to(step3()).end().build();
    }
    

    3、任务并行执行
    并行执行是通过多线程方式实现的。并行执行可分为不同step的并行及针对同一step处理数据量过大的分区执行。

    不同step的实现

    @Bean
    public Job job1() {
        Flow flow1 = new FlowBuilder<Flow>(FLOW1_NAME)
                        .start(new FlowBuilder<Flow>(STEP1_NAME).from(step1()).end())
                        .split(new SimpleAsyncTaskExecutor())
                        .add(new FlowBuilder<Flow>(STEP2_NAME).from(step2()).end()).build();
    
        return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow1)
                        .end().build();
    }
    

    对于上面使用到的SimpleAsyncTaskExecutor执行器来讲,首先该执行器不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。

    step分区的实现
    首先需要为开启的各线程定义分区逻辑,通过此类可以向多线程执行的同一step传递不用的参数,已明确分区范围。

    @Component
    public class TestPartitioner implements Partitioner {
    
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
    
            Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
    
            for (int i = 1; i <= gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putInt("param1", i);
    
                map.put("partition" + i, context);
            }
            return map;
        }
    }
    

    tasklet中接收到分区逻辑中设定的参数,并将参数传入对应的业务逻辑方法中实现分区处理。

    @Component
    public class Tasklet1 implements Tasklet {
    
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            int param1 = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("param1");
    
            // testService.handle(param1);
            return RepeatStatus.FINISHED;
        }
    }
    

    jobConfig中如下

    @Configuration
    public class TestConfiguration {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        public Tasklet1 tasklet1;
    
        @Autowired
        private TestPartitioner testPartitioner;
    
        private static final String JOB1_NAME = "job1";
        private static final String MASTER_STEP_NAME = "mainStep";
        private static final String SLAVE_STEP_NAME = "slaveStep";
    
        @Bean
        public Job job1() {
            return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer())
                            .start(masterStep()).build();
        }
    
        @Bean
        public Step masterStep() {
            return stepBuilderFactory.get(MASTER_STEP_NAME)
                            .partitioner(slaveStep().getName(), testPartitioner)
                            .partitionHandler(handler()).build();
            ;
        }
    
        @Bean
        public Step slaveStep() {
            return stepBuilderFactory.get(SLAVE_STEP_NAME).tasklet(tasklet1).build();
        }
    
        @Bean
        public PartitionHandler handler() {
            TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
            // 根据实际需要设置开启step句柄数量(分区数)
            handler.setGridSize(10);
            handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
            handler.setStep(slaveStep());
            try {
                handler.afterPropertiesSet();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return handler;
        }
    }
    

    相关文章

      网友评论

          本文标题:使用javaConfig方式来控制spring batch执行流

          本文链接:https://www.haomeiwen.com/subject/oweatqtx.html