相信不少开发者在遇到项目对数据进行批量操作的时候,都会有不少的烦恼,尤其是针对数据量极大的情况下,效率问题就直接提上了菜板。
因此,开多线程来执行批量任务是十分重要的一种批量操作思路,其实这种思路实现起来也十分简单,就拿批量更新的操作举例。
整体流程如下:
image.png
步骤如下:
- 获取需要进行批量更新的大集合 A,对大集合进行拆分操作,分成 N 个小集合 A-1 ~ A-N 。
- 开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作。
- 对流程进行控制,控制线程执行顺序。
按照指定大小拆分集合的工具类:
public static <T> List<List<T>> split(List<T> resList, int subListLength) {
if (resList.isEmpty() || subListLength <= 0) {
return new ArrayList<>();
}
List<List<T>> ret = new ArrayList<>();
int size = resList.size();
if (size <= subListLength) {
// 数据量不足 subListLength 指定的大小
ret.add(resList);
} else {
int pre = size / subListLength;
int last = size % subListLength;
// 前面pre个集合,每个大小都是 subListLength 个元素
for (int i = 0; i < pre; i++) {
List<T> itemList = new ArrayList<>();
for (int j = 0; j < subListLength; j++) {
itemList.add(resList.get(i * subListLength + j));
}
ret.add(itemList);
}
//last的处理
if (last > 0) {
List<T> itemList = new ArrayList<>();
for (int i = 0; i < last; i++) {
itemList.add(resList.get(pre * subListLength + i));
}
ret.add(itemList);
}
}
return ret;
}
开启异步执行任务的线程池:
//开启多线程
public void threadMethod() {
List<T> updateList = new ArrayList();
//初始化线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
// 大集合拆分成N个小集合, 这里集合的size可以稍微小一些(这里我用100刚刚好), 以保证多线程异步执行, 过大容易回到单线程
List<T> splitNList = SplitListUtils.split(totalList, 100);
// 记录单个任务的执行次数
CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
// 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
splitNList.stream().forEach(o -> {
//线程池执行
threadPool.execute(new Thread(new Runnable() {
@Override
public void run() {
for (Entity yangshiwen : singleList) {
// 将每一个对象进行数据封装, 并添加到一个用于存储更新数据的list
// ......
}
}
}));
// 任务个数 - 1, 直至为0时唤醒await()
countDownLatch.countDown();
});
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
// 通过mybatis的批量插入的方式来进行数据的插入, 这一步还是要做判空
if (!updateList.isEmpty()) {
batchUpdateEntity(updateList);
}
}
网友评论