美文网首页
Java 并发处理任务并汇总结果

Java 并发处理任务并汇总结果

作者: JennTu | 来源:发表于2019-06-10 20:13 被阅读0次

    线程池定义

    @Component("asyncProcessUtil")
    public class AsyncProcessUtil {
    
        private ThreadPoolExecutor dailyDataThreadPoolExecutor;
        private ThreadPoolExecutor dashboardThreadPoolExecutor;
    
        @PostConstruct
        public void init() {
    
            // dailyDataThreadPoolExecutor
            int processors = Runtime.getRuntime().availableProcessors();
            int corePoolSize = processors * 2;
            int maxPoolSize = corePoolSize * 2;
            this.dailyDataThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 1, TimeUnit.MINUTES,
                    new LinkedBlockingQueue<Runnable>(corePoolSize),
                    new ThreadFactory() {
                        private AtomicLong threadNum = new AtomicLong(1);
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "Daily Overview Module Thread-" + threadNum.getAndIncrement());
                        }
                    },
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            // dashboardThreadPoolExecutor
            int dashboardCorePoolSize = processors * 4;
            int dashboardMaxPoolSize = corePoolSize * 2;
            this.dashboardThreadPoolExecutor = new ThreadPoolExecutor(dashboardCorePoolSize, dashboardMaxPoolSize, 1, TimeUnit.MINUTES,
                    new LinkedBlockingQueue<Runnable>(dashboardCorePoolSize),
                    new ThreadFactory() {
                        private AtomicLong threadNum = new AtomicLong(1);
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "Dashboard Module Thread-" + threadNum.getAndIncrement());
                        }
                    },
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
        }
    
        public <T> Future<T> submitDailyThread(Callable<T> callable) {
            return dailyDataThreadPoolExecutor.submit(callable);
        }
    
        public <T> Future<T> submitDashboardThread(Callable<T> callable) {
            return dashboardThreadPoolExecutor.submit(callable);
        }
    
    }
    

    对于同类型任务

        @Override
        public DadaResponse<FlexContainer> getDashbordData(GatewayRequestContext context) {
    
            List<DashboardCard> elements = new ArrayList<>();
            logger.info("module.getData begin, time : " + LocalDateTime.now().toString());
            List<Future<DashboardCard>> futures = new ArrayList<>();
            for (HomePageModule module : dashboardModules) {
                Future<DashboardCard> future = asyncProcessUtil.submitDashboardThread(new Callable<DashboardCard>() {
                    @Override
                    public DashboardCard call() throws Exception {
                        try {
                            logger.info("module.getData start run, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                            DashboardCard dashboardCard = (DashboardCard) module.getData(context);
                            logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                            return dashboardCard;
                        } catch (Exception e) {
                            logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
                            return null;
                        }
                    }
                });
                futures.add(future);
            }
    
            for (Future<DashboardCard> future : futures) {
                try {
                    DashboardCard dashboardCard = future.get();
                    if (dashboardCard != null) {
                        elements.add(dashboardCard);
                    }
                } catch (Exception e) {
                    logger.error("module.getData completionService.take().get() exception", e);
                }
            }
            logger.info("module.getData end, time : " + LocalDateTime.now().toString());
    
            return DadaResponse.success(FlexContainer.builder().elements(DashboardUtil.sort(elements)).build());
        }
    

    对于同类型任务,先完成的优先得到处理

    
            List<DashboardCard> elements = new ArrayList<>();
            ExecutorService threadPool =  Executors.newFixedThreadPool(10);
            CompletionService<DashboardCard> completionService = new ExecutorCompletionService<DashboardCard>(threadPool);
    
            logger.info("module.getData begin, time : " + LocalDateTime.now().toString());
            for (HomePageModule module : dashboardModules) {
                completionService.submit(new Callable<DashboardCard>() {
                    @Override
                    public DashboardCard call() throws Exception {
                        try {
                            DashboardCard dashboardCard = (DashboardCard) module.getData(context);
                            logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                            return dashboardCard;
                        } catch (Exception e) {
                            logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
                            return null;
                        }
                    }
                });
            }
    
            for(HomePageModule module : dashboardModules){
                try {
                    DashboardCard dashboardCard = completionService.take().get();
                    if (dashboardCard != null) {
                        elements.add(dashboardCard);
                    }
                } catch (Exception e) {
                    logger.error("completionService.take().get() exception", e);
                }
            }
            logger.info("module.getData end, time : " + LocalDateTime.now().toString());
    

    异常处理

    参考:http://www.blogjava.net/xylz/archive/2013/08/05/402405.html

    相关文章

      网友评论

          本文标题:Java 并发处理任务并汇总结果

          本文链接:https://www.haomeiwen.com/subject/jdbtfctx.html