封装了个工具类,用于多线程处理列表型数据任务
/**
*
*
* @param threadWorkSize 每个线程处理的数据条数
* @param data 数据列表
* @param function 构建任务的方法
* @param <T>
* @return
*/
public <T> List<Future<Collection<T>>> doTask(Integer threadWorkSize, List<T> data,
Function<List<T>, Callable<Collection<T>>> function) {
log.info("开始执行多线程数据处理任务");
//数据条数
int dataSize = data.size();
// 线程数
int threadNum = dataSize / threadWorkSize + 1;
log.info("线程数为{},数据条数为{}", threadNum, dataSize);
// 定义标记,过滤dataSize / threadSize为整数 (当其为整数时最后一个线程无实际作用)
boolean special = dataSize % threadWorkSize == 0;
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
// 任务集合
List<Callable<Collection<T>>> tasks = new ArrayList<>();
List<T> cutList = null;
for (int i = 0; i < threadNum; i++) {
if (i == threadNum - 1) {
if (special) {
break;
}
cutList = data.subList(threadWorkSize * i, dataSize);
} else {
cutList = data.subList(threadWorkSize * i, threadWorkSize * (i + 1));
}
final List<T> finalList = cutList;
Callable<Collection<T>> task = function.apply(finalList);
// 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
tasks.add(task);
}
List<Future<Collection<T>>> futures = new ArrayList<>();
try {
futures = exec.invokeAll(tasks);
} catch (InterruptedException e) {
log.error("线程池处理失败", e);
}
exec.shutdown();
return futures;
}
网友评论