背景
招聘系统中的人才库列表查询接口经常被监控系统监控到,耗时约3500ms,从服务端到web端极端情况下超过5000ms,由于该接口涉及多个表的查询同时查询数据量比较大,查询sql也相对复杂。由于最近线上出现了一次长时间的FGC停顿,导致不得不对整体的逻辑进行梳理。
优化目标
1.人才库列表查询耗时降低到500~1000ms以内
2.涉及到堆内存的缓存统一管理,缓存对象复用
优化方案&技术:多线程+缓存+sql改写
优化思路
1.先进行并行化处理,构建线程池和处理任务进行并发查询。
2.在很多需要进行数据获取和计算的地方进行整合处理,构建本地缓存+异步刷新缓存机制.
3.sql上可能有问题,看能否从其他方面进行sql的改写而不影响原有逻辑
当前现状
服务端耗时统计:
图片1.png
浏览器端耗时统计(中间有一层web端服务,只做参数传递和结果集返回,不做性能统计):
图片2.png
当前代码逻辑service层(伪代码):
由于代码较多,这里只抽取了核心流程来说明当前代码的问题,原代码其实有200行。
public ResultDTO<SplitPageDTO<IntervieweeDTO>> listIntervieweeDTO(
SplitPageDTO<IntervieweeDTO> dtoSplitPage) {
ResultDTO<SplitPageDTO<IntervieweeDTO>> returnDto = new ResultDTO<SplitPageDTO<IntervieweeDTO>>();
try {
Long userId = dtoSplitPage.get("userId"));
String status = dtoSplitPage.get("status");
SplitPageDTO<IntervieweeDTO> listdata ;
if(role == RoleEnum.ADMIN){
query();//这里没有性能问题
}
else if(role == RoleEnum.DEPTMANAGERHRBP.getCode()){
// 如果是部门HR管理员查看所有
//1.第一次与db交互
List<IntervieweeEntity> storage = db.query();
//2.第二次与db交互
List<SetupDeptHrEntity> setupDeptHrEntityList = db.query();
//3.第三次与db交互
List<PositionEntity> positionIdList_1 = db.query();
//4.第四次与db交互
List<PositionEntity> positionIdList_2 = db.query();
//5.第五次与db交互
List<PositionShareEntity> positionIdList_3 = db.query();
//6.第六次与数据库交互
List<DeliveryRecordEntity> intervieweeIdEntityList = db.listIntervieweeIdByPositionIds(StringUtils.join(allPositionId,","));
intervieweeSet.addAll(intervieweeIdEntityList.collect(Collectors.toSet()));
//intervieweeSet为最终的人才ID集合
listdata = db.pageListQuery(dtoSplitPage,StringUtils.join(intervieweeSet,","));
long endTime = System.currentTimeMillis();
logger.info("部门HR管理员查询耗时 = "+(endTime - startTime) +"ms");
}
else{
//这里是部门hr角色需要处理的逻辑,与上面的代码差不多,
}
List<IntervieweeDTO> rows = listdata.getRows();
if (CollectionUtils.isNotEmpty(rows)){
for (IntervieweeDTO intervieweeDTO : rows){
//这里会根据intervieweeDTO中的状态循环读取db
}
}
returnDto.setData(listdata);
return returnDto;
} catch (Exception e) {
logger.error("查询出现异常", e);
returnDto.setErrorCodeMsg("查询失败");
return returnDto;
}
}
当前代码逻辑DAO层(伪代码):
同上,这里只抽取了核心流程来说明当前代码的问题,原代码其实有100行。
public SplitPageDTO<IntervieweeDTO> pageListQuery(
SplitPageDTO<IntervieweeDTO> splitDto, String intervieweeIds)
throws Exception {
String sqlData = " select a.x,,,,,, from " + tableName +" a where 1=1 ";
String sqlCount = " select count(1) from "+ tableName +" a where 1=1 ";
Map<String,String> param=splitDto.getParamMap();
//根据params和Stringbuilder动态拼接多个查询条件
//最后一个拼接条件:这里是最大的坑,后面会讲到
if(StringUtils.isNotBlank(intervieweeIds)){
sqlBuilder.append(" and a.id in (" + intervieweeIds + ")");
sqlCountBuilder.append(" and a.id in (" + intervieweeIds + ")");
}
//7.第七次与db交互
int total = db.getCountBySQL(sqlCountBuilder.toString());
//8.第八次与db交互
List<IntervieweeEntity> returnlist = db.getListBySQL(sqlBuilder.toString());
//bean拷贝
splitDto.setRows(BeanUtil.copyPropertities(IntervieweeDTO.class, returnlist));
//9.第九次与db交互
List<IntervieweeEntity> idleftlist = db.getListBySQL(sqlLeftId);
//10.第十次与db交互
List<IntervieweeEntity> idRightlist = db.getListBySQL(sqlRightId);
//这里查id集合的需求是每次列表页返回的时候要根据列表页中间的那条数据的id往前查5000条,往后查5000条
splitDto.getParamMap().put("totalIds",StringUtils.join(longList,","));
}else {
splitDto.getParamMap().put("totalIds","");
}
return splitDto;
}
进行优化
第一阶段优化(3200ms->2700ms)
优化内容:通过上面的现状可知,一次分页查询会导致最少10次db交互,这就明显的拖慢了查询的速度。因此这一阶段的重点任务是进行并行化,同时将可以缓存的列表对象进行缓存,但是也只缓存重要的字段。这样在每次进行分页查询的时候会减少跟db的交互,直接拿数据计算生成sql需要的数据。
具体操作如下:
- service层具体查询dao层接口之前进行并行化查询
新建人才职位查询服务类(InterviewPositionSearchService),加上spring service注解
创建线程池对象并引入对应的DAO操作对象:
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
创建查询任务类 SearchPositionTask:
/**
* service层查询的任务
* @param <Set>
*/
class SearchPositionTask<Set> implements Callable<java.util.Set<Long>> {
private Long userId;
private int type;
public SearchPositionTask(Long userId,int type) {
this.userId = userId;
this.type = type;
}
@Override
public java.util.Set<Long> call() throws Exception {
if(this.type == 2){
//2、展示一级部门职位下的人才
List<SetupDeptHrEntity> setupDeptHrEntityList = db.query();
}
else if(this.type == 3){
//3、展示我发布的职位人才
List<PositionEntity> positionIdList_2 = db.query();
}
else if(this.type == 4){
//4、分享给我的职位下的人才
List<PositionShareEntity> positionIdList_3 = db.query();
}
else if(this.type == 5){
List<IntervieweeEntity> listPush = db.query();
}
else if(this.type == 6){
//6、部门HR----分享的
List<IntervieweeEntity> listShare = db.query();
}
return null;
}
}
- 创建service层查询的具体方法
/**
* 搜索职位id集合
* @param userId
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public Set<Long> searchForHRManager(Long userId) throws ExecutionException, InterruptedException {
Set<Long> resultSet = new HashSet<>();
Future<Set<Long>> future1 = executor.submit(new SearchPositionTask(userId,2));
Future<Set<Long>> future2 = executor.submit(new SearchPositionTask(userId,3));
Future<Set<Long>> future3 = executor.submit(new SearchPositionTask(userId,4));
resultSet.addAll(future1.get());
resultSet.addAll(future2.get());
resultSet.addAll(future3.get());
return resultSet;
}
/**
* 搜索职位id集合
* @param userId
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public Set<Long> searchForHR(Long userId) throws ExecutionException, InterruptedException {
Set<Long> resultSet = new HashSet<>();
Future<Set<Long>> future5 = executor.submit(new SearchPositionTask(userId,5));
Future<Set<Long>> future6 = executor.submit(new SearchPositionTask(userId,6));
resultSet.addAll(future5.get());
resultSet.addAll(future6.get());
logger.info("searchForHR.resultSet.size = "+resultSet.size());
return resultSet;
}
- 将查询的数据源(职位id,人才id,投递记录id等)进行本地缓存
/**
* 根据状态获取人才id集合
* @param status
* @return
* @throws Exception
*/
public Set<Long> getIntervieweeSet(int status) throws Exception {
Set<Long> interviewIDSet = cache.getInterviewIdMap();
}
/**
* 根据职位id过滤人才id列表
* @param allPositionIdSet
* @return
* @throws Exception
*/
public Set<Long> getIntervieweeIdByPositionIds(Set<Long> allPositionIdSet) throws Exception {
List<KVLongEntity> kvLongEntities = cache.getDeliveryInterviewIDList();
}
- 查询的结果集中包含访问db的逻辑,进行提取
/**
* 人才分页结果集数据整理
* @param rows
* @param status
* @return
* @throws Exception
*/
private List<IntervieweeDTO> getResultList(List<IntervieweeDTO> rows,String status) throws Exception {
long startTime = System.currentTimeMillis();
Set<Long> interviewIdSet = new HashSet<>();
//第一次循环获取要查询db的id
for (IntervieweeDTO intervieweeDTO : rows) {
if (StringUtils.isEmpty(status) && IntervieweeEnum.INPROCESS.getCode() == intervieweeDTO.getStatus()) {
interviewIdSet.add(intervieweeDTO.getId());
}
}
Map<Long,List<DeliveryRecordEntity>> resultMap = new HashMap<>();
if(CollectionUtils.isNotEmpty(interviewIdSet)){
resultMap = db.getHeadhunterDeliveryByInterVieweeIds(StringUtils.join(interviewIdSet,","));
}
for (IntervieweeDTO intervieweeDTO : rows){
if(StringUtils.isEmpty(status)&&IntervieweeEnum.INPROCESS.getCode()==intervieweeDTO.getStatus()){
//List<DeliveryRecordEntity> deliveryRecordEntityLists= deliveryRecordNewDao.getHeadhunterDeliveryByInterVieweeId(intervieweeDTO.getId());
List<DeliveryRecordEntity> deliveryRecordEntityLists = resultMap.get(intervieweeDTO.getId());
if(deliveryRecordEntityLists!=null&&!deliveryRecordEntityLists.isEmpty()){
intervieweeDTO.setStatusDesc(IntervieweeStatusEnum.getName(deliveryRecordEntityLists.get(0).getStatus()));
}
}
}
long endTime = System.currentTimeMillis();
logger.info("数据整理耗时="+(endTime - startTime)+"ms");
return rows;
}
第一阶段服务端统计耗时.png 第一阶段浏览器端耗时统计.png通过上面的4步操作将查询耗时降低了500ms+, 得益于将数据缓存,同时线程池和执行任务对需要并行的地方进行并行化查询,提高查询效率,下面看一下效果:
第二阶段优化(2700ms->1500ms)
优化内容:dao层具体查询db的接口进行并行化查询,并去除bean拷贝逻辑。根据第一阶段优化可知并行化确实可以提高查询效率,由于db也存在并行查询,因此可以对原有的db方法进行重构,满足并行化查询的目的。
在实施此计划之前需要对原有执行逻辑进行拆解,第一步新建InterviewQuerySqlFactory类,专门用来生成动态的sql。这里涉及到一些代码重构,因此也会减少一些代码量。
- 创建InterviewQuerySqlFactory类拆解生成sql的逻辑
/**
* 生成查询sql
* @param splitDto
* @param intervieweeIds
* @return
* @throws Exception
*/
public Map<String, String> getPageQuerySql(
SplitPageDTO<IntervieweeDTO> splitDto, String intervieweeIds) throws Exception {
Map<String, String> resultSqlMap = new HashMap<>(8);
//第一次sql改写,将查询出的字段直接映射到dto上,避免bean拷贝
String sqlData = " select a.x as X ,,," +
"a.position_name as detailChannelName from t_interviewee a where 1=1 ";
String sqlCount = " select count(1) from t_interviewee a where 1=1 ";
Map<String, String> param = splitDto.getParamMap();
StringBuilder sqlBuilder = new StringBuilder(sqlData);
StringBuilder sqlCountBuilder = new StringBuilder(sqlCount);
if (!splitDto.getParamMap().isEmpty()) {
//根据ParamMap动态拼接sql
}
if (StringUtils.isNotBlank(intervieweeIds)) {
sqlBuilder.append(" and a.id in (" + intervieweeIds + ")");
sqlCountBuilder.append(" and a.id in (" + intervieweeIds + ")");
}
resultSqlMap.put("countSql", sqlCountBuilder.toString());
resultSqlMap.put("querySql", sqlBuilder.toString());
//将构造的sql通过map返回
return resultSqlMap;
}
由于还有别的执行sql,因此该类有两个生成sql的工厂方法
/**
* 用于生成查询上一页下一页的sql
* @param countSql 这里的参数依赖了上面的countsql,不过没关系不影响并行性
* @param middleId 这个参数是导致dao层无法并行的决定性因素,因为需要结果集查询出来之后才能确定这个中间的id是多少
* @return
*/
public Map<String,String> getIdRangeSql(String countSql,long middleId){
String tmpSql = countSql;
Map<String,String> resultSqlMap = new HashMap<>();
String sqlLeft = tmpSql .replace("count(1)", "a.id")+" and a.id < "+middleId;
resultSqlMap.put("sqlLeft",sqlLeft);
String sqlRight = tmpSql .replace("count(1)", "a.id")+" and a.id >= "+middleId;
resultSqlMap.put("sqlRight",sqlRight);
return resultSqlMap;
}
- 在InterviewPositionSearchService类中增加ExecuteSqlTask<Map>类
/**
* dao层查询的任务
* @param <Map>
*/
class ExecuteSqlTask<Map> implements Callable<java.util.Map>{
private String sql;
private int type;
public ExecuteSqlTask(String sql,int type){
this.sql = sql;
this.type = type;
}
@Override
public java.util.Map call() throws Exception {
java.util.Map map = new HashMap();
//查询总条数
if(type == 1){
int count = db.getCountBySQL(sql);
}
//查询分页数据
else if(type == 2){
List<IntervieweeDTO> intervieweeDTOList = db.getPageListBySql(sql)
long middleId;
if (intervieweeDTOList.size() == 1) {
middleId = intervieweeDTOList.get(0).getId();
} else {
middleId = intervieweeDTOList.get(intervieweeDTOList.size() / 2).getId();
}
}
}
//查询上一页/下一页数据id集合
else if(type == 3){
long startT = System.currentTimeMillis();
List<Long> longList = new ArrayList<>(1500);
List<IntervieweeEntity> idRangelist = db.getListBySQL(sql);
}
return map;
}
}
3.同样新建一个方法供上层service服务调用
/**
* 并行查询dao接口
* @param splitDto
* @param intervieweeIds
* @return
*/
public SplitPageDTO<IntervieweeDTO> searchPageListParallel(SplitPageDTO<IntervieweeDTO> splitDto, String intervieweeIds) throws Exception {
Map<String,String> sqlMap = interviewQuerySqlFactory.getPageQuerySql(splitDto, intervieweeIds);
String countSql = sqlMap.get("countSql");
String querySql = sqlMap.get("querySql");
long startT = System.currentTimeMillis();
Future<Map> queryCountFuture = executor.submit(new ExecuteSqlTask(countSql,1));
Future<Map> queryPageFuture = executor.submit(new ExecuteSqlTask(querySql,2));
Map<String,List<IntervieweeDTO>> pageFutureMap = queryPageFuture.get();
//设置结果集
splitDto.setRows(pageFutureMap.get("queryList"));
List<IntervieweeDTO> middleIdList = pageFutureMap.get("middleIdList");
Long middleid = null;
if(CollectionUtils.isNotEmpty(middleIdList)){
middleid = middleIdList.get(0).getId();
}
long endT = System.currentTimeMillis();
logger.info("pageListQuery.hr.DAO 查询耗时endT - startT = "+(endT - startT)+"ms");
StringBuilder builder = new StringBuilder("");
if(middleid != null){
long startT2 = System.currentTimeMillis();
Map<String,String> rangeSqlMap = interviewQuerySqlFactory.getIdRangeSql(countSql,middleid);
String leftSql = rangeSqlMap.get("sqlLeft");
String rightSql = rangeSqlMap.get("sqlRight");
Future<Map> queryLeftIDFuture = executor.submit(new ExecuteSqlTask(leftSql,3));
Future<Map> queryRightIDFuture = executor.submit(new ExecuteSqlTask(rightSql,3));
Map<String,String> resultLeftMap = queryLeftIDFuture.get();
builder.append(resultLeftMap.get("resultIds"));
Map<String,String> resultRightMap = queryRightIDFuture.get();
String rightIds = resultRightMap.get("resultIds");
if(!StringUtils.isEmpty(rightIds)){
builder.append(",");
builder.append(rightIds);
}
long endT2 = System.currentTimeMillis();
logger.info("pageIDRangeQuery.hr.DAO 查询耗时endT2 - startT2 = "+(endT2 - startT2)+"ms");
}
//设置上下页id集合
splitDto.getParamMap().put("totalIds",builder.toString());
Map<String,Integer> countFutureMap = queryCountFuture.get();
//设置总数
splitDto.setTotal(countFutureMap.get("queryCount"));
return splitDto;
}
这一阶段优化之后整体代码已经非常清晰了,原有的sql查询方法没动,对本应该在dao层中进行查询的逻辑迁移到了service层,service层中对原有代码和逻辑进行了拆解和重新梳理,使其满足并行化需求,另外也加了一些底层方法来满足缓存列表的复用,下面看一下优化后的效果:
第二阶段服务端耗时统计.png这一阶段搞完之后在service层,dao层,缓存层等都几乎进行了优化,但是为啥还是1500ms,左右呢,而且通过监控平台可知仅仅是报了该接口的慢查询,但是并没有慢sql,所以当前的sql内容肯定有问题,因此还有优化空间。后面又进行了第三阶段的优化工作。
第三阶段优化(1500ms->330ms)
优化内容:这次的优化内容主要有两个方面,下面进行简单的分析一下
- 将上述dao层代码提到的最大的坑sqlBuilder.append(" and a.id in (" + intervieweeIds + ")");进行改写,改为sqlBuilder.append(" and a.id not in (" + intervieweeIds + ")");.
- 前两阶段的优化我们已经将耗时降低了一半,离我们的目标不远了,但是仍然有优化空间,根据之前打印的sql和逻辑可以知道intervieweeIds 这个字符串是interviewee表的id集合(多个逗号分割),而且数量很多。目前我们的线上数据量是19W+,打印出的set<intervieweeId>集合也几乎19w了,因为人才库里面很多90%以上的人才都是已入库的状态(部门hr管理员,部门hr两种角色都可以查已入库的),处于其他状态的人才记录特别少,也就是说原先的sql是这样的:
select a.id,,,,, from t_interviewee as a left join t_delivery_record as b on a.id = b.interviewid where.... and a.id in (19w a.id) and ...limit x,y;
select count(1) from t_interviewee as a left join t_delivery_record as b on a.id = b.interviewid where....and a.id in (19w a.id) and ... ;
由于dao实现层执行了4次查询,所以每次都会有19w a.id 被传输到mysql服务端。上面的sql在线上打印过,因此打印sql可能也是有点耗时的,笑哭。这样的话即使mysql能撑住,人肯定看着受不了。那么这么长的sql执行时间肯定跟网络传输有关,因为sql本身没有报慢查询。我本地进行了一次模拟动态sql,大概上面的sql生成之后有1m多点,也就是说一条sql会发1m的字符串到Mysql服务端。由于实际上dao层面有4条sql,所以一次分页查询,2~3s内会产生4m的流量。
因此可以对sql进行第二次改写来降低网络传输的效率,同时提高mysql的处理速度。下面使用了取反的操作,将缓存好的所有intervieweeId集合与之前并行查出的intervieweeId集合进行差集运算即可得到需要not in的intervieweeId集合。
代码如下:
/**
* 通过id求差集,改写sql
* @param intervieweeSet
* @return
* @throws Exception
*/
public Set<Long> getNotInIdSet(Set<Long> intervieweeSet) throws Exception {
Map<Integer,List<KVEntity>> interviewKVMap = cacheServiceHelper.getInterviewIdMap();
Set<Long> allIdSet = new HashSet<>(intervieweeSet.size()+10000);
interviewKVMap.forEach((k,v)->{
allIdSet.addAll(v.stream().map(KVEntity::getV).collect(Collectors.toSet()));
});
logger.info("allIdSet.size = "+allIdSet.size());
allIdSet.removeAll(intervieweeSet);
logger.info("allIdSet2222.size = "+allIdSet.size());
return allIdSet;
}
那么相应的判定逻辑和生成逻辑也需要简单改写即可:
if(CollectionUtils.isNotEmpty(setList)){
Set<Long> notInIdSet = interviewPositionSearchService.getNotInIdSet(setList);
if(notInIdSet.size() < 5000){
//设置改写sql标识
dtoSplitPage.getParamMap().put("notIn","true");
dtoSplitPage.getParamMap().put("notInIdSet",StringUtils.join(notInIdSet,","));
}
}
String notIn = param.get("notIn");
if(!StringUtils.isEmpty(notIn) && "true".equals(notIn)){
String notInIdStr = param.get("notInIdSet");
sqlBuilder.append(" and a.id not in (" + notInIdStr + ")");
sqlCountBuilder.append(" and a.id not in (" + notInIdStr + ")");
}else {
//走原有逻辑
if (StringUtils.isNotBlank(intervieweeIds)) {
sqlBuilder.append(" and a.id in (" + intervieweeIds + ")");
sqlCountBuilder.append(" and a.id in (" + intervieweeIds + ")");
}
}
第三阶段的改动内容主要有两方面
1.对sql进行改写将a.id in(19w a.id)的部分改成a.id not in(1k a.id)。
2.在serviceimpl层通过split.paramMap对象增加标识表示是否需要进行sql改写。
下面总结一下优化之前与优化之后哪些得到了并行处理,哪些进行了缓存处理,哪些进行了sql的改写
优化前后对比.png
优化后的总结:
1.优化后的代码量(service+dao)相较于优化前的代码量少了一半,因此在写代码的过程中要时刻警惕代码膨胀和代码职责不够单一的情况。
2.确定有性能优化的必要之后要确定性能瓶颈在哪,确定优化目标,出优化方案和相关的技术,评估影响。
3.实施过程中重点打印上下文耗时和参数,同时尽量少改原有代码和逻辑避免出现不必要的错误。
4.类似这种查询可以进行产品层面的优化
- 去掉总条数计算
- 每次分页大小从20改为100
5.在正常进行CRUD(增删改查)来实现业务的同时,我们还可以用多线程并发,缓存等技术来保障复杂业务接口的高性能和稳定性。
网友评论