美文网首页
spring batch--> processor

spring batch--> processor

作者: 刘小刀tina | 来源:发表于2020-04-28 15:27 被阅读0次
    /**
     * Spring Batch configuration for the {@code itemProcessorJob2} job: rows are read from the
     * {@code t_user} table, passed through a chain of processors, and handed to the
     * {@code dbFileWriter} bean.
     *
     * <p>This class shares its simple name with
     * {@code org.springframework.batch.item.ItemProcessor}, which is why the framework interface
     * is always referenced by its fully qualified name below.
     *
     * @program: demo-spring-batch
     * @description read data from the database, process it, then write it out to a file
     * @author: tina.liu
     * @create: 2020-04-28 09:49
     **/
    @Configuration
    @EnableBatchProcessing
    public class ItemProcessor {

        /** Data source used by the paging reader. */
        @Autowired
        private DataSource dataSource ;

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory ;

        /** First delegate in the processor chain (bean {@code nameUpperProcessor}). */
        @Autowired
        @Qualifier(value = "nameUpperProcessor")
        private org.springframework.batch.item.ItemProcessor<User,User>  nameUpperProcessor;

        /** Second delegate in the processor chain (bean {@code idFilterProcessor}). */
        @Autowired
        @Qualifier(value = "idFilterProcessor")
        private org.springframework.batch.item.ItemProcessor<User,User>  idFilterProcessor;

        /** Writer that receives the processed users (bean {@code dbFileWriter}). */
        @Autowired
        @Qualifier(value = "dbFileWriter")
        private ItemWriter<? super User> dbFileWriter;

        /**
         * Creates the job, which consists of the single step {@link #itemProcessorStep2()}.
         *
         * @return the job named {@code itemProcessorJob2}
         */
        @Bean(value = "itemProcessorJob2")
        public Job itemProcessorJob2(){
            return jobBuilderFactory.get("itemProcessorJob2")
                    .start(itemProcessorStep2())
                    .build();
        }

        /**
         * Creates the chunk-oriented step: read with {@link #fromdbReader()}, process with
         * {@link #process()}, write with the injected {@code dbFileWriter}, two items per chunk.
         *
         * @return the step named {@code itemProcessorStep2}
         */
        @Bean(value = "itemProcessorStep2")
        public Step itemProcessorStep2() {
            return stepBuilderFactory.get("itemProcessorStep2")
                    .<User,User>chunk(2)
                    .reader(fromdbReader())
                    .processor(process())
                    .writer(dbFileWriter)
                    .build();
        }

        /**
         * Builds the composite processor applied to every user read from the database.
         * The delegates are registered in this order: {@code nameUpperProcessor}, then
         * {@code idFilterProcessor}.
         *
         * @return the composite processor holding both delegates
         */
        @Bean
        public CompositeItemProcessor<User,User> process(){
            CompositeItemProcessor<User,User> processor = new CompositeItemProcessor<>();
            List<org.springframework.batch.item.ItemProcessor<User, User>> delegates = new ArrayList<>();
            delegates.add(nameUpperProcessor);
            delegates.add(idFilterProcessor);
            processor.setDelegates(delegates);
            return processor;
        }

        /**
         * Defines the reader that pages through {@code t_user} ordered by {@code id} ascending
         * and maps each row to a {@link User}.
         *
         * @return the configured paging reader
         */
        @Bean(value = "fromdbReader")
        public JdbcPagingItemReader<User> fromdbReader() {
            JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
            reader.setDataSource(dataSource);
            // JDBC fetch-size hint handed to the driver.
            // NOTE(review): the reader's page size is a separate setting (setPageSize) and is
            // not set here - confirm the default is intended.
            reader.setFetchSize(2);
            // Convert each row into a User; columns follow the select clause order id, name, age.
            reader.setRowMapper(new RowMapper<User>() {
                @Override
                public User mapRow(ResultSet rs, int i) throws SQLException {
                    User user = new User();
                    user.setId(rs.getString(1));
                    user.setName(rs.getString(2));
                    user.setAge(rs.getString(3));
                    return user;
                }
            });
            // SQL is assembled by the MySQL paging query provider.
            MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
            provider.setSelectClause("id,name,age");
            provider.setFromClause("from t_user");
            // Sort key: id ascending. Diamond operator instead of the raw LinkedHashMap type
            // keeps the assignment type-safe.
            Map<String, Order> sort = new LinkedHashMap<>(1);
            sort.put("id", Order.ASCENDING);
            provider.setSortKeys(sort);
            reader.setQueryProvider(provider);
            return reader;
        }

    }// end of class ItemProcessor
    
    // Data holder for one t_user row; accessors are generated by Lombok's @Data.
    // All three columns are kept as strings: the row mapper reads them with getString,
    // and IdFilterProcessor parses the id when it needs a number.
    @Data
    class User{
        private String id;
        private String name;
        private String age;
    }
    
    // Processor that converts the user's name to upper case.
    @Component(value = "nameUpperProcessor")
    class NameUpperProcessor implements org.springframework.batch.item.ItemProcessor<User,User>{
        /**
         * Returns a new {@link User} with the same id and age as {@code item} and the name
         * converted to upper case. The input object is not modified.
         *
         * @param item the user read from the database
         * @return a copy of {@code item} with an upper-cased name; the name stays {@code null}
         *         when the input name is {@code null}
         */
        @Override
        public User process(User item) throws Exception {
            String name = item.getName();
            User u= new User();
            u.setId(item.getId());
            // Locale.ROOT keeps the result independent of the JVM default locale
            // (e.g. Turkish dotted/dotless i); a null name is passed through instead of
            // failing with a NullPointerException.
            u.setName(name == null ? null : name.toUpperCase(java.util.Locale.ROOT));
            u.setAge(item.getAge());
            return u;
        }
    }
    
    
    // Processor that filters users by id: only even ids are kept.
    @Component(value = "idFilterProcessor")
    class IdFilterProcessor implements org.springframework.batch.item.ItemProcessor<User,User>{

        /**
         * Keeps users whose numeric id is even.
         *
         * @param item the user to check
         * @return {@code item} itself when its id is even, otherwise {@code null}
         * @throws NumberFormatException if the id is {@code null} or not a decimal integer
         */
        @Override
        public User process(User item) throws Exception {
            // Parsed as long so ids beyond the int range (e.g. BIGINT keys) are accepted;
            // parity is the same as before for every id that fit in an int.
            long id = Long.parseLong(item.getId());
            return (id % 2 == 0) ? item : null;
        }

    }
    
    
    // Writer configuration: the users read from the database are written to a fixed file.
    @Configuration
    class FileItemWriterConfig{

        /**
         * Builds the flat-file writer that emits one JSON-serialized {@link User} per line.
         *
         * <p>The target path is hard-coded to an absolute location on the author's machine.
         * NOTE(review): this should probably come from configuration - confirm before reuse.
         *
         * @return the configured writer
         * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
         */
        @Bean(value = "dbFileWriter")
        public FlatFileItemWriter<User> dbFileWriter () throws Exception {

            FlatFileItemWriter<User> writer = new FlatFileItemWriter<User>();
            String path="/Users/lvxiaokai/Desktop/wangke-project2020/demo-spring-batch/src/main/resources/user.txt";
            writer.setResource(new FileSystemResource(path));

            // Convert each User into its JSON string form.
            writer.setLineAggregator(new LineAggregator<User>() {
                private final ObjectMapper mapper = new ObjectMapper();

                @Override
                public String aggregate(User item) {
                    try {
                        return mapper.writeValueAsString(item);
                    } catch (JsonProcessingException e) {
                        // Fail loudly with the cause attached instead of printing the stack
                        // trace and returning null for the line.
                        throw new IllegalStateException(
                                "Failed to serialize user to JSON: id=" + item.getId(), e);
                    }
                }
            });
            writer.afterPropertiesSet();
            // Printed while the bean is being created, i.e. before any item has been written.
            System.out.println("将数据库读取的数据写入指定的文档success");
            return writer;
        }

    }
    

    相关文章

      网友评论

          本文标题:spring batch--> processor

          本文链接:https://www.haomeiwen.com/subject/vdavwhtx.html