美文网首页
Java 并发处理任务并汇总结果

Java 并发处理任务并汇总结果

作者: JennTu | 来源:发表于2019-06-10 20:13 被阅读0次

线程池定义

@Component("asyncProcessUtil")
public class AsyncProcessUtil {

    private ThreadPoolExecutor dailyDataThreadPoolExecutor;
    private ThreadPoolExecutor dashboardThreadPoolExecutor;

    @PostConstruct
    public void init() {

        // dailyDataThreadPoolExecutor
        int processors = Runtime.getRuntime().availableProcessors();
        int corePoolSize = processors * 2;
        int maxPoolSize = corePoolSize * 2;
        this.dailyDataThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(corePoolSize),
                new ThreadFactory() {
                    private AtomicLong threadNum = new AtomicLong(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "Daily Overview Module Thread-" + threadNum.getAndIncrement());
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy());

        // dashboardThreadPoolExecutor
        int dashboardCorePoolSize = processors * 4;
        int dashboardMaxPoolSize = corePoolSize * 2;
        this.dashboardThreadPoolExecutor = new ThreadPoolExecutor(dashboardCorePoolSize, dashboardMaxPoolSize, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(dashboardCorePoolSize),
                new ThreadFactory() {
                    private AtomicLong threadNum = new AtomicLong(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "Dashboard Module Thread-" + threadNum.getAndIncrement());
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy());

    }

    public <T> Future<T> submitDailyThread(Callable<T> callable) {
        return dailyDataThreadPoolExecutor.submit(callable);
    }

    public <T> Future<T> submitDashboardThread(Callable<T> callable) {
        return dashboardThreadPoolExecutor.submit(callable);
    }

}

对于同类型任务

    @Override
    public DadaResponse<FlexContainer> getDashbordData(GatewayRequestContext context) {

        List<DashboardCard> elements = new ArrayList<>();
        logger.info("module.getData begin, time : " + LocalDateTime.now().toString());
        List<Future<DashboardCard>> futures = new ArrayList<>();
        for (HomePageModule module : dashboardModules) {
            Future<DashboardCard> future = asyncProcessUtil.submitDashboardThread(new Callable<DashboardCard>() {
                @Override
                public DashboardCard call() throws Exception {
                    try {
                        logger.info("module.getData start run, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                        DashboardCard dashboardCard = (DashboardCard) module.getData(context);
                        logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                        return dashboardCard;
                    } catch (Exception e) {
                        logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
                        return null;
                    }
                }
            });
            futures.add(future);
        }

        for (Future<DashboardCard> future : futures) {
            try {
                DashboardCard dashboardCard = future.get();
                if (dashboardCard != null) {
                    elements.add(dashboardCard);
                }
            } catch (Exception e) {
                logger.error("module.getData completionService.take().get() exception", e);
            }
        }
        logger.info("module.getData end, time : " + LocalDateTime.now().toString());

        return DadaResponse.success(FlexContainer.builder().elements(DashboardUtil.sort(elements)).build());
    }

对于同类型任务,先完成的优先得到处理


        List<DashboardCard> elements = new ArrayList<>();
        ExecutorService threadPool =  Executors.newFixedThreadPool(10);
        CompletionService<DashboardCard> completionService = new ExecutorCompletionService<DashboardCard>(threadPool);

        logger.info("module.getData begin, time : " + LocalDateTime.now().toString());
        for (HomePageModule module : dashboardModules) {
            completionService.submit(new Callable<DashboardCard>() {
                @Override
                public DashboardCard call() throws Exception {
                    try {
                        DashboardCard dashboardCard = (DashboardCard) module.getData(context);
                        logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                        return dashboardCard;
                    } catch (Exception e) {
                        logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
                        return null;
                    }
                }
            });
        }

        for(HomePageModule module : dashboardModules){
            try {
                DashboardCard dashboardCard = completionService.take().get();
                if (dashboardCard != null) {
                    elements.add(dashboardCard);
                }
            } catch (Exception e) {
                logger.error("completionService.take().get() exception", e);
            }
        }
        logger.info("module.getData end, time : " + LocalDateTime.now().toString());

异常处理

参考:http://www.blogjava.net/xylz/archive/2013/08/05/402405.html

相关文章

网友评论

      本文标题:Java 并发处理任务并汇总结果

      本文链接:https://www.haomeiwen.com/subject/jdbtfctx.html