工具类:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
/**
 * Helpers for running a batch of {@link Callable}s and collecting their results.
 *
 * <p>Failures never propagate to the caller of {@link #submits(Collection, int)}; they are
 * logged and reflected in the returned list as described on that method.
 */
public class MulitThreadUtils {

    private static final Logger logger = LoggerFactory.getLogger(MulitThreadUtils.class);

    /** Upper bound on the number of worker threads in one pool. */
    private static final int MAX_POOL_THREADS = 20;

    /** How long an idle worker is kept alive before it is retired, in hours. */
    private static final long IDLE_KEEP_ALIVE_HOURS = 1;

    /**
     * Thread pool intended for I/O-heavy tasks. Each calling thread lazily gets its own pool
     * on the first {@code get()}; a pool that was shut down by a caller is replaced.
     *
     * <p>NOTE(review): one pool per calling thread means up to {@value #MAX_POOL_THREADS}
     * workers per caller. Consider a single shared pool if many threads use this class.
     */
    public static final ThreadLocal<ExecutorService> ANSYNCHTHREADPOOL = new ThreadLocal<ExecutorService>() {
        @Override
        public ExecutorService get() {
            ExecutorService pool = super.get();
            if (pool == null || pool.isShutdown()) {
                pool = newWorkerPool();
                set(pool);
            }
            return pool;
        }
    };

    /**
     * Creates a pool with direct hand-off (no queue) and at most {@value #MAX_POOL_THREADS}
     * workers.
     *
     * <p>Workers are daemon threads so that idle workers, which linger for the keep-alive
     * period, cannot keep the JVM from exiting. When all workers are busy the submitting
     * thread runs the task itself instead of the submission being rejected; without this a
     * batch larger than the pool would fail as a whole.
     */
    private static ExecutorService newWorkerPool() {
        final ThreadFactory defaults = Executors.defaultThreadFactory();
        ThreadFactory daemonWorkers = task -> {
            Thread worker = defaults.newThread(task);
            worker.setDaemon(true);
            return worker;
        };
        return new ThreadPoolExecutor(
                0,
                MAX_POOL_THREADS,
                IDLE_KEEP_ALIVE_HOURS,
                TimeUnit.HOURS,
                new SynchronousQueue<>(),
                daemonWorkers,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Runs the given tasks and returns their results.
     *
     * <ul>
     *   <li>{@code null} or empty input: returns an empty list.</li>
     *   <li>Exactly one task: it is called directly on the calling thread (no timeout) and its
     *       result is returned as a one-element list; if it throws, an empty list is
     *       returned.</li>
     *   <li>{@code oneThreadTimeoutSecond == 0}: tasks are called one after another on the
     *       calling thread; a task that throws is logged and contributes no entry, so the
     *       result may be shorter than the input.</li>
     *   <li>Otherwise: tasks are handed to {@link #ANSYNCHTHREADPOOL} via {@code invokeAll}.
     *       The result has one entry per task in iteration order; a task that failed or was
     *       cancelled because the timeout elapsed contributes {@code null}.</li>
     * </ul>
     *
     * @param callables tasks to run; may be {@code null} or empty
     * @param oneThreadTimeoutSecond {@code 0} to run sequentially; otherwise the timeout in
     *     seconds passed to {@code invokeAll}. Despite the name, that timeout bounds the whole
     *     batch rather than each individual task.
     * @param <T> result type of the tasks
     * @return the collected results as described above; an empty list if the calling thread
     *     is interrupted or the batch cannot be executed at all
     */
    public static <T> List<T> submits(Collection<Callable<T>> callables, int oneThreadTimeoutSecond) {
        try {
            if (callables == null || callables.isEmpty()) {
                return Collections.emptyList();
            }
            // A single task gains nothing from a pool: run it inline.
            if (callables.size() == 1) {
                List<T> single = new ArrayList<>(1);
                single.add(callables.iterator().next().call());
                return single;
            }
            if (oneThreadTimeoutSecond == 0) {
                return runSequentially(callables);
            }
            return runConcurrently(callables, oneThreadTimeoutSecond);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while executing tasks; returning no results", e);
        } catch (Exception e) {
            logger.error("Failed to execute tasks; returning no results", e);
        }
        return Collections.emptyList();
    }

    /** Calls every task in iteration order on the current thread, skipping tasks that throw. */
    private static <T> List<T> runSequentially(Collection<Callable<T>> callables) {
        List<T> results = new ArrayList<>(callables.size());
        for (Callable<T> task : callables) {
            try {
                results.add(task.call());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                logger.error("Sequential task failed; its result is omitted", e);
            }
        }
        logger.debug("seralizable execute");
        return results;
    }

    /** Runs all tasks on the calling thread's pool and maps failed or cancelled tasks to null. */
    private static <T> List<T> runConcurrently(Collection<Callable<T>> callables, int timeoutSeconds)
            throws InterruptedException {
        List<Future<T>> futures =
                ANSYNCHTHREADPOOL.get().invokeAll(callables, timeoutSeconds, TimeUnit.SECONDS);
        List<T> results = new ArrayList<>(futures.size());
        // invokeAll returns only once every future is completed or cancelled,
        // so the get() calls below do not block.
        for (Future<T> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                results.add(null);
            } catch (CancellationException e) {
                results.add(null);
                logger.warn("Task was cancelled before completing (batch timeout {} s)", timeoutSeconds);
            } catch (ExecutionException e) {
                results.add(null);
                logger.error("Concurrent task failed; its result is null", e.getCause());
            }
        }
        return results;
    }
}
使用方式:
/**
 * {@link Callable} adapter around a single {@code searchMediaKb} call, so that several
 * searches can be handed to an executor as one batch.
 */
class QueryMediaKb implements Callable<PageInfo<Map<String, String>>> {

    /** Condition map passed unchanged to {@code searchMediaKb}. */
    private final Map<String, Object> searchCondition;

    /** The instance whose {@code searchMediaKb} method is invoked by {@link #call()}. */
    private final MediaKbSearch mks;

    /**
     * @param mks the search object to query
     * @param searchCondition the condition map handed to the search
     */
    public QueryMediaKb(MediaKbSearch mks, Map<String, Object> searchCondition) {
        this.searchCondition = searchCondition;
        this.mks = mks;
    }

    /**
     * Performs the search.
     *
     * @return whatever {@code searchMediaKb} returns for the stored condition
     * @throws Exception if the underlying search throws
     */
    @Override
    public PageInfo<Map<String, String>> call() throws Exception {
        final PageInfo<Map<String, String>> page = mks.searchMediaKb(searchCondition);
        return page;
    }
}
// Usage example: build one QueryMediaKb task per entry of kbSearchList, all sharing the
// same searchCondition (kbSearchList and searchCondition are declared outside this fragment).
List<Callable<PageInfo<Map<String, String>>>> callables=new ArrayList<>();
for (MediaKbSearch mks : kbSearchList) {
callables.add(new QueryMediaKb(mks,searchCondition));
}
// The second argument (5) is the timeout in seconds; a non-zero value selects the concurrent
// path when there is more than one task. In that path a task that failed or was cancelled
// contributes a null entry, so check entries before using them.
List<PageInfo<Map<String, String>>> mediaKbSearchResultList = MulitThreadUtils.submits(callables,5);
网友评论