线程池定义
/**
 * Spring component that owns two bounded thread pools, one for the daily
 * overview module and one for the dashboard module, and exposes a submit
 * method for each.
 *
 * <p>Both pools use a bounded {@link LinkedBlockingQueue} whose capacity equals
 * the pool's core size and a {@link ThreadPoolExecutor.CallerRunsPolicy}, so a
 * task submitted while the pool is saturated runs on the submitting thread
 * instead of being rejected.
 *
 * <p>NOTE(review): nothing in this class shuts the pools down; consider adding
 * a destroy callback that calls {@code shutdown()} on both executors.
 */
@Component("asyncProcessUtil")
public class AsyncProcessUtil {
    private ThreadPoolExecutor dailyDataThreadPoolExecutor;
    private ThreadPoolExecutor dashboardThreadPoolExecutor;

    /**
     * Creates both pools, sizing them from the number of available processors.
     */
    @PostConstruct
    public void init() {
        int processors = Runtime.getRuntime().availableProcessors();

        // dailyDataThreadPoolExecutor
        int corePoolSize = processors * 2;
        int maxPoolSize = corePoolSize * 2;
        this.dailyDataThreadPoolExecutor =
                newBoundedPool(corePoolSize, maxPoolSize, "Daily Overview Module Thread-");

        // dashboardThreadPoolExecutor
        int dashboardCorePoolSize = processors * 4;
        // Derive the maximum from the dashboard core size. The previous code used the
        // daily pool's corePoolSize here, which made max == core and prevented growth.
        int dashboardMaxPoolSize = dashboardCorePoolSize * 2;
        this.dashboardThreadPoolExecutor =
                newBoundedPool(dashboardCorePoolSize, dashboardMaxPoolSize, "Dashboard Module Thread-");
    }

    /**
     * Submits a task to the daily overview pool.
     *
     * @param callable the task to run
     * @param <T>      the task's result type
     * @return a future for the task's result
     */
    public <T> Future<T> submitDailyThread(Callable<T> callable) {
        return dailyDataThreadPoolExecutor.submit(callable);
    }

    /**
     * Submits a task to the dashboard pool.
     *
     * @param callable the task to run
     * @param <T>      the task's result type
     * @return a future for the task's result
     */
    public <T> Future<T> submitDashboardThread(Callable<T> callable) {
        return dashboardThreadPoolExecutor.submit(callable);
    }

    /**
     * Builds a pool with a one-minute keep-alive, a queue bounded at the core
     * size, sequentially numbered thread names and a caller-runs rejection policy.
     *
     * @param coreSize         core pool size, also used as the queue capacity
     * @param maxSize          maximum pool size
     * @param threadNamePrefix prefix placed before the 1-based thread number
     * @return the configured executor
     */
    private static ThreadPoolExecutor newBoundedPool(int coreSize, int maxSize, final String threadNamePrefix) {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicLong threadNum = new AtomicLong(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadNamePrefix + threadNum.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(coreSize),
                namedFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
对于同类型任务，按提交顺序依次等待并获取结果
/**
 * Collects the dashboard cards of every module in {@code dashboardModules}.
 *
 * <p>Each module's {@code getData} call is submitted to the dashboard thread
 * pool; the results are then awaited in submission order. A module that throws
 * is logged and contributes no card.
 *
 * @param context the request context passed to each module
 * @return a success response wrapping the sorted cards that were produced
 */
@Override
public DadaResponse<FlexContainer> getDashbordData(GatewayRequestContext context) {
    List<DashboardCard> elements = new ArrayList<>();
    logger.info("module.getData begin, time : " + LocalDateTime.now().toString());

    // Fan out: one task per module.
    List<Future<DashboardCard>> futures = new ArrayList<>();
    for (HomePageModule module : dashboardModules) {
        Future<DashboardCard> future = asyncProcessUtil.submitDashboardThread(new Callable<DashboardCard>() {
            @Override
            public DashboardCard call() throws Exception {
                try {
                    logger.info("module.getData start run, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                    DashboardCard dashboardCard = (DashboardCard) module.getData(context);
                    logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
                    return dashboardCard;
                } catch (Exception e) {
                    // A failing module must not break the whole dashboard; null means "no card".
                    logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
                    return null;
                }
            }
        });
        futures.add(future);
    }

    // Fan in: wait for the tasks in the order they were submitted.
    for (Future<DashboardCard> future : futures) {
        try {
            DashboardCard dashboardCard = future.get();
            if (dashboardCard != null) {
                elements.add(dashboardCard);
            }
        } catch (InterruptedException e) {
            // Restore the flag so callers can see the interruption, and stop waiting:
            // every further get() would fail immediately with the flag set.
            Thread.currentThread().interrupt();
            logger.error("module.getData interrupted while waiting for future.get()", e);
            break;
        } catch (Exception e) {
            logger.error("module.getData future.get() exception", e);
        }
    }

    logger.info("module.getData end, time : " + LocalDateTime.now().toString());
    return DadaResponse.success(FlexContainer.builder().elements(DashboardUtil.sort(elements)).build());
}
对于同类型任务，先完成的优先得到处理（使用 CompletionService）
List<DashboardCard> elements = new ArrayList<>();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletionService<DashboardCard> completionService = new ExecutorCompletionService<DashboardCard>(threadPool);
logger.info("module.getData begin, time : " + LocalDateTime.now().toString());
for (HomePageModule module : dashboardModules) {
completionService.submit(new Callable<DashboardCard>() {
@Override
public DashboardCard call() throws Exception {
try {
DashboardCard dashboardCard = (DashboardCard) module.getData(context);
logger.info("module.getData success, clazzName: " + module.getClass().getName() + ", time s: " + LocalDateTime.now().toString());
return dashboardCard;
} catch (Exception e) {
logger.error("module.getData exception, clazzName: " + module.getClass().getName(), e);
return null;
}
}
});
}
for(HomePageModule module : dashboardModules){
try {
DashboardCard dashboardCard = completionService.take().get();
if (dashboardCard != null) {
elements.add(dashboardCard);
}
} catch (Exception e) {
logger.error("completionService.take().get() exception", e);
}
}
logger.info("module.getData end, time : " + LocalDateTime.now().toString());
异常处理
参考:http://www.blogjava.net/xylz/archive/2013/08/05/402405.html
网友评论