痛点
在实际工作当中,我们经常会用到集合来处理数据,不可避免的要在内存中去遍历这些集合,如果集合中数据量小则计算很快,但是如果数据量很大我们就会发现遍历一次稍微加点自己的业务逻辑,就慢的不行,正是因为有这样的痛点,所以才出现了本篇文章,主要是将集合分割之后利用多线程来异步处理自己的业务逻辑。
最终目的
根据使用者给出的行数自动分出线程来执行,例如500条数据1个线程, 实现分割集合和多线程处理并不难,这里我将处理集合和自定义的业务逻辑分离了出来,使用者只需要去实现自己业务逻辑接口
具体实现:
- ThreadFactoryUtil 线程池的一个工具类
- ListUtil 集合工具类,里面有具体的工具方法
- IThreadSplitList 定义的实现自定义逻辑的接口
- TestThreadSplictListImpl 随便写的一个表示能处理自定义逻辑的接口实现类
废话不多说,直接上代码,先写个线程池
ThreadFactoryUtil 线程池工具类
package com.platform.cfs.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
public class ThreadFactoryUtil {
private static ExecutorService pool = null;
private ThreadFactoryUtil() {}
public static ExecutorService getThreadFactoryUtilInstance(int threadNum){
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("splitlist-pool-%d").build();
if(pool == null){
pool = new ThreadPoolExecutor(threadNum,threadNum,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(),factory,new ThreadPoolExecutor.AbortPolicy());
}
return pool;
}
}
listUtil 工具类的代码实现:
package com.platform.cfs.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@Slf4j
public class listUtil {
public static void threadSplitList(IThreadSplitList threadSplitList, List<?> list, int size, Object... objs){
if(Utils.isEmpty(list)){
return;
}
// 如果size为空则默认给个数值 默认500条数据开启一个线程
if(size<=0){
size = 500;
}
// 记录开始时间
long time = System.currentTimeMillis();
// 总数据条数
int dataSize = list.size();
// 线程数
int threadNum = dataSize / size + 1;
// 定义标记
boolean special = dataSize % size == 0;
// 创建线程池
ExecutorService exec = ThreadFactoryUtil.getThreadFactoryUtilInstance(threadNum);
try {
// 定义任务集合
List<Callable<Object>> tasks = new ArrayList<>();
Callable task = null;
List<?> cutList = null;
// 分割集合数据,放入对应的线程中
for (int i = 0; i < threadNum; i++) {
// 判断是否是最后一个线程
if(i == threadNum-1){
if(special){
break;
}
cutList = list.subList(size * i,dataSize);
}else{
cutList = list.subList(size * i,size * (i + 1));
}
final List<?> tempList = cutList;
// 创建任务
task = new Callable() {
@Override
public Object call() throws Exception {
return threadSplitList.execMethod(tempList,objs);
}
};
tasks.add(task);
}
exec.invokeAll(tasks);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(log.isDebugEnabled()){
log.debug("线程任务执行结束,执行任务消耗了:{}毫秒",(System.currentTimeMillis()-time));
}
}
}
}
通过上述的代码大家也注意到了在具体的任务中有这么一行代码:threadSplitList.execMethod(tempList,objs)
目的就是让调用方自己去实现自己要处理的业务逻辑实现类。
TestThreadSplictListImpl是我写的一个简单的业务逻辑实现类
package com.platform.cfs.test;
import com.platform.cfs.utils.IThreadSplitList;
import java.util.List;
import java.util.Map;
public class TestThreadSplictListImpl implements IThreadSplitList<String> {
@Override
public Object execMethod(List<String> list, Object... objs) {
System.out.println(Thread.currentThread().getName() + "线程:" + list);
// 意思一下 表示处理自己的业务逻辑
// 可以通过传递过来的参数来做自己想要处理的业务,注意需要考虑并发场景,map最好用线程安全的map
Map<String,String> map = (Map<String,String>)objs[0];
for (String s : list) {
map.put(s,s);
}
return null;
}
}
最后,直接看一下如何调用的吧
package com.platform.cfs.test;
import com.platform.cfs.utils.listUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws Exception {
// 假设这个map是处理自己业务逻辑的参数,可以写很多个 注意:如果是容器的话需要考虑并发场景
Map<String,String> map = new ConcurrentHashMap<>();
// 模拟数据List
List<String> list = new ArrayList<>();
for (int i = 1; i <= 3200; i++) {
list.add(i + "");
}
// 实际业务可以进行注入不同的接口实现类 来处理不同的业务逻辑
TestThreadSplictListImpl testThreadSplictList = new TestThreadSplictListImpl();
// 调用分割集合方法,第一个参数是处理业务逻辑的实现类,第二个是要分割的集合,第三个多少数据进行分割,后面的是处理自己业务用的参数
listUtil.threadSplitList(testThreadSplictList,list,500,map);
}
}
有图有真相.png
最后再来个比对图,看看同样是一千万条数据 单线程跑和多线程跑的差距。
正常处理.png
多线程处理.png
当然实际项目当中要处理的业务逻辑肯定不会这么简单只是放入map中,估计大多数情况不是动缓存就是动数据库或者构建某些特殊的对象等....肯定会更慢,有了这个应该可以提高一些代码执行效率的。
网友评论