背景
最近一次上线,对原有的同步任务执行加入了队列挤压、任务执行时长等监控。导致了上线之后出现了偶发性的多线程问题。具体排查过程如下
具体问题
一句话概括:差不多一万次调用中,会出现一两次的任务执行超时。那么为什么会超时呢,这个需要先明白原有的任务执行调度模型。
任务的执行调度模型
任务执行图任务调度图
开始部分为用户线程,由于公司业务为金融业务,QPS只有30左右,所以我们的接口暂时采用了同步调用模型。压测单台并发为900,完全满足要求。所以接口暂时采用同步调用。
第三方调用本系统时,会根据入参会构建出一个任务的执行图。而任务的自动执行需要将任务入队。待执行任务,也就是前导任务已执行完成,可以开始执行的任务。待执行任务都会放入到Task queue中。而执行完成之后的任务,为待处理任务。待处理任务会放入到callback Queue中。待执行任务被消费之后,会根据任务类型(IO或计算),到对应的池中进行执行。执行完成之后,会将任务放入到callback队列中。而callback队列的任务被消费之后会有多种情况。
- 1 会判断当前任务是否有后继任务:
- 1.1 无 不处理
- 1.2 有的话对应两种情况。
- 2 后继任务是否为TailTask特殊节点,
- 2.1 是TailTask就进行任务完成统计,统计完之后,比对整个调用的整个任务是否完成(创建任务的时候,会记录一个尾结点,也就是tailTask。tailTask上会记录所有的直接跟尾任务相关的任务个数,通过这个数量进行判断),如果完成唤醒调用线程,返回结果。未完成,那就任务完成数加1
- 2.2 不是TailTask,就将该任务放入到task队列中,等待执行。
代码对比
代码对比图中,右侧为新的代码,根据对比可发现新增的代码,主要是添加了四个方法。根据新增代码,并没有看出有什么问题。这里也经过测试,发现就是新增监控代码导致的问题。上图只是一小部分。所以肯定问题还是新增代码的问题,然而为什么会出现这个问题,还是需要查找到原因滴。
问题分析
查看日志,发现如果任务的超时时间是2s,那么在50ms之内,会将大部分任务执行完成。然后不会再执行其他任务,直到2s超时。那么问题就是部分任务未执行。导致了callback消费者判断那一块一直不能判断为任务结束。初步分析任务调度异常,导致callback消费者未将待执行任务放入task队列。
问题排查
在上图代码中,添加日志,查看后继节点的执行情况
添加日志
通过该日志,发现,确实存在部分任务未执行到,而且都是二级任务。也就是如下图
二级任务
日志示例
如果有taskId为,1,2,3,4四个task。而headTask的taskId为0,tailTask的taskId为5。有如下依赖关系:
依赖关系
那么日志就会打印如下:
依赖关系:
0:1,0:3,1:2,3:4,2:5,4:5
而调度日志为:
currTaskId:0, nextTaskId:1
currTaskId:0, nextTaskId:3
currTaskId:1, nextTaskId:2
所以根据日志,判断3:4这条线出现了问题。因为3已经被放入了task队列中,所以只有4没有被执行到,导致任务一直等待超时。而且3本身并没有放入到callback中,所以问题应该是3未放入callback队列。
继续在任务放入callback队列时,添加日志,但是并没有执行到这里。所以在任务调用放入callback队列的方法之前,也就是run方法中又加入了日志如下:
添加日志
结果,最后的打印为3 begin,然后就没有然后了。所以至此问题排查到了,就是这个新增的方法的问题。然后查看代码,发现子类实现的beforeExecute可能存在并发问题,而产生异常。但是beforeExecute方法又没有进行try catch。导致了run任务的失败。从而最终导致了上述问题。
问题修复
package com.bj58.fbu.risk.feature.jobsupport.task;
import com.bj58.fbu.risk.feature.core.scheduler.SchedulerFactory;
import com.bj58.fbu.risk.feature.jobsupport.Executable;
import com.bj58.fbu.risk.feature.jobsupport.JobContext;
import com.bj58.fbu.risk.feature.jobsupport.callback.TaskCallBack;
import com.bj58.fbu.rsik.feature.data.DataSet;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
* @author wangwenchang
* @date 2019/06/17
* @decription
*/
@Getter
@Setter
public abstract class AbstractSyncTask extends AbstractTask {
public AbstractSyncTask(Long jobId, Long taskId, int retryTime, TaskStatusEnum status, List<Executable> executables, JobContext jobContext, DataSet input, DataSet out) {
super(jobId, taskId, retryTime, status, executables, jobContext, input, out);
}
public AbstractSyncTask(long taskId, JobContext context, int retryTime, ArrayList<Executable> list) {
super(taskId, context, retryTime, list);
}
@Override
public void run() {
boolean success = true;
status = TaskStatusEnum.RUNNING;
try {
try {
beforeExecute();
} catch (Exception e) {
logger.error("处理前置事件出错!", e);
}
if (System.currentTimeMillis() > (jobGenerater.getRequestTime() + jobGenerater.getOutTime())) {
getLogger().warn("调用超时,任务丢弃!{}", this.getClass().getName());
_callback(TaskStatusEnum.TIME_OUT);
try {
whenTimeOut();
} catch (Exception e) {
logger.error("处理超时事件出错!", e);
}
return;
}
setTraceId();
out = _run();
if (needCallback()) {
_callback(success ? TaskStatusEnum.SUCCESS : TaskStatusEnum.FAIL);
}
try {
afterExecute();
} catch (Exception e) {
logger.error("处理后置事件出错!", e);
}
} catch (Exception e) {
logger.error("执行任务出错!taskid:" + this.getTaskId(), e);
_callback(TaskStatusEnum.FAIL);
} finally {
try {
finishExecute();
} catch (Exception e) {
logger.error("处理finally事件出错!", e);
}
}
}
@Override
public void _callback(TaskStatusEnum status) {
this.status = status;
TaskCallBack callBack = new TaskCallBack(this, status);
SchedulerFactory.getScheduler().callback(callBack);
}
/**
* 前置事件
*
* @throws Exception 调用方处理异常
*/
public void beforeExecute() throws Exception {
}
/**
* 超时事件
*
* @throws Exception 调用方处理异常
*/
protected void whenTimeOut() throws Exception {
}
/**
* 后置事件
*
* @throws Exception 调用方处理异常
*/
protected void afterExecute() throws Exception {
}
/**
* 结束事件
*
* @throws Exception 调用方处理异常
*/
protected void finishExecute() throws Exception {
}
}
最终进行评估,这几个新增的方法,不应该影响业务的正常执行,所以都有加入try catch。如果有异常,打印日志,进行异常捕获告警。
总结
1.所有牵扯到并发的代码,开发新功能时,都需要进行充分考虑。这个开发中却往往容易忽略掉
2.对于线程池中的任务,在进行excute或者sumit之前,将一些必要的成员变量先进行赋值,之后再执行任务。本次的问题就是子类实现的方法因为,先进行了执行,后进行的赋值,导致了NPE。
网友评论