美文网首页
线上并发问题

线上并发问题

作者: Tomy_Jx_Li | 来源:发表于2020-07-26 17:22 被阅读0次

背景

最近一次上线,对原有的同步任务执行加入了队列挤压、任务执行时长等监控。导致了上线之后出现了偶发性的多线程问题。具体排查过程如下

具体问题

一句话概括:差不多一万次调用中,会出现一两次的任务执行超时。那么为什么会超时呢,这个需要先明白原有的任务执行调度模型。

任务的执行调度模型

任务执行图
任务调度图
开始部分为用户线程,由于公司业务为金融业务,QPS只有30左右,所以我们的接口暂时采用了同步调用模型。压测单台并发为900,完全满足要求。所以接口暂时采用同步调用。
第三方调用本系统时,会根据入参会构建出一个任务的执行图。而任务的自动执行需要将任务入队。待执行任务,也就是前导任务已执行完成,可以开始执行的任务。待执行任务都会放入到Task queue中。而执行完成之后的任务,为待处理任务。待处理任务会放入到callback Queue中。待执行任务被消费之后,会根据任务类型(IO或计算),到对应的池中进行执行。执行完成之后,会将任务放入到callback队列中。而callback队列的任务被消费之后会有多种情况。
  • 1 会判断当前任务是否有后继任务:
  • 1.1 无 不处理
  • 1.2 有的话对应两种情况。
  • 2 后继任务是否为TailTask特殊节点,
  • 2.1 是TailTask就进行任务完成统计,统计完之后,比对整个调用的整个任务是否完成(创建任务的时候,会记录一个尾结点,也就是tailTask。tailTask上会记录所有的直接跟尾任务相关的任务个数,通过这个数量进行判断),如果完成唤醒调用线程,返回结果。未完成,那就任务完成数加1
  • 2.2 不是TailTask,就将该任务放入到task队列中,等待执行。

代码对比

代码对比

图中,右侧为新的代码,根据对比可发现新增的代码,主要是添加了四个方法。根据新增代码,并没有看出有什么问题。这里也经过测试,发现就是新增监控代码导致的问题。上图只是一小部分。所以肯定问题还是新增代码的问题,然而为什么会出现这个问题,还是需要查找到原因滴。

问题分析

查看日志,发现如果任务的超时时间是2s,那么在50ms之内,会将大部分任务执行完成。然后不会再执行其他任务,直到2s超时。那么问题就是部分任务未执行。导致了callback消费者判断那一块一直不能判断为任务结束。初步分析任务调度异常,导致callback消费者未将待执行任务放入task队列。

callback队列消费线程

问题排查

在上图代码中,添加日志,查看后继节点的执行情况


添加日志

通过该日志,发现,确实存在部分任务未执行到,而且都是二级任务。也就是如下图


二级任务

日志示例

如果有taskId为,1,2,3,4四个task。而headTask的taskId为0,tailTask的taskId为5。有如下依赖关系:


依赖关系

那么日志就会打印如下:
依赖关系:
0:1,0:3,1:2,3:4,2:5,4:5
而调度日志为:
currTaskId:0, nextTaskId:1
currTaskId:0, nextTaskId:3
currTaskId:1, nextTaskId:2
所以根据日志,判断3:4这条线出现了问题。因为3已经被放入了task队列中,所以只有4没有被执行到,导致任务一直等待超时。而且3本身并没有放入到callback中,所以问题应该是3未放入callback队列。
继续在任务放入callback队列时,添加日志,但是并没有执行到这里。所以在任务调用放入callback队列的方法之前,也就是run方法中又加入了日志如下:


添加日志
结果,最后的打印为3 begin,然后就没有然后了。所以至此问题排查到了,就是这个新增的方法的问题。然后查看代码,发现子类实现的beforeExecute可能存在并发问题,而产生异常。但是beforeExecute方法又没有进行try catch。导致了run任务的失败。从而最终导致了上述问题。

问题修复

package com.bj58.fbu.risk.feature.jobsupport.task;

import com.bj58.fbu.risk.feature.core.scheduler.SchedulerFactory;
import com.bj58.fbu.risk.feature.jobsupport.Executable;
import com.bj58.fbu.risk.feature.jobsupport.JobContext;
import com.bj58.fbu.risk.feature.jobsupport.callback.TaskCallBack;
import com.bj58.fbu.rsik.feature.data.DataSet;
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.List;

/**
 * @author wangwenchang
 * @date 2019/06/17
 * @decription
 */
@Getter
@Setter
public abstract class AbstractSyncTask extends AbstractTask {

    public AbstractSyncTask(Long jobId, Long taskId, int retryTime, TaskStatusEnum status, List<Executable> executables, JobContext jobContext, DataSet input, DataSet out) {
        super(jobId, taskId, retryTime, status, executables, jobContext, input, out);
    }

    public AbstractSyncTask(long taskId, JobContext context, int retryTime, ArrayList<Executable> list) {
        super(taskId, context, retryTime, list);
    }

    @Override
    public void run() {
        boolean success = true;
        status = TaskStatusEnum.RUNNING;
        try {
            try {
                beforeExecute();
            } catch (Exception e) {
                logger.error("处理前置事件出错!", e);
            }
            if (System.currentTimeMillis() > (jobGenerater.getRequestTime() + jobGenerater.getOutTime())) {
                getLogger().warn("调用超时,任务丢弃!{}", this.getClass().getName());
                _callback(TaskStatusEnum.TIME_OUT);
                try {
                    whenTimeOut();
                } catch (Exception e) {
                    logger.error("处理超时事件出错!", e);
                }
                return;
            }
            setTraceId();
            out = _run();
            if (needCallback()) {
                _callback(success ? TaskStatusEnum.SUCCESS : TaskStatusEnum.FAIL);
            }
            try {
                afterExecute();
            } catch (Exception e) {
                logger.error("处理后置事件出错!", e);
            }
        } catch (Exception e) {
            logger.error("执行任务出错!taskid:" + this.getTaskId(), e);
            _callback(TaskStatusEnum.FAIL);
        } finally {
            try {
                finishExecute();
            } catch (Exception e) {
                logger.error("处理finally事件出错!", e);
            }
        }
    }

    @Override
    public void _callback(TaskStatusEnum status) {
        this.status = status;
        TaskCallBack callBack = new TaskCallBack(this, status);
        SchedulerFactory.getScheduler().callback(callBack);
    }

    /**
     * 前置事件
     *
     * @throws Exception 调用方处理异常
     */
    public void beforeExecute() throws Exception {
    }

    /**
     * 超时事件
     *
     * @throws Exception 调用方处理异常
     */
    protected void whenTimeOut() throws Exception {
    }

    /**
     * 后置事件
     *
     * @throws Exception 调用方处理异常
     */
    protected void afterExecute() throws Exception {
    }

    /**
     * 结束事件
     *
     * @throws Exception 调用方处理异常
     */
    protected void finishExecute() throws Exception {
    }
}

最终进行评估,这几个新增的方法,不应该影响业务的正常执行,所以都有加入try catch。如果有异常,打印日志,进行异常捕获告警。

总结

1.所有牵扯到并发的代码,开发新功能时,都需要进行充分考虑。这个开发中却往往容易忽略掉
2.对于线程池中的任务,在进行excute或者sumit之前,将一些必要的成员变量先进行赋值,之后再执行任务。本次的问题就是子类实现的方法因为,先进行了执行,后进行的赋值,导致了NPE。

相关文章

  • 线上并发问题

    背景 最近一次上线,对原有的同步任务执行加入了队列挤压、任务执行时长等监控。导致了上线之后出现了偶发性的多线程问题...

  • mybatis3.2.8踩坑记录之.size()

    关于mybatis的xml标签使用问题和单元测试模拟高并发场景 标签使用问题 线上问题复现 查询对应的xml 问题...

  • Arthas - Java 线上问题定位处理利器

    对于电商等高并发系统,线上经常出现很多问题,如 CPU 飙升、线程数突高、线上卡顿等。 通常,我们会用一些工具:比...

  • 不改一行代码定位线上性能问题

    背景 最近时运不佳,几乎天天被线上问题骚扰。前几天刚解决了一个 HashSet 的并发问题,周六又来了一个性能问题...

  • 不改一行代码定位线上性能问题

    背景 最近时运不佳,几乎天天被线上问题骚扰。前几天刚解决了一个HashSet 的并发问题,周六又来了一个性能问题。...

  • solr 7.0 与spring-data 3.0整合--(5)

    SolrCloud 对于线上的应用,都会存在大规模的高并发访问,为了应用的高效性,稳定性,可靠性都会将高并发的线上...

  • 2018-04-25

    线上账务系统余额并发更新问题记录 物联网技术周报第 134 期: 智能音箱 Alexa 与 Arduino 制作家...

  • 学好并发编程必须要理解的三个核心问题

    一些读者朋友留言说,并发编程很难,学习了很多的知识,但是在实际工作中却无从下手。对于一个线上产生的并发问题,又不知...

  • 记一次线上事务并发问题

    今天一同事线上遇到一个问题,程序不明原因的进入了死循环。最后通过一步步分析代码的线程运行情况,定位出是事务产生的并...

  • 从sync.map看并发问题 2022-05-24

    1.一般意义下的并发问题 并发读写的问题,其实都出在写上。并发读一点问题都没有 并发读写2大问题如果写是更新操作,...

网友评论

      本文标题:线上并发问题

      本文链接:https://www.haomeiwen.com/subject/ygkekktx.html