美文网首页
Java 7 Fork/Join 框架使用

Java 7 Fork/Join 框架使用

作者: PrimaryKeyEnoch | 来源:发表于2019-08-23 23:20 被阅读0次

    最近有个业务是批量导出cognos报表, 由于未开发此功能, 人工导出需要大量的时间消耗, 奔着珍惜时间的使命写了一个导出工具类, 至此在导出的过程中用到了并发请求数, 比如:每秒并发10次,20次等. 工作中使用的Java8并发语法, 在此之前先介绍一下Java7 Fork/Join的框架使用方式.

    之前使用的此框架是一个查询SQL时, 当时一个SQL有28个子语句通过left join 拼接而成, 查询速度为20s,经常卡死, 最后写成并发,28个子语句,分成3批次,每次10个SQL,并行查询,最后通过Java算法拼接成List,从20S变为1.4S左右,性能大大提升.

    下面就开始今天的内容:

    简介

    从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
    这种思想和MapReduce很像(input --> split --> map --> reduce --> output)

    主要有两步:
    第一、任务切分;
    第二、结果合并

    刚刚我介绍的SQL其实就是这样的原理.

    API 介绍

    ForkJoinPool 池子

    ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

    ForkJoinTask 任务

    ForkJoinTask代表运行在ForkJoinPool中的任务。

    主要方法:

    fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
    join() 当任务完成的时候返回计算结果。
    invoke() 开始执行任务,如果必要,等待计算完成。
    子类:

    RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
    RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

    例子

    private static final ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new ThreadFactoryBuilder().setNameFormat("Reports-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.AbortPolicy());
    

    这里先创建了一个多线程任务,意思为:
    这里核心线程数5
    最大线程数5
    blockingQueue 最大size 100, 解释: workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择: ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    reject策略 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy,意思是由调用线程处理该任务

    另外的策略

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    

    方法调用,模拟一下并发执行SQL拼接表的过程

    public Page<ReportVo> getRepairTaskReport() {
            List<String>ids = Lists.newArrayList("1","2","3","4","5","6","7","8","9");
            //多线程查询列数据
            Set<String> setIds = new HashSet<>();
            for (String id : ids) {
                setIds.add(id);
            }
    
            //整理要查询的列,以后可以做成由前端指定查询哪些column
            List<T> columns = new ArrayList<>();
            columns.add("ID1相关");
            columns.add("ID2相关");
            ...
            columns.add("ID9相关");
    
    
            Map<T, Future<List<Object[]>>> futureMap = new HashMap<>();
            Map<T, Callable<List<Object[]>>> columnCallableMap = getColumnCallableMap(appId, staff, columns, params, queryConditionVo, departIdSet);
            for (Map.Entry<T, Callable<List<Object[]>>> entry : columnCallableMap.entrySet()) {
                futureMap.put(entry.getKey(), executorService.submit(entry.getValue()));
            }
    
            //合并报表
            for (RepairTaskReportColumn column : columns) {
                try {
                    //列数据
                    List<Object[]> columnDataList = futureMap.get(column).get();
                    for (Object[] objects : columnDataList) {
                        String departId = objects[0].toString();
                        String columnData = objects[1].toString();
                        //匹配行数据
                        for (String reportVo : ids) {
                            if (departId.equals(reportVo.getDepartId())) {
                                switch (column) {
                                    case "ID1相关":
                                        // 并行返回ID1相关数据
                                        break;
                                    case "ID2相关":
                                          // 并行返回ID2相关数据
                                        break;
                                    ...
                                     //    
                                }
                            }
                        }
    
                    }
                } catch (Exception e) {
                    e.printStackTrace();
          
                }
            }
            return "最终结果";
        }
    
    
        /**
         * 根据请求的column数据,生成Callable
         *
         * @param columns
         * @param queryConditionVo
         * @return
         */
        private Map<T, Callable<List<Object[]>>> getColumnCallableMap(String ID, List<T> columns, Set<String> departIdSet) {
            Map<T, Callable<List<Object[]>>> columnMap = new HashMap<>();
            for (T column : columns) {
                switch (column) {
                    case "ID1相关SQL查询":
                        columnMap.put("ID1相关", getSQL(ID,T,departIdSet));
                        break;
                    case  "ID2相关SQL查询":
                        columnMap.put("ID2相关", getSQL(ID,T,departIdSet));
                        break;
                    ....
                    default:
                        break;
    
                }
            }
    
            return columnMap;
        }
    
        
            private Callable<List<Object[]>> getSQL(final String ID,  final T column, final Set<String> departIdSet) {
                return new Callable<List<Object[]>>() {
                    @Override
                    public List<Object[]> call() throws Exception {
                        //todo sql query
                        //返回格式:object[0]为departId,object[1]为需要的数据
                        String sql = "select * from table xxxxx";
                        List<Object[]> result = dao.getResult(sql);
                        return result;
                    }
                };
            }
    
    

    以上代码为伪代码,实现的逻辑其实很简单.
    大致逻辑如下
    我有一条SQL,为N个left join 拼接而成,那么我现在就是吧N个left 拆分成N个小SQL,并发执行,那么执行时间缩短为N倍, 然后通过N个SQL查询出的结果,通过相同的属性 再次拼接成业务正确的数据

    就是这样的一个图:


    image.png image.png

    哈哈, 大致就是这样,通过并发执行任务,人工点击的8八小时缩短为10分钟! 是不是很秀呢

    下次讲解JDK8中并发执行的例子,更为简洁

    欢迎小伙伴们留言哦

    相关文章

      网友评论

          本文标题:Java 7 Fork/Join 框架使用

          本文链接:https://www.haomeiwen.com/subject/ukvxectx.html