美文网首页IT技术文章收藏我爱编程
Elastic+Spring-batch+Mybatis整合实现

Elastic+Spring-batch+Mybatis整合实现

作者: 行径行 | 来源:发表于2018-05-23 23:17 被阅读669次

    背景

    因为有企业级批处理需求,具体到应用场景就是在三方支付系统中,日切后要进行清分结算的跑批处理。所以使用到成熟的spring batch,依托其强大且灵活的批量处理功能,再加上elastic任务调度整合来实现清分结算业务流程。本文章就是分别说明了elastic和spring-batch + mybatis的常规使用和两者的整合,仅供大家参考。。

    项目整体结构

    就先把elastic和spring-batch+mybatis整合的项目工程结构列出来,该工程实现包括:

    • Elastic多种作业类型的开发和使用
    • 通过Elastic调度spring-batch作业,将数据库中存储行业信息记录按照id的奇偶分别查询出来,写入两个不同的文件中。(也就是通过elastic的分片作业开发实现)
    • Elastic-console管理控制台的界面手动触发操作


      image

    Elastic-Lite-Job

    Elastic是一个分布式任务调度框架,可以利用zookeeper作为调度中心,通过管理控制台对任务进行手动关闭、触发、重启,并支持并行调度和任务分片,对spring支持也友好。具体参考官方文档

    • 整体架构图(图片来源官网)
      image


    • 配置依赖
      不多说,直接参考官网依赖maven进行配置:
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <latest.release.version>2.1.5</latest.release.version>
        </properties>
    
            <!-- 添加elastic-job -->
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-core</artifactId>
                <version>${latest.release.version}</version>
            </dependency>
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
                <version>${latest.release.version}</version>
            </dependency>
            <!-- end -->
    

    在pom.xml配置即可。

    • 作业开发

    配置了elastic的两种作业类型:简单作业流式处理作业
    SimpleJob对应的实现类:

    @Component
    public class SimpleJobA implements SimpleJob{
    
        @Override
        public void execute(ShardingContext context) {
            System.out.println(String.format("------SimpleJobA: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        }
    
    }
    

    数据流作业实现类:DataFlowJobEven和DataFlowJobOdd分别查询数据库行业表Industry的id奇偶记录并输出:

    @Component
    public class DataFlowJobEven implements DataflowJob<Industry>{
    
        @Autowired
        private IndustryService industryService;
        
        @Resource
        private IndustryDAO industry;
        
        @Override
        public List<Industry> fetchData(ShardingContext context) {
            System.out.println(String.format("------DataFlowJobEven: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
            List<Industry> retList = industry.queryListEven();
            return retList;
        }
    
        @Override
        public void processData(ShardingContext context, List<Industry> list) {
            System.out.println("Even count: " + list.size());
            for(Industry in : list) {
                System.out.println(in.getId() + "--" +in.getName() +"--" + in.getEnname());
            }
             
        }
    
    }
    
    
    @Component
    public class DataFlowJobOdd implements DataflowJob<Industry>{
    
        @Resource
        private IndustryService industryService;
        
        @Resource
        private IndustryDAO industry;
        
        @Override
        public List<Industry> fetchData(ShardingContext context) {
            System.out.println(String.format("------DataFlowJobOdd: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
            System.out.println(industry);
            List<Industry> retList = industry.queryListOdd();
            return retList;
        }
    
        @Override
        public void processData(ShardingContext context, List<Industry> list) {
            System.out.println("Odd count: " + list.size());
            for(Industry in : list) {
                System.out.println(in.getId() +" --" + in.getName() +"--" + in.getEnname());
            }
            
        }
    
    }
    

    其中作业流的会先调用fetchData方法查询满足条件的数据,再传入processData方法中进行批量处理。

    • 配置作业

    关于配置文件中的注册中心zookeeper需要自行安装,单机安装也挺简单的,在此就不在说明。

    文件名:elastic-job.xml。配置可以参考:elastic-job配置手册

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:job="http://www.dangdang.com/schema/ddframe/job"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.dangdang.com/schema/ddframe/reg
                            http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                            http://www.dangdang.com/schema/ddframe/job
                            http://www.dangdang.com/schema/ddframe/job/job.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd                         
                            ">
        <!-- 这里配置的zookeeper与elastic console有关 -->                   
        <reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
            namespace="payment-ls" base-sleep-time-milliseconds="1000"
            max-retries="3" />
    
        <!-- 作业配置A -->
        <job:simple id="simpleJobA" sharding-total-count="2"
            cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
            sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量处理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 
    
        
        <!-- 流数据作业 -->
        <job:dataflow sharding-total-count="1" cron="2 * * * * ?"
            registry-center-ref="goPayCenter" id="dataFlowJobEven" class="newgo.job.DataFlowJobEven" streaming-process="false" overwrite="true" description="查询偶数id数据"/>
        <job:dataflow sharding-total-count="1" cron="2 * * * * ?"
            registry-center-ref="goPayCenter" id="dataFlowJobOdd" class="newgo.job.DataFlowJobOdd" streaming-process="false" overwrite="true" description="查询奇数id数据"/>
    
    </beans>
    

    通过xml节点就能指定配置简单作业和数据流作业。

    • 控制台调度
      控制台需要的运行脚本:elastic-job-lite-console 密码: ygfj

      在windows中,运行bat脚本,就会运行一个web项目,端口默认8899,本地打开地址:http://localhost:8899即可进入控制台。默认是以root登录

      选择Global Settings --》 Registry center,点击Add按钮,添加zookeeper注册中心,添加参数要与elastic-job.xml中配置zookeeper的信息:

      Name -> id,address:server-list,namespace:namespace
        <reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
            namespace="payment-ls" base-sleep-time-milliseconds="1000"
            max-retries="3" />
    

    点击submit即可看到记录,点击connect连接,即可看到左边列表Job operation上有所有状态的job作业信息,如下图:

    image
    可以看到,可以对所有任务进行调度,手动触发多种操作。
    • 任务异常监听处理

    可以定义自定义监听器,配置在指定的任务job上,当该job抛出异常的时候,就会进入该拦截器进行处理。配置自定义拦截器MyJobExceptionHandler:

    public class MyJobExceptionHandler implements JobExceptionHandler{
    
        @Override
        public void handleException(String jobName, Throwable cause) {
            //System.out.println("异常方法: " + cause.getStackTrace()[0].getMethodName());
            System.out.println(String.format("任务[%s]调度异常", jobName) + ",异常类型: " + cause.toString());
        }
    
    }
    

    结合job配置:在节点有配置属性,设置即可。

    <job:simple id="simpleJobA" sharding-total-count="2"
            cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
            sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量处理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 
    

    Elastic整合SpringBathc

    Elastic主要在任务调度、治理方面有优势,具体的作业操作则是使用spring batch。其中作业读取数据可以从文件、也可以通过mybatis读取数据库来处理数据。

    其中,elastic作业配置到elastic-job.xml中,spring batch配置文件配置在spring-batch.xml中。

    Spring Bath:是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。更多参考官网

    主要关心的是批处理任务中包含的主体对象,每个对象负责一个任务处理中不同的过程,如下图:(图片来自官网)

    image

    对于spring batch中每种不同主体作用,主要关注的是job,对应一个任务单元。该job就可以映射成elastic中的job,两者都是表示一个任务处理,那么就可以将spring batch的job整合到elastic的job中,即可实现elastic对这些job的调用处理。

    从上图中也可以知道一个job可以对应多个step,每个step是由三大组件构成,也是也就对应着IPO(输入-处理-输出)的抽象。我们在对数据的批量处理过程中,就可以通过自定义类,实现这些接口来实现不同的业务需求。

    主要的就是ItemReader、ItemProcessor、ItemWriter三者的实现。我们开发主要关注的也就是数据源数据是如何处理,最后如何输出的

    • 读取文件数据(Flat文件格式)
      Flat类型文件是一种简单文本格式文件类型,通常经过分隔符分割、或者定长字段来描述数据格式。

      本项目的源数据是batch-data-source.csv文件,文件内容:
    1,PENDING
    2,PENDING
    3,PENDING
    4,PENDING
    5,PENDING
    6,PENDING
    7,PENDING
    8,PENDING
    9,PENDING
    10,PENDING
    

    需求就是通过job读取该文件内容,进行处理,将PENDING设置成SENT,写入到新的文件batch-data-target.csv

      • LineMapper

    Linemapper作用就是将文件中每一行的数据,转换成POJO来批量处理。也就是文件内容映射到对象处理器。

    public class HelloLineMapper implements LineMapper<DeviceCommand>{
    
        private static final String FILE_SPLIT_SIGN = ",";
        @Override
        public DeviceCommand mapLine(String line, int lineNumber) throws Exception {
            String[] args = line.split(FILE_SPLIT_SIGN);
            DeviceCommand dc = new DeviceCommand();
            dc.setId(args[0]);
            dc.setStatus(args[1]);
            return dc;
        }
    
    }
    
    

    其中,文件映射成的bean属性定义和文件内容一一对应的:

    public class DeviceCommand {
    
        private String id;
        private String status;
        //getter setter..
    }
    

    该linemapper会在ItemReader使用的时候,通过set设置到FlatFileItemReader属性中进行使用。

      • ItemProcessor

    ItemReader读取到数据后,就会通过ItemProcessor对数据进行处理,很有些Java8的stream流式编程的意思,前置的输出就是下一个阶段的输入,当然两者的操作对象类型必然是一致的。

    public class HelloItemProcessor implements ItemProcessor<DeviceCommand,DeviceCommand>{
    
        @Override
        public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {
            System.out.println("send command to device, id=" + deviceCommand.getId());
             deviceCommand.setStatus("SENT");
            return deviceCommand;
        }
    
    }
    

    处理目地就是就将数据源的PENGDING更新成SENT再返回命令对象。同理,可以将处理后的对象作为输入,传给ItemWriter处理。

      • LineAggregator

    对于FlatItem来说,LineAggregator作用就是制定将处理器处理后的数据如何处理,按照什么样的形式将每个对象写入文件的每一行。该数据输出规则也会通过属性设置到FlatFileItemReader中。

    public class HelloLineAggregator implements LineAggregator<DeviceCommand> {
    
        @Override
        public String aggregate(DeviceCommand deviceCommand) {
            StringBuffer sb = new StringBuffer();
            sb.append(deviceCommand.getId());
            sb.append("|");
            sb.append(deviceCommand.getStatus());
            return sb.toString();
        }
    
    }
    

    按照|分隔符将内容输出到文件中每一行。

      • Job作业开发

    elastic-job.xml中配置了作业B,如下,在其作业实现类中,就是使用了spring-batch的job定义实现:包括使用上面定义的ItemReader、ItemProcessor、ItemWriter。

        <!-- 作业配置B -->
        <job:simple id="simpleJobB" sharding-total-count="2"
            cron="10 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobB"
            sharding-item-parameters="0=test1,1=test2" overwrite="true"
            description="结算批量处理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 
    

    spring-batch可以使用编程式开发,也可以使用xml配置,在此是直接在代码里面构建job,step。作业B中结合batch的具体实现如下:

    @Component
    public class SimpleJobB implements SimpleJob{
        
        @Autowired
        private JobLauncher jobLauncher;
        
        @Autowired
        private JobRepository jobRepository;
        
        @Autowired
        private PlatformTransactionManager transactionManager;
    
        @Override
        public void execute(ShardingContext context) {
            System.out.println(String.format("------SimpleJobB: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
            
               ClassPathResource resource = new ClassPathResource("batch-data-source.csv");
               FileSystemResource resource2 = new FileSystemResource("E:\\work\\newgo\\src\\main\\resources\\batch-data-target.csv");
               //通过classPathResource不可写入文件数据?
              // ClassPathResource resource2 = new ClassPathResource("batch-data-target.csv");
              
               //1.0 获取任务启动器
               
               //2.0 创建reader(Resource + 文件读取映射器)
               FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();
               flatFileItemReader.setLineMapper(new HelloLineMapper());
               flatFileItemReader.setResource(resource);
               
               //3.0 创建处理器processor
               HelloItemProcessor processor = new HelloItemProcessor();
               
               //4.0 创建writer
               FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();
               flatFileItemWriter.setResource(resource2);
               flatFileItemWriter.setLineAggregator(new HelloLineAggregator());
               //5.0 创建作业步骤step
               StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
               //流式编程
               Step step = stepBuilderFactory.get("step").<DeviceCommand,DeviceCommand>chunk(1)
                                                         .reader(flatFileItemReader)
                                                         .processor(processor)
                                                         .writer(flatFileItemWriter)
                                                         .build();
               
                                                                  
               //6.0 创建作业job,关联步骤step
               JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
    //         Job job = jobBuilderFactory.get("job-" + Math.random()).start(step).build();
               Job job = jobBuilderFactory.get("job").start(step).build();
               int i = 5 / 0;
               
               LinkedHashMap<String, JobParameter> parameterMap = new LinkedHashMap<String,JobParameter>();
               JobParameter datetime = new JobParameter(dateFormatter(new Date())); 
               parameterMap.put("datetime", datetime);
               JobParameters paramters = new JobParameters(parameterMap);
               //7.0 启动任务
               try {
                jobLauncher.run(job, paramters);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        
        public String dateFormatter(Date date) {
    //      SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss");
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            return df.format(date);
        }
    
    }
    
      • 定时器运行

    用常规spring启动方式来启动:

    public class App 
    {
        public static void main( String[] args )
        {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"classpath:/spring/springBeans.xml"});
        }
    }
    

    根据定时器配置时间,作业会进行数据处理。,作业操作完成之后,就可以发现文件batch-data-target.csv中的内容变成了如下内容:

    1|SENT
    2|SENT
    3|SENT
    4|SENT
    5|SENT
    6|SENT
    7|SENT
    8|SENT
    9|SENT
    10|SENT
    
    • 读取数据库数据(关系数据库)

    很多应用场景且大量数据都是存在数据库的,那么spring-batch是如何从数据库中读取数据。再次就介绍两种:JdbcCursorItemReaderMyBatisBatchItemWriter。前者是直接基于原始SQL进行处理,后者是利用mybatis框架进行整合查询。

      • 基于jdbc游标和elastic分片处理

    思路是:分别定义两个job,分别利用JdbcCursorItemReader设置不同的查询SQL来查询不同的数据。利用elastic的分片来分别执行不同的job。

    1.0 先定义两个不同的job[在spring-batch.xml中]

    包括三大核心组件的定义IPO,自定义查询SQL。

        <!-- 配置作业STEP统一拦截器 -->
        <batch:step id="abstractStep" abstract="true">
            <batch:listeners>
                <batch:listener ref="readItemListener" />
                <batch:listener ref="processStepListener" />
            </batch:listeners>
        </batch:step>
        <bean id="abstractCursorReader" abstract="true"
            class="org.springframework.batch.item.database.JdbcCursorItemReader">
            <property name="dataSource" ref="dataSource" />
        </bean>
        <!-- 读取行业偶数列表配置: start -->
        <batch:job id="industryEvenProcessJob" restartable="true">
            <batch:step id="industryEvenStep" parent="abstractStep">
                <batch:tasklet>
                    <batch:chunk reader="industryEvenJdbcReader" processor="industryProcessor" writer="industryEvenJdbcWriter" commit-interval="50"/>
                </batch:tasklet>
            </batch:step>
        </batch:job>
    
        <bean id="industryEvenJdbcReader" scope="step" parent="abstractCursorReader">
            <property name="sql">
                <value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? != 0]]></value>
            </property>
            <property name="rowMapper" ref="industryRowMapper"/>
            <property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
            <property name="fetchSize" value="20"/>
        </bean>
        
        <bean id="industryProcessor" class="newgo.job.dbmapper.IndustryProcessor"/>
        
        <bean id="industryEvenJdbcWriter"  class="newgo.job.dbmapper.IndustryEvenJdbcWriter"/>
        <bean id="industryRowMapper" class="newgo.job.dbmapper.IndustryRowMapper"/>
        <bean id="industryPreparedStatementSetter" class="newgo.job.dbmapper.IndustryPreparedStatementSetter"/> 
        <!-- 读取行业偶数列表配置: end -->
        
        
        <!-- 读取行业奇数列表配置: start -->
        <batch:job id="industryOddProcessJob" restartable="true">
            <batch:step id="industryOddStep" parent="abstractStep">
                <batch:tasklet>
                    <batch:chunk reader="industryOddJdbcReader" processor="industryProcessor" writer="industryOddJdbcWriter" commit-interval="50"/>
                </batch:tasklet>
            </batch:step>
        </batch:job>
    
        <bean id="industryOddJdbcReader" scope="step" parent="abstractCursorReader">
            <property name="sql">
                <value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? = 0]]></value>
            </property>
            <property name="rowMapper" ref="industryRowMapper"/>
            <property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
            <property name="fetchSize" value="20"/>
        </bean>
        
        <bean id="industryOddJdbcWriter"  class="newgo.job.dbmapper.IndustryOddJdbcWriter"/>
        <!-- 读取行业偶数列表配置: end -->
    

    2.0 两个作业的IPO具体实现在newgo.job.dbmapper目录下,这里代码就不贴出来了。
    3.0 定义elastic的作业分片[elastic-job.xml]
    分片是通过sharding-item-parameters来设定的。

    <!-- 数据库查询行业作业A : 根据查询任务分片,分别查询奇偶行列 -->
        <job:simple id="elasIndustryJob" sharding-total-count="2" cron="2 * * * * ?" 
        sharding-item-parameters="0=even,1=odd"
        registry-center-ref="goPayCenter" class="newgo.job.IndustryOperatorJob" 
        job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 
    

    4.0 elastic的作业的分片实现:

    @Component
    public class IndustryOperatorJob implements SimpleJob {
        @Autowired
        private JobLauncher jobLauncher;
        
        @Autowired
        @Qualifier("industryEvenProcessJob")
        private Job industryEvenProcessJob;
        
        @Autowired
        @Qualifier("industryOddProcessJob")
        private Job industryOddProcessJob;
        
        @Override
        public void execute(ShardingContext context) {
            System.out.println(String.format("------IndustryQueryJob: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
            try {
                String shardingParameter = context.getShardingParameter();
                switch (shardingParameter) {
                case "odd":
                    JobExecution oddJobResult = jobLauncher.run(industryOddProcessJob, new JobParameters());
                    System.out.println("IndustryOddProcessJob start result: " + oddJobResult.toString());
                    break;
                case "even":
                    JobExecution evenJobResult = jobLauncher.run(industryEvenProcessJob, new JobParameters());
                    System.out.println("IndustryEvenProcessJob start result: " + evenJobResult.toString());
                    break;
                default:
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
      • 基于mybatis分页查询

    基于mybatis更方便开发和维护,也是我们常规操作数据库的途径。下面例子就是通过mybatis来查询数据,并写入文件中。

    1.0 配置spring-batch的step[spring-batch.xml]

    定义了mybatis的ItemReader实现。其中queryId就是对应mybatis接口方法名。

        <bean id="itemReader" scope="step" class="org.mybatis.spring.batch.MyBatisPagingItemReader">
           <property name="sqlSessionFactory" ref="sqlSessionFactory" />
            <property name="pageSize" value="100"/>
            <property name="queryId" value="" />
        </bean>
    

    2.0 配置elastic的作业定时器[elastic-job.xml]

        <!-- 数据库查询行业作业B : 通过mybatis映射 -->
        <job:simple id="mybatisIndustryJob" sharding-total-count="1" cron="1 * * * * ?" 
        registry-center-ref="goPayCenter" class="newgo.job.MybatisIndustryJob" 
        job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
    

    3.0 作业job具体实现开发

    @Component
    public class MybatisIndustryJob  implements SimpleJob{
        @Autowired
        private JobLauncher jobLauncher;
        
        @Autowired
        private JobRepository jobRepository;
        
        @Autowired
        private SqlSessionFactory sqlSessionFactory;
        @Autowired
        private PlatformTransactionManager transactionManager;
        
        @Autowired
        private MybatisWriter mybatisWriter;
        
        @Override
        public void execute(ShardingContext context) {
            System.out.println(String.format("------MybatisIndustryJob: Thread ID: %s, 任务总片数: %s, 当前分片项: %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
            MyBatisPagingItemReader<Industry> itemReader = new MyBatisPagingItemReader<Industry>();
            Map<String,Object> params = Maps.newHashMap();
            params.put("name", "五金");
            itemReader.setQueryId("newgo.dao.IndustryDAO.queryIndustryByName");
            itemReader.setPageSize(20);
            itemReader.setParameterValues(params);
            itemReader.setSqlSessionFactory(sqlSessionFactory);
            try {
                itemReader.afterPropertiesSet();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
            StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
               //流式编程
               Step step = stepBuilderFactory.get("step-mybats").<Industry,IndustryFull>chunk(1)
                                                         .reader(itemReader)
                                                         .processor(new MybatisIndustryProcessor())
                                                         .writer(mybatisWriter)
                                                         .build();
               
                                                                  
               JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
               Job job = jobBuilderFactory.get("mybatisIndustryJob").start(step).build();
               try {
                JobExecution jobResult = jobLauncher.run(job, new JobParameters());
                System.out.println("MybatisIndustryJob 执行结果:" + jobResult.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
    
    }
    

    这里要注意的就是要设置itemReader.afterPropertiesSet();方法,不然mybatis在读取数据的时候,sqlSessionTemplate未注入抛出NPE。并且在每次通jobLauncher.run启动的时候,可以根据返回值来查看异常信息。在调试mybaatis查询方法doPageRead时候,发现内部是通过CopyOnWriteArrayList存放数据的,也就是利用并发安全的写时复制机制的集合来保存读取到数据,在高并发且读远高于写的时候,可以用该类型集合来保存数据。

    4.0 处理器和输出的实现类都是在newgo.job.mapper包下,再次不在P出。

    5.0 在spring batch的不同阶段,都可以注入自定义的监听器来对每个阶段数据进行预处理,实现在包newgo.listener下。

    更多高级特性还需要在实际应用中不断调试和实践。。。

    elastic+spring-batch+mybatis源代码-- 密码: hvzr

    参考:

    相关文章

      网友评论

      本文标题:Elastic+Spring-batch+Mybatis整合实现

      本文链接:https://www.haomeiwen.com/subject/xbadjftx.html