美文网首页
线上并发问题

线上并发问题

作者: Tomy_Jx_Li | 来源:发表于2020-07-26 17:22 被阅读0次

    背景

    最近一次上线,对原有的同步任务执行加入了队列挤压、任务执行时长等监控。导致了上线之后出现了偶发性的多线程问题。具体排查过程如下

    具体问题

    一句话概括:差不多一万次调用中,会出现一两次的任务执行超时。那么为什么会超时呢,这个需要先明白原有的任务执行调度模型。

    任务的执行调度模型

    任务执行图
    任务调度图
    开始部分为用户线程,由于公司业务为金融业务,QPS只有30左右,所以我们的接口暂时采用了同步调用模型。压测单台并发为900,完全满足要求。所以接口暂时采用同步调用。
    第三方调用本系统时,会根据入参会构建出一个任务的执行图。而任务的自动执行需要将任务入队。待执行任务,也就是前导任务已执行完成,可以开始执行的任务。待执行任务都会放入到Task queue中。而执行完成之后的任务,为待处理任务。待处理任务会放入到callback Queue中。待执行任务被消费之后,会根据任务类型(IO或计算),到对应的池中进行执行。执行完成之后,会将任务放入到callback队列中。而callback队列的任务被消费之后会有多种情况。
    • 1 会判断当前任务是否有后继任务:
    • 1.1 无 不处理
    • 1.2 有的话对应两种情况。
    • 2 后继任务是否为TailTask特殊节点,
    • 2.1 是TailTask就进行任务完成统计,统计完之后,比对整个调用的整个任务是否完成(创建任务的时候,会记录一个尾结点,也就是tailTask。tailTask上会记录所有的直接跟尾任务相关的任务个数,通过这个数量进行判断),如果完成唤醒调用线程,返回结果。未完成,那就任务完成数加1
    • 2.2 不是TailTask,就将该任务放入到task队列中,等待执行。

    代码对比

    代码对比

    图中,右侧为新的代码,根据对比可发现新增的代码,主要是添加了四个方法。根据新增代码,并没有看出有什么问题。这里也经过测试,发现就是新增监控代码导致的问题。上图只是一小部分。所以肯定问题还是新增代码的问题,然而为什么会出现这个问题,还是需要查找到原因滴。

    问题分析

    查看日志,发现如果任务的超时时间是2s,那么在50ms之内,会将大部分任务执行完成。然后不会再执行其他任务,直到2s超时。那么问题就是部分任务未执行。导致了callback消费者判断那一块一直不能判断为任务结束。初步分析任务调度异常,导致callback消费者未将待执行任务放入task队列。

    callback队列消费线程

    问题排查

    在上图代码中,添加日志,查看后继节点的执行情况


    添加日志

    通过该日志,发现,确实存在部分任务未执行到,而且都是二级任务。也就是如下图


    二级任务

    日志示例

    如果有taskId为,1,2,3,4四个task。而headTask的taskId为0,tailTask的taskId为5。有如下依赖关系:


    依赖关系

    那么日志就会打印如下:
    依赖关系:
    0:1,0:3,1:2,3:4,2:5,4:5
    而调度日志为:
    currTaskId:0, nextTaskId:1
    currTaskId:0, nextTaskId:3
    currTaskId:1, nextTaskId:2
    所以根据日志,判断3:4这条线出现了问题。因为3已经被放入了task队列中,所以只有4没有被执行到,导致任务一直等待超时。而且3本身并没有放入到callback中,所以问题应该是3未放入callback队列。
    继续在任务放入callback队列时,添加日志,但是并没有执行到这里。所以在任务调用放入callback队列的方法之前,也就是run方法中又加入了日志如下:


    添加日志
    结果,最后的打印为3 begin,然后就没有然后了。所以至此问题排查到了,就是这个新增的方法的问题。然后查看代码,发现子类实现的beforeExecute可能存在并发问题,而产生异常。但是beforeExecute方法又没有进行try catch。导致了run任务的失败。从而最终导致了上述问题。

    问题修复

    package com.bj58.fbu.risk.feature.jobsupport.task;
    
    import com.bj58.fbu.risk.feature.core.scheduler.SchedulerFactory;
    import com.bj58.fbu.risk.feature.jobsupport.Executable;
    import com.bj58.fbu.risk.feature.jobsupport.JobContext;
    import com.bj58.fbu.risk.feature.jobsupport.callback.TaskCallBack;
    import com.bj58.fbu.rsik.feature.data.DataSet;
    import lombok.Getter;
    import lombok.Setter;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author wangwenchang
     * @date 2019/06/17
     * @decription
     */
    @Getter
    @Setter
    public abstract class AbstractSyncTask extends AbstractTask {
    
        public AbstractSyncTask(Long jobId, Long taskId, int retryTime, TaskStatusEnum status, List<Executable> executables, JobContext jobContext, DataSet input, DataSet out) {
            super(jobId, taskId, retryTime, status, executables, jobContext, input, out);
        }
    
        public AbstractSyncTask(long taskId, JobContext context, int retryTime, ArrayList<Executable> list) {
            super(taskId, context, retryTime, list);
        }
    
        @Override
        public void run() {
            boolean success = true;
            status = TaskStatusEnum.RUNNING;
            try {
                try {
                    beforeExecute();
                } catch (Exception e) {
                    logger.error("处理前置事件出错!", e);
                }
                if (System.currentTimeMillis() > (jobGenerater.getRequestTime() + jobGenerater.getOutTime())) {
                    getLogger().warn("调用超时,任务丢弃!{}", this.getClass().getName());
                    _callback(TaskStatusEnum.TIME_OUT);
                    try {
                        whenTimeOut();
                    } catch (Exception e) {
                        logger.error("处理超时事件出错!", e);
                    }
                    return;
                }
                setTraceId();
                out = _run();
                if (needCallback()) {
                    _callback(success ? TaskStatusEnum.SUCCESS : TaskStatusEnum.FAIL);
                }
                try {
                    afterExecute();
                } catch (Exception e) {
                    logger.error("处理后置事件出错!", e);
                }
            } catch (Exception e) {
                logger.error("执行任务出错!taskid:" + this.getTaskId(), e);
                _callback(TaskStatusEnum.FAIL);
            } finally {
                try {
                    finishExecute();
                } catch (Exception e) {
                    logger.error("处理finally事件出错!", e);
                }
            }
        }
    
        @Override
        public void _callback(TaskStatusEnum status) {
            this.status = status;
            TaskCallBack callBack = new TaskCallBack(this, status);
            SchedulerFactory.getScheduler().callback(callBack);
        }
    
        /**
         * 前置事件
         *
         * @throws Exception 调用方处理异常
         */
        public void beforeExecute() throws Exception {
        }
    
        /**
         * 超时事件
         *
         * @throws Exception 调用方处理异常
         */
        protected void whenTimeOut() throws Exception {
        }
    
        /**
         * 后置事件
         *
         * @throws Exception 调用方处理异常
         */
        protected void afterExecute() throws Exception {
        }
    
        /**
         * 结束事件
         *
         * @throws Exception 调用方处理异常
         */
        protected void finishExecute() throws Exception {
        }
    }
    

    最终进行评估,这几个新增的方法,不应该影响业务的正常执行,所以都有加入try catch。如果有异常,打印日志,进行异常捕获告警。

    总结

    1.所有牵扯到并发的代码,开发新功能时,都需要进行充分考虑。这个开发中却往往容易忽略掉
    2.对于线程池中的任务,在进行excute或者sumit之前,将一些必要的成员变量先进行赋值,之后再执行任务。本次的问题就是子类实现的方法因为,先进行了执行,后进行的赋值,导致了NPE。

    相关文章

      网友评论

          本文标题:线上并发问题

          本文链接:https://www.haomeiwen.com/subject/ygkekktx.html