美文网首页多线程Java后端
11、Spring 多线程 ThreadPoolTaskExec

11、Spring 多线程 ThreadPoolTaskExec

作者: 俊果果 | 来源:发表于2019-12-11 08:58 被阅读0次

    继续上集SSM集成swagger 和 log4j,这次需要实现在service里面并行插入1000条数据,在全部完成后返回结果

    一、添加 'ThreadPoolTaskExecutor' Bean

    1、新增配置类ExcutorConfig

    @EnableAsync
    @Configuration
    public class ExcutorConfig {
        private static Logger logger = Logger.getLogger(ExcutorConfig.class);
    
        @Bean(name = "asyncServiceExecutor")
        public Executor asyncServiceExecutor() {
            logger.info("start executor -->");
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //设置核心线程数
            executor.setCorePoolSize(50);
            //设置最大线程数
            executor.setMaxPoolSize(300);
            //设置队列大小
            executor.setQueueCapacity(300);
            //配置线程池的前缀
            executor.setThreadNamePrefix("async-service-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //设置空闲时间
            executor.setKeepAliveSeconds(60);
            //进行加载
            executor.initialize();
            return executor;
        }
    }
    

    这里故意把线程池的数目设置的比较大
    注意: 这里需要在 spring-mvc的配置中把config类的报名注册到component-scan里面,修改后如下:

    image.png

    2、如果不想通过代码,则在spring-service中新增如下配置即可

    <bean id="asyncServiceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
             <!-- 核心线程数 -->
            <property name="corePoolSize" value="50" />
            <!-- 最大线程数 -->
            <property name="maxPoolSize" value="300" />
            <!-- 队列最大长度 >=mainExecutor.maxSize -->
            <property name="queueCapacity" value="300" />
            <!-- 线程池维护线程所允许的空闲时间 -->
            <property name="keepAliveSeconds" value="60" />
            <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.  -->
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
            </property>
    </bean>
    

    二、编写service

    1、修改RoleService和实现类,添加如下接口

    @Override
        public int insertRole(Role role) {
            return roleMapper.insert(role);
        }
    

    接受一个 Role 对象,直接插入到数据表中

    2、新增一个TaskExecuterTestService,用于实现并行插入N条记录

    接口

    public interface TaskExecuterTestService {
    
         void insertRoles(List<Role> roles);
    }
    

    实现类

    @Service
    public class TaskExecuterTestServiceImpl implements TaskExecuterTestService {
    
        private Logger logger = Logger.getLogger(this.getClass());
    
        @Resource(name = "asyncServiceExecutor")
        private ThreadPoolTaskExecutor taskExecutor;
    
        @Autowired
        private ApplicationContext applicationContext;
    
        @Override
        public void insertRoles(List<Role> roles) {
            CountDownLatch countDownLatch = new CountDownLatch(1000);
            for (int i = 0; i < roles.size(); i++) {
                Role role = roles.get(i);
                taskExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RoleService roleService = applicationContext.getBean(RoleService.class);
                            int ret = roleService.insertRole(role);
                            System.out.println("插入Role[" + role.getRoleName()+"]结果: " + ret + ",    当前线程id:  " + Thread.currentThread().getId());
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            countDownLatch.countDown();  //这个不管是否异常都需要数量减,否则会被堵塞无法结束
                        }
                    }
                });
            }
            try {
                countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
                // 这样就可以在下面拿到所有线程执行完的集合结果
                System.out.println("all roles insert done..........................................................");
            } catch (Exception e) {
                logger.error("阻塞异常");
            }
        }
    }
    

    注意事项

    • 因为想要每个最终执行插入的逻辑由不同的数据库交互service去做,所以这里需要手动在每个Task里面去获取独立的service bean
      image.png
    • 使用CountDownLatch类来实现主线程的等待,在所有子线程工作完成前,主线程会一直等待在下面位置:
      image.png
      CountDownLatch类的详解可以参考文章CountDownLatch详解
    • 这里用的是@Resource来注入的ThreadPoolTaskExecutorbean,是采用的类配置方式;若使用的xml,可以直接用@Autowired即可

    三、编写Controller

    新增一个TaskExecuterTestController用作入口

    @Controller
    @RequestMapping("/task")
    @Api(value = "/task", tags = {"TaskExecuter测试"})
    public class TaskExecuterTestController {
    
        @Autowired
        private TaskExecuterTestService taskExecuterTestService;
    
        @RequestMapping(value = "/doTest", method = RequestMethod.GET)
        @ResponseBody
        public String doTest() {
            StopWatch sc = new StopWatch();
            sc.start();
            SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
            List<Role> roles = new ArrayList<Role>();
            int totalCount = 100000;
            for (int i = 0; i < totalCount; i++) {
                Role role = new Role();
                role.setId(UUID.randomUUID().toString());
                role.setRoleName("testRole_" + i);
                role.setNote(ft.format(new Date()));
                roles.add(role);
            }
            taskExecuterTestService.insertRoles(roles);
            sc.stop();
            return "测试完成,插入数据 "+totalCount+" 条,总耗时 " + sc.getTotalTimeMillis() + " 毫秒";
        }
    }
    

    逻辑很简单,生成100000个role对象,然后调用service并行插入数据库,返回总耗时

    四、测试

    1、Swagger直接运行

    初始数据库记录如下:


    image.png

    2、Swagger运行结果

    image.png
    插入数据10万条,总耗时 25 s

    3、IDEA log

    image.png

    4、数据库记录

    image.png

    五、本次修改代码变更

    Github-Commit spring多线程 TaskExecuter测试

    相关文章

      网友评论

        本文标题:11、Spring 多线程 ThreadPoolTaskExec

        本文链接:https://www.haomeiwen.com/subject/ouangctx.html