这里使用CountDownLatch 来使主线程等待线程池中的线程执行完毕。
/**
* 操作日志线程类
* @author liguobao
*
*/
@Service("operationHistoryData")
public class OperationHistoryData {
private static final Logger log = LoggerFactory.getLogger(OperationHistoryService.class);
private CountDownLatch threadsSignal;
//每个线程处理的数据量
private static final int count=1000;
@Autowired
OperationHistoryMapper operationHistoryMapper;
//定义线程池数量为8,每个线程处理1000条数据
private static ExecutorService execPool = Executors.newFixedThreadPool(8);
/**
* 多线程批量执行插入,百万数据需要大约不到20秒 64位4核处理
* @param request
* @return
*/
public String batchAddData(OperationHistortRequest request) {
//存放每个线程的执行数据
//List<OperationHistoryModel> newlist = null;
OperationHistortResponse response=new OperationHistortResponse();
//需要插入数据库的数据
List<OperationHistoryModel> limodel=request.getOperationHistoryLi();
try {
if(limodel.size()<=count) {
threadsSignal=new CountDownLatch(1);
execPool.submit(new InsertDate(limodel));
}else {
List<List<OperationHistoryModel>> li=createList(limodel, count);
threadsSignal=new CountDownLatch(li.size());
for(List<OperationHistoryModel> liop:li) {
execPool.submit(new InsertDate(liop));
}
}
threadsSignal.await();
response.setCode(0);
response.setMsg("success");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
response.setCode(0);
response.setMsg(e.getMessage());
log.error(e.toString() + " 错误所在行数:" + e.getStackTrace()[0].getLineNumber());
}
return JSON.toJSONString(response, SerializerFeature.WriteMapNullValue);
}
/**
* 数据拆分
* @param targe
* @param size
* @return
*/
public static List<List<OperationHistoryModel>> createList(List<OperationHistoryModel> targe,int size) {
List<List<OperationHistoryModel>> listArr = new ArrayList<List<OperationHistoryModel>>();
//获取被拆分的数组个数
int arrSize = targe.size()%size==0?targe.size()/size:targe.size()/size+1;
for(int i=0;i<arrSize;i++) {
List<OperationHistoryModel> sub = new ArrayList<OperationHistoryModel>();
//把指定索引数据放入到list中
for(int j=i*size;j<=size*(i+1)-1;j++) {
if(j<=targe.size()-1) {
sub.add(targe.get(j));
}
}
listArr.add(sub);
}
return listArr;
}
/**
* 内部类,开启线程批量保存数据
* @author liguobao
*
*/
class InsertDate extends Thread{
List<OperationHistoryEntity> lientity=new ArrayList<OperationHistoryEntity>();
public InsertDate(List<OperationHistoryModel> limodel){
limodel.forEach((model)->{
OperationHistoryEntity oper=model.getOperationHistoryEntity(model);
lientity.add(oper);
});
}
public void run() {
operationHistoryMapper.addOperationHistory(lientity);
threadsSignal.countDown();
}
}
}
网友评论