因正常的线程方式Thread、Runnable方式因不能获取返回值,所以采用callable实现该功能。
1、创建对应的线程类实现Callable接口
class CallableThread implements Callable<Map<String, Object>> {
int i;
int total;
String orgId;
String userId;
String queryType;
String quarter;
private CustomerMarketingService marketingService;
private Map<String, Object> map = new HashMap<>();
private final CountDownLatch latch ;
public CallableThread(int i,int total, String orgId,String userId,
String queryType,String quarter,
CustomerMarketingService marketingService, CountDownLatch latch) {
this.i = i;
this.total = total;
this.orgId = orgId;
this.userId = userId;
this.queryType = queryType;
this.quarter = quarter;
this.marketingService = marketingService;
this.latch = latch;
}
@Override
public Map<String, Object> call() throws Exception {
try{
String time="";
String resultTime="";
if("day".equals(queryType)){
time = DateUtils.getDay(i-total+1);
resultTime=time.substring(5,time.length());
}
else if ("month".equals(queryType)) {
time = DateUtils.getLastMonth(total-i-1, new Date());
resultTime=time;
} else if ("quarter".equals(queryType)) {
time = DateUtils.getLastMonth((total-i-1) * 3, new Date());
quarter = DateUtils.getSeason(time);
String year = DateUtils.getYear(time);
resultTime = year + "年" + quarter + "季度";
}else if ("year".equals(queryType)) {
time = DateUtils.getLastMonth((total-i-1) * 12, new Date());
String year = DateUtils.getYear(time);
resultTime = year + "年";
}
//客户营销统计-(新客户/总数+具体客户营销方式统计)
int newCount =0;
int allCount =0;
//EntityWrapper<CustomerMarketingEntity> ew = new EntityWrapper<>();
Map<String,Object> params = new HashMap<>();
if("day".equals(queryType)){
//ew.between("create_time", time+" 00:00:00",time+" 23:59:59");
params.put("beginTime",time+" 00:00:00");
params.put("endTime",time+" 23:59:59");
}else{
Date beginTime = DateUtils.getBeginTime(queryType, time, quarter);
Date endTime = DateUtils.getEndTime(queryType, time, quarter);
params.put("beginTime",DateUtils.format(beginTime)+" 00:00:00");
params.put("endTime",DateUtils.format(endTime)+" 23:59:59");
//ew.between("create_time", DateUtils.format(beginTime)+" 00:00:00",DateUtils.format(endTime)+" 23:59:59");
}
if (UtilValidate.isNotEmpty(orgId)) {
//拼接权限的查询条件
params.put("orgId",orgId);
//ew.addFilter("FIND_IN_SET(create_org_id,getChildrenOrg({0}))", orgId);
}
List<CustomerMarketCount> list = marketingService.selectCountAONList(params);
for(CustomerMarketCount count:list){
if("0".equals(count.getCode())){
newCount=count.getCount();
}
allCount=allCount+count.getCount();
}
//allCount = marketingService.selectCountM(params);
//params.put("isNew","0");
//ew.eq("is_new", "0");
//newCount = marketingService.selectCountM(params);
// 新客户
map.put("resultTime", resultTime);
map.put("time", resultTime);
map.put("allCount", allCount);
map.put("newCount", newCount);
double ratio;
if(allCount!=0){
ratio=(double)newCount/allCount;
map.put("ratio", CountUtil.formatDouble(ratio));
}else{
map.put("ratio", 0.0);
}
}catch(Exception e){
e.printStackTrace();
}finally{
latch.countDown();
}
return map;
}
}
2、具体的业务逻辑中,通过线程池调用该线程
List<Map<String, Object>> mapList = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(total);
ExecutorService pool=Executors.newFixedThreadPool(5);
List list=new ArrayList();
try {
for (int i = 0; i < total; i++) {//生长轨迹
Callable<Map<String, Object>> c1 = new CallableThread(i,total,orgId,userId,queryType,quarter,this,latch);
Future<Map<String,Object>> f1=pool.submit(c1);
list.add(f1);
}
latch.await(); //等待所有线程执行完成后 对返回值进行合并处理
for(Object o:list){
Future<Map<String,Object>> f1=(Future<Map<String, Object>>) o;
mapList.add(f1.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return Result.error("未知异常");
}
通过线程池调用对应的线程,具体某个线程通过传值后处理单独的业务逻辑后,latch.await()等待所有的线程执行完成后,针对返回值进行数据合并后返回给前台。
网友评论