通过枚举类来维护项目中的线程池信息,每次调用时,并不new对象,而是使用缓存的线程池对象。
代码实现:
@Slf4j
public enum ThreadPoolEnum {
PRE_SIGN_URL_THREAD_POOL(new ThreadPoolExecutor(10,
10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
new NamedThreadFactory("pre-sign"), new ThreadPoolExecutor.CallerRunsPolicy()), "的线程池");
public static Map<ThreadPoolEnum, ExecutorService> cache = new ConcurrentHashMap<>();
static {
//放入缓存
for (ThreadPoolEnum threadPoolEnum : values()) {
cache.put(threadPoolEnum, threadPoolEnum.executor);
}
cache = Collections.unmodifiableMap(cache);
}
private ExecutorService executor;
private String desc;
ThreadPoolEnum(ExecutorService executor, String desc) {
this.executor = executor;
this.desc = desc;
}
/**
* 获取线程池。
* 被sw装饰的线程池
*/
public ExecutorService getExecutor() {
return SkyWalkingExecutors.getSkyWalkingExecutorService(cache.get(this));
}
/**
* 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
*
* @param sources 源数据
* @param function 策略方法
* @param executor 线程池
* @param <T> 请求数据的泛型
* @param <R> 响应数据的泛型
* @return 并发执行的最终结果
*/
public static <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
Function<T, R> function,
ExecutorService executor) {
return futureAllFunction(sources, function, null, executor);
}
/**
*
* 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
*
* @param sources 源数据
* @param function 策略方法
* @param timeout 失效时间,为空即不设置
* @param executor 线程池
* @param <T> 请求数据的泛型
* @param <R> 响应数据的泛型
* @return 并发执行的最终结果
*/
public static <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
Function<T, R> function,
Long timeout,
ExecutorService executor) {
/**
* 获取FutureTask任务,并放入到线程池中。
*/
List<FutureTask<BatchFutureResult<T, R>>> futureTasks = sources.stream().map(t -> {
FutureTask<BatchFutureResult<T, R>> resultFutureTask = new FutureTask<>(() -> {
BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
R apply = null;
Integer status = StatusEnum.SUCCESS.getStatus();
try {
apply = function.apply(t);
} catch (Exception e) {
log.error("批量任务异常", e);
batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
status = StatusEnum.ERROR.getStatus();
}
batchFutureResult.setSource(t);
batchFutureResult.setResult(apply);
batchFutureResult.setStatus(status);
return batchFutureResult;
});
//放入线程池等待处理
executor.submit(resultFutureTask);
return resultFutureTask;
}).collect(Collectors.toList());
List<BatchFutureResult<T, R>> results = new ArrayList<>();
//循环获取结果
for (int i = 0; i < sources.size(); i++) {
//获取数据
T source = sources.get(i);
FutureTask<BatchFutureResult<T, R>> futureTask = futureTasks.get(i);
//抓取结果
BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
try {
if (timeout == null) {
batchFutureResult = futureTask.get();
} else {
batchFutureResult = futureTask.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("", e);
batchFutureResult.setStatus(StatusEnum.ERROR.getStatus());
batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
batchFutureResult.setSource(source);
}
//放入数据
results.add(batchFutureResult);
}
return results;
}
}
项目启动后,将枚举维护的线程池对象放入到缓存中,后续调用均在缓存中抓取。
网友评论