美文网首页
多线程实现批量更新

多线程实现批量更新

作者: c_gentle | 来源:发表于2022-06-26 21:08 被阅读0次

    相信不少开发者在遇到项目对数据进行批量操作的时候,都会有不少的烦恼,尤其是针对数据量极大的情况下,效率问题就直接提上了菜板。

    因此,开多线程来执行批量任务是十分重要的一种批量操作思路,其实这种思路实现起来也十分简单,就拿批量更新的操作举例。
    整体流程如下:


    image.png

    步骤如下:

    • 获取需要进行批量更新的大集合 A,对大集合进行拆分操作,分成 N 个小集合 A-1 ~ A-N 。
    • 开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作。
    • 对流程进行控制,控制线程执行顺序。
      按照指定大小拆分集合的工具类:
     public static <T> List<List<T>> split(List<T> resList, int subListLength) {
            if (resList.isEmpty() || subListLength <= 0) {
                return new ArrayList<>();
            }
            List<List<T>> ret = new ArrayList<>();
            int size = resList.size();
            if (size <= subListLength) {
                // 数据量不足 subListLength 指定的大小
                ret.add(resList);
            } else {
                int pre = size / subListLength;
                int last = size % subListLength;
                // 前面pre个集合,每个大小都是 subListLength 个元素
                for (int i = 0; i < pre; i++) {
                    List<T> itemList = new ArrayList<>();
                    for (int j = 0; j < subListLength; j++) {
                        itemList.add(resList.get(i * subListLength + j));
                    }
                    ret.add(itemList);
                }
                //last的处理
                if (last > 0) {
                    List<T> itemList = new ArrayList<>();
                    for (int i = 0; i < last; i++) {
                        itemList.add(resList.get(pre * subListLength + i));
                    }
                    ret.add(itemList);
                }
    
            }
            return ret;
        }
    

    开启异步执行任务的线程池:

    
        //开启多线程
        public void threadMethod() {
            List<T> updateList = new ArrayList();
            //初始化线程池
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
            // 大集合拆分成N个小集合, 这里集合的size可以稍微小一些(这里我用100刚刚好), 以保证多线程异步执行, 过大容易回到单线程
            List<T> splitNList = SplitListUtils.split(totalList, 100);
            // 记录单个任务的执行次数
            CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
            // 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
            splitNList.stream().forEach(o -> {
                //线程池执行
                threadPool.execute(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (Entity yangshiwen : singleList) {
                            // 将每一个对象进行数据封装, 并添加到一个用于存储更新数据的list
                            // ......
                        }
                    }
                }));
                // 任务个数 - 1, 直至为0时唤醒await()
                countDownLatch.countDown();
            });
            try {
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 通过mybatis的批量插入的方式来进行数据的插入, 这一步还是要做判空
            if (!updateList.isEmpty()) {
                batchUpdateEntity(updateList);
            }
    
        }
    

    相关文章

      网友评论

          本文标题:多线程实现批量更新

          本文链接:https://www.haomeiwen.com/subject/cnumvrtx.html