美文网首页
03.记一次奇妙的线程池故障

03.记一次奇妙的线程池故障

作者: 乐活骑士 | 来源:发表于2020-07-05 12:40 被阅读0次

    写在前面

            咱们程序员都知道,多线程是个好东西,可以异步或者并行执行任务,提高程序性能,然而多线程又好比一把双刃剑,用的好可以大幅提高程序性能,用的不好,就有可能导致程序异常,甚至崩溃。前段时间,就不慎踩坑,记录分享一下,大家一起来避坑。由于业务代码复杂,我这里按照当时业务场景的模式,写了一个可执行的demo。

    出了啥问题?

            程序上线后不久,便出现假死,无法正常工作。重启后涛声依旧。

    问题排查与分析

            上服务器首先topdffree三连击,结果CPU正常,磁盘正常,内存正常。并没有发现什么异常,然后通过阿里神器arthas进行分析。

    arthas 分析

    Arthas 用户文档 https://alibaba.github.io/arthas/

    1. 进入arthasjava -jar arthas-boot.jar
    2. 选择java pid
    3. 打开dashboard
    4. 检查线程 thread

    打开arthas

    进入arthas,选择java进程

    image.png

    dashboard

    打开dashboard看看

    image.png

    看看线程

    thread -b 看看有无死锁,并没发现死锁

    [arthas@68541]$ thread -b
    No most blocking thread found!
    Affect(row-cnt:0) cost in 21 ms.
    

    thread 看看线程情况,结果发现大量WAITING线程。

    image.png

    jstack 看看线程出现什么问题了

    jstack 68541,找到了一些症状,定位到了WAITING的代码所在位置。

    image.png

    分析代码

    找到代码位置,下面我贴出代码。

    入口

    package threadDeadLock;
    
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    /**
     * @author William Zhang
     * @date 2020/7/3 3:19 下午
     * @description
     */
    public class ThreadDeadLockDemo {
    
        /**
         * 初始化线程池
         */
        public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                50, 100, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10000), r -> new Thread(r, "zw-test-thread-pool"));
    
        /**
         * 程序入口
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            /**mock一定数量的任务**/
            List<Integer> skuIds = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
    
            /**开始任务**/
            TaskAService taskAService = new TaskAService();
            taskAService.taskA(skuIds);
    
            /**完成后停止线程组**/
            executor.shutdown();
        }
    
    
    }
    
    

    ServiceA

    package threadDeadLock;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;
    
    /**
     * @author William Zhang
     * @date 2020/7/3 3:49 下午
     * @description
     */
    public class TaskAService {
    
        private TaskBService taskBService = new TaskBService();
    
        /**
         * taskA,对传入的用户id集合,进行并行操作。
         * @param userIds
         * @throws Exception
         */
        public void taskA(List<Integer> userIds) throws Exception{
            System.out.println("taskA begin ...");
            List<Future> futures = new ArrayList<>();
            for (Integer userId : userIds) {
                // 并行执行任务
                Future<?> future = ThreadDeadLockDemo.executor.submit(() -> {
                    try {
                        taskBService.taskB(userId);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                futures.add(future);
            }
            // 等所有任务完成后返回
            for (Future future : futures) {
                future.get();
            }
            System.out.println("taskA end    ...");
        }
    
    }
    
    

    ServiceB

    package threadDeadLock;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;
    
    /**
     * @author William Zhang
     * @date 2020/7/3 3:49 下午
     * @description
     */
    public class TaskBService {
    
        /**
         * 查询商品
         */
        public Integer taskB(Integer userId) throws Exception{
            System.out.println("taskB 开始...");
            // 这里也是模拟并行查询用户明细
            List<Future> futures = new ArrayList<>();
    
            futures.add(queryUserInfoA(userId));
            futures.add(queryUserInfoB(userId));
            futures.add(queryUserInfoC(userId));
            futures.add(queryUserInfoD(userId));
            futures.add(queryUserInfoE(userId));
            futures.add(queryUserInfoF(userId));
    
            for (Future future : futures) {
                future.get();
            }
            System.out.println("taskB 结束...");
            return userId;
        }
    
        /*********** 下面都是模拟查询用户xxx信息的接口***********/
        private Future queryUserInfoA(Integer userId){
            Future<?> future = ThreadDeadLockDemo.executor.submit(() -> {
                try {
                    // 模拟查询过程
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            return future;
        }
    
        private Future queryUserInfoB(Integer userId){
            return queryUserInfoA(userId);
        }
    
        private Future queryUserInfoC(Integer userId){
            return queryUserInfoA(userId);
        }
    
        private Future queryUserInfoD(Integer userId){
            return queryUserInfoA(userId);
        }
    
        private Future queryUserInfoE(Integer userId){
            return queryUserInfoA(userId);
        }
    
        private Future queryUserInfoF(Integer userId){
            return queryUserInfoA(userId);
        }
    
    
    }
    
    

    解密

            问题非常隐晦,定位到的代码,看起来一切正常,仔细分析后发现,A方法里面有future get,然后taskA里面调用的taskB方法,也是future get,由于get会阻塞线程,那会不会是这里出现了问题呢?
            是的,线程数池的资源是宝贵的,这里形成了类似“死锁”的情景。执行taskA,它要等待taskB全部执行完成,如果此时线程池全部被taskA任务占用,taskB就进入阻塞队列,等待执行。而且阻塞队列设置的过长,无法填满,从而无法激活corePoolSize以外的线程,此时corePoolSize里面的线程没有执行完成,taskB在阻塞队列中等待,无法执行,那么也就导致taskA一直等待,这样就形成了“死锁”。

    总结

            在开发时,我们要注意以下几点:

    • 设置合理的corePoolSizemaximumPoolSizequeueSizecorePoolSize太小无法利用CPU资源,太大增加线程上下文切换反而耗费资源,一般设置CPU核心数2倍。
    • queueSize同样非常重要,太大的话,其实是无法激活corePoolSize以外的线程来进行工作,具体数值按照业务来估算。
    • 线程池隔离,对于特定业务,使用专用的线程池,隔离线程之间的干扰。
    • future.get(),要设置超时时间,避免堵死整个线程。

    更新后的代码

        /**
         * 初始化线程池
         */
        public static final ThreadPoolExecutor taskAExecutor = new ThreadPoolExecutor(
                5, 100, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10000), r -> new Thread(r, "zw-test-taskA-thread-pool"));
    
        public static final ThreadPoolExecutor taskBExecutor = new ThreadPoolExecutor(
                5, 100, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), r -> new Thread(r, "zw-test-taskB-thread-pool"));
    
    
    
    /*********** 下面都是模拟查询用户xxx信息的接口***********/
        private Future queryUserInfoA(Integer userId){
            Future<?> future = ThreadDeadLockDemo.taskBExecutor.submit(() -> {
                try {
                    // 模拟查询过程
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            return future;
        }
        
        
    

    相关文章

      网友评论

          本文标题:03.记一次奇妙的线程池故障

          本文链接:https://www.haomeiwen.com/subject/incvqktx.html