美文网首页美文共赏
掌握线程池7大核心参数,自己也可以手写线程池

掌握线程池7大核心参数,自己也可以手写线程池

作者: ludan110 | 来源:发表于2021-11-29 21:34 被阅读0次

    手写线程池只需了解7个线程池核心参数

    参数名 中文名 说明
    corePoolSize 核心线程数 默认不会销毁,需要设置allowCoreThreadTimeOut为true时会销毁
    maximumPoolSize 最大线程数量 线程数量要大于核心线程数,且不能小于等于0
    keepAliveTime 空闲时间 最终存入的是nanos的值
    unit 空闲时间单位 unit.tonanos(long time),最终转为纳秒值存入成员变量
    workQueue 存放任务的队列 jdk实现的queue在初始化时需要设置队列长度
    threadFactory 线程创建的工厂 默认会提供工厂,Executors.DefaultThreadFactory
    handler 拒绝策略 默认拒绝策略为AbortPolicy,直接抛出异常

    文章开头直接表格丢出七个核心参数,以及简单的说明。

    如果已经是老司机,那么看到之后就能直接唤醒深处已经淡忘的记忆。

    如果还是小萌新,不妨先简单的有个印象,等看完示例之后再深刻记忆。无论是对面试还是源码走读,或是线程池二开优化都会有帮助。

    前导

    手写线程池之前,你得知道线程池是什么,如何工作的。这样踩在Doug Lea的肩膀上你才能看的更远,走的更高。

    如果你还不知道什么是线程池,可能本文不太适合你,看看我其他文章如线程池的前世今生、池化技术之后再来吧。

    JDK中ThreeadPoolExecutor线程池的运行原理借用我生活中的例子来说明,帮助理解记忆。

    故事

    第零章

    我是隶属于一个包工头(ThreadPoolExecutor)下的一个小员工,是一名Worker。曾经他手下一个员工都没有(默认线程数为0,没有线程启动),他自己也好吃懒作,从来不干活。

    一次酒后,我们头儿吐露出他计划要招4个长工(coolPoolSize为4),而且以他的能力最多也只能管理6个下属(maximumPoolSize为6),那么也就说他最多也还会招2个临时工。他老婆兼职着Hr(threadFactory,理解为每一个线程都是经过她的手创建),每一个人入职都会发给我们一个安全帽,安全帽上标着编号,而我的编号是0x11。

    第一章:第一个下属

    有一天,一个老板遇见头儿,问他100块搬一块砖干不干,头儿说干。我们都知道的,他是不干活的。在下雨的公交站,只有我们两个人,他神秘的问我,你也是搬砖的吧,我有一个1块钱一块砖的活,你敢不敢?我想到我们老板以前只给一毛一块,我二话没说,成了他第一个员工(此时,核心线程数为1)。

    第二章:干活

    看着涨了10倍的工资,我的工作积极性被调动了起来。从来没有一块砖需要等我,一来砖我就搬完了,只有我等砖,没有砖等我。

    第三章:两块砖

    有一天,工地门口一下子来了两块砖。
    我站在门口呆了,毕竟我一次只能搬一块砖,还有一块砖怎么办?工头来了,不知道他使出了什么技巧,一眨眼的时间,又拉到了一个工友,和我一起搬砖。不知道他是不是也是1块钱一块砖,我也没敢问。

    第四章:两个人

    活来的不是那么稳定。我和小王,对了忘记说了,那个新来的同事叫小王。小王和我一样,也是手脚麻利,干活卖力,一次来两块砖时,我们一人一块,如果只到了一块砖,我们看谁先抢到,谁就能赚这一块钱。幸好,我的手脚挺快,基本上一块砖的时候,我也能和他抢个55开。

    第五章:平淡的生活

    砖是一天天搬,工头的生意也越做越大,最多的时候能有一次四块砖。和小王来的那次一样,在门口岗亭那,工头和他们聊不到一支烟的功夫,我又多了两个工友。

    第六章:门口的沙地

    门口有一块沙地,我一直都没有注意到它。但是今天例外,因为今天一下来了五块砖,工头也没向往常一样出来解决。我知道他躺在那一定是看见了,因为我看到他故意的压了压帽檐,应该是想让我们以为他睡得比较死,帽子盖得严。

    我们四个人一人搬走了一块砖,还剩下一块砖,它就静静的摆在沙地上(中间插一句,workQueue中插入了第一个任务,这毕竟还是一篇技术向的文章,别忘了正事)。它就躺在那块沙地上,我从来没有注意到的沙地,仿佛以前从未存在,但是今天它在了。沙地上还摆着我的罪恶感,不知道其他的工友是怎么想的,至少我是。我内心里从来不能忍受有砖是在等我搬的。明明只有我能等砖啊。

    第七章:砖块

    想聊的太多了,我好像都忘记介绍我搬的砖块了。这可不是普通的砖块,长宽高我不太有概念,只知道很重,那天工头跟我们说,这砖不耐压,最底下的砖要是再压上三块(共4块,workQueue的长度为4),如果再加一块就会把最底下的砖压坏。我甚至还听到一个比较恐怖的传说,门口那块沙地风水也不太好,要是沙地上一共压了五块砖,能把下面挖的地下室压塌,甚至整个工地也会倒掉。(队列中积压大对象过多,导致OOM)

    这就是为什么我一次只能搬一块砖,这砖可不好搬呀。你也别说你来你也行。要是没有经过长期的搬砖训练,这砖你应该是搬不动的。

    第八章:摇人

    沙地上偶然会出现已经积压四块砖的情况,不过一般很快危险就被排除了。毕竟我们四个工友都是好样的,基本上不会让这种情况出现太久。

    但是危险的事情总会出现,两辆自卸车带着第五块砖、第六块砖来了。门口也没有其他人,不管怎么样,第五块砖都会压上去吧。

    然而,这件事我没有亲眼所见,是我工友告诉我的。

    工友告诉我,这些自卸车到的几分钟前,工头就打了一个电话。也就等自卸车刚停稳,边上一辆小面包就过来了。面包车里跳下来两个人,一人搬起一块砖,就开始工作。

    工头原来还会摇人,从没有见过,还刚好摇了两个人来搬超额的两块砖。我们私下都认为老板应该是事先知道今天肯定会来不及搬。

    第九章:门房大爷

    原来我不知道的事情还有这么多。门房大爷(RejectedExecutionHandler)是早在我之前,就已经和工头谈好的,帮忙看着那些砖,防止转压的过多。事情是这样的。
    不是我们不勤快,是砖越来越多。即便我们六个人,也搬不过来,堆在沙地上的砖也一直到了四块。

    我们一直都以为门房大爷和蔼可亲,和我们关系也老好了,不像是个社会人。但是那天自卸车到了,大家都没注意,第五块砖要压上来了,唯独大爷看到了。只见到大爷抄起菜刀脱掉安全帽带起钢盔冲到门口,大声嚷嚷挥舞菜刀赶走了自卸车和带着的砖块(默认AbortPolicy策略,直接抛异常,任务不处理)。原来大爷是这么一个大爷,有他在,这辈子是验证不了都市传说到底是不是真的了。

    第十章:后疫情时代

    疫情来了,送砖头的车也少了。我们知道,裁员肯定也是来了。

    工头说他很人性化,他不做决断,让我们自己说了算。但是他定下规矩如果三天(时间单位TimeUnit.DAYS,时间keepAliveTime为3)一块砖都没搬,让我们自己选择离开吧。我还以为我们最开始四个不是临时工,不会被裁,原来我们都是。

    工头只保证最后会留四个。

    第十一章:内卷还在继续

    虽然我们不拿底薪,按照搬砖的数量计薪。不过工头还是嫌我们人多活少碍事,在工地里瞎晃悠(线程还是存在进程中,多少占用了系统资源)。工头调整了规则(allowsCoreThreadTimeOuttrue,允许核心线程销毁),我们四个常任搬砖工也变成临时工,根据之前的规则要被淘汰了。
    每一个人都为了一口饭,互相内卷着,希望不被开除。

    手写线程池实现

    终于开始自己个儿实现一个线程池了,思路就是模仿jdk中的线程池。或者说的再直白点,那就是默写jdk线程池的核心代码。

    定义成员变量

    /**
     * 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
     * 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
     */
    private AtomicInteger state = new AtomicInteger(0);
    /**
     * 线程数量
     */
    private AtomicInteger threadNum = new AtomicInteger(0);
    /**
     * 完成任务量
     */
    private volatile int compliteNum = 0;
    
    /**
     * 持有所有的线程
     */
    private HashSet<Worker> workers = new HashSet<>();
    
    /**
     * 缓存工作任务
     */
    private BlockingQueue<Runnable> taskQueue;
    
    /**
     * 线程池创建的工厂
     */
    private volatile ThreadFactory threadFactory;
    
    /**
     * 核心线程数
     */
    private volatile int corePoolSize;
    
    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;
    
    /**
     * 拒绝策略
     */
    private MyRejectedExecutionHandler handler;
    
    /**
     * 锁, 处理一些公用变量时需要加锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    

    构造方法

    只提供了一个三个参数的构造方法,其他参数都不是关键,所以直接设置了默认值。

    /**
     * 提供一种构造方法, 只需要配置线程数和queue的容量
     * @param corePoolSize
     * @param maximumPoolSize
     * @param taskQueue
     */
    public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.taskQueue = taskQueue;
        this.threadFactory = Executors.defaultThreadFactory();
        this.handler = new MyRejectedExecutionHandler();
    }
    

    核心内部类,worker

    /**
     * 内部类, worker,持有线程,执行任务
     */
    private class Worker implements Runnable {
        Thread thread;
        Runnable task;
    
        /**
         * 构造方法, 构造出work
         * @param task
         */
        public Worker( Runnable task) {
            this.task = task;
            this.thread = getThreadFactory().newThread(this);
        }
    
        @Override
        public void run() {
            System.out.println("覆盖run方法,对传入的任务做增强");
            runWorker(this);
        }
    }
    

    获取线程工厂的方法

    线程工厂用的是jdk中自带的DefaultThreadFactory,在worker初始化时获取线程。

    /**
     * 通过线程工厂创建线程
     * @return
     */
    private ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }
    

    自定义拒绝策略

    自定义拒绝策略。

    /**
     * 自定义拒绝策略, 并没有去继承jdk中的接口
     * 直接打印, 不做其他处理
     */
    private class MyRejectedExecutionHandler {
    
        public void rejectedExecution(Runnable r, MyPool p) {
            System.out.println("任务过载,丢弃任务");
        }
    }
    

    在后续使用拒绝策略时,调用的方法:

    private void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    

    向线程池提交任务

    向线程池提交任务,在自己实现的线程池中,是为数不多的对外暴露的接口。

    而在官方的线程池中,也几乎没有什么能直接操作到线程或者任务的API,对外暴露的多为管理线程池方面的API。

    这个方法需要好好掌握,也是面试中问的最多的部分。

    /**
     * 对外暴露的方法, 执行任务
     * @param r
     */
    public void execute(Runnable r) {
        if (r == null) throw new NullPointerException();
    
        int n = threadNum.get();
    
        // 小于核心线程数
        if (n < corePoolSize) {
            addWorker(r, true);
            return;
        }
        // 队列未满, 未满时offer返回true
        if (taskQueue.offer(r)) {
            addWorker(null, false);
        }
        // 队列已满, 增加线程直到最大线程数为止
        else if (!addWorker(r, false)){
            // 执行拒绝策略
            reject(r);
        }
    
    }
    

    将任务交给worker内部类

    execute中基本上只是一个中转,决定任务到底是由核心线程接收,还是队列接收,还是非核心线程接收或是拒绝策略接收。

    在这里是要把任务添加给work,加入到works中,并且调用了线程的t.start()方法启动一个新的线程。

    /**
     * 将任务添加到执行过程中
     * @param r
     * @param core
     */
    private boolean addWorker(Runnable r, boolean core) {
        int num = threadNum.get();
    
        // 乐观锁
        for(;;) {
            // 线程数是否超出
            if (num >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
    
            if (threadNum.compareAndSet(num, num + 1)) {
                // 添加成功
                break;
            }
        }
    
        Worker w = null;
        boolean workerStarted = false;
        try {
            w = new Worker(r);
            Thread t = w.thread;
    
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    workers.add(w);
                } finally {
                    mainLock.unlock();
                }
                t.start();
                workerStarted = true;
            }
        } finally {
    
        }
    
        return workerStarted;
    }
    

    真正执行任务

    /**
     * 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
     * 这个run方法传入的是worker对象,
     * 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
     * @param w
     */
    private void runWorker(Worker w) {
        Runnable task = w.task;
    
        try {
            // 一直获取任务
            while (task != null || (task = getTask()) != null) {
    
                try {
                    beforeExecute();
    
                    try {
                        task.run();
    
                    } finally {
                        afterExecute();
                    }
                } finally {
                    // 演示用, 直接加锁,  统计任务完成数
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        compliteNum += 1;
                    } finally {
                        mainLock.unlock();
                        task = null;
                    }
    
                }
            }
        } finally {
            // 线程退出的后置操作
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                workers.remove(w);
                threadNum.decrementAndGet();
            } finally {
                mainLock.unlock();
            }
    
            if (threadNum.get() < corePoolSize) {
                addWorker(null, false);
            }
        }
    
    }
    

    获取任务,不让线程闲下来

    /**
     * 从队列中获取任务
     * @return
     */
    private Runnable getTask() {
        try {
            Runnable r = taskQueue.take();
            if (r != null) {
                return r;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    

    前后置增强

    private void afterExecute() {
        //System.out.println("操作后置增强");
    }
    
    private void beforeExecute() {
        //System.out.println("操作前置增强");
    }
    

    获取总共完成的任务数

    /**
     * 获取总共完成的任务数
     * @return
     */
    public int getCompliteNum() {
        return this.compliteNum;
    }
    

    运行示例

    image

    全部代码

    public class MyPool {
    
        /**
         * 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
         * 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
         */
        private AtomicInteger state = new AtomicInteger(0);
        /**
         * 线程数量
         */
        private AtomicInteger threadNum = new AtomicInteger(0);
        /**
         * 完成任务量
         */
        private volatile int compliteNum = 0;
    
        /**
         * 持有所有的线程
         */
        private HashSet<Worker> workers = new HashSet<>();
    
        /**
         * 缓存工作任务
         */
        private BlockingQueue<Runnable> taskQueue;
    
        /**
         * 线程池创建的工厂
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * 核心线程数
         */
        private volatile int corePoolSize;
    
        /**
         * 最大线程数
         */
        private volatile int maximumPoolSize;
    
        /**
         * 拒绝策略
         */
        private MyRejectedExecutionHandler handler;
    
        /**
         * 锁, 处理一些公用变量时需要加锁
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * 提供一种构造方法, 只需要配置线程数和queue的容量
         * @param corePoolSize
         * @param maximumPoolSize
         * @param taskQueue
         */
        public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.taskQueue = taskQueue;
            this.threadFactory = Executors.defaultThreadFactory();
            this.handler = new MyRejectedExecutionHandler();
        }
    
        /**
         * 内部类, worker,持有线程,执行任务
         */
        private class Worker implements Runnable {
            Thread thread;
            Runnable task;
    
            /**
             * 构造方法, 构造出work
             * @param task
             */
            public Worker( Runnable task) {
                this.task = task;
                this.thread = getThreadFactory().newThread(this);
            }
    
            @Override
            public void run() {
                System.out.println("覆盖run方法,对传入的任务做增强");
                runWorker(this);
            }
        }
    
        /**
         * 通过线程工厂创建线程
         * @return
         */
        private ThreadFactory getThreadFactory() {
            return this.threadFactory;
        }
    
        /**
         * 自定义拒绝策略, 并没有去继承jdk中的接口
         * 直接打印, 不做其他处理
         */
        private class MyRejectedExecutionHandler {
    
            public void rejectedExecution(Runnable r, MyPool p) {
                System.out.println("任务过载,丢弃任务");
            }
        }
    
        /**
         * 对外暴露的方法, 执行任务
         * @param r
         */
        public void execute(Runnable r) {
            if (r == null) throw new NullPointerException();
    
            int n = threadNum.get();
    
            // 小于核心线程数
            if (n < corePoolSize) {
                addWorker(r, true);
                return;
            }
            // 队列未满, 未满时offer返回true
            if (taskQueue.offer(r)) {
                addWorker(null, false);
            }
            // 队列已满, 增加线程直到最大线程数为止
            else if (!addWorker(r, false)){
                // 执行拒绝策略
                reject(r);
            }
    
        }
    
        /**
         * 将任务添加到执行过程中
         * @param r
         * @param core
         */
        private boolean addWorker(Runnable r, boolean core) {
            int num = threadNum.get();
    
            // 乐观锁
            for(;;) {
                // 线程数是否超出
                if (num >= (core ? corePoolSize : maximumPoolSize)) {
                    return false;
                }
    
                if (threadNum.compareAndSet(num, num + 1)) {
                    // 添加成功
                    break;
                }
            }
    
            Worker w = null;
            boolean workerStarted = false;
            try {
                w = new Worker(r);
                Thread t = w.thread;
    
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        workers.add(w);
                    } finally {
                        mainLock.unlock();
                    }
                    t.start();
                    workerStarted = true;
                }
            } finally {
    
            }
    
            return workerStarted;
        }
    
        /**
         * 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
         * 这个run方法传入的是worker对象,
         * 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
         * @param w
         */
        private void runWorker(Worker w) {
            Runnable task = w.task;
    
            try {
                // 一直获取任务
                while (task != null || (task = getTask()) != null) {
    
                    try {
                        beforeExecute();
    
                        try {
                            task.run();
    
                        } finally {
                            afterExecute();
                        }
                    } finally {
                        // 演示用, 直接加锁,  统计任务完成数
                        final ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();
                        try {
                            compliteNum += 1;
                        } finally {
                            mainLock.unlock();
                            task = null;
                        }
    
                    }
                }
            } finally {
                // 线程退出的后置操作
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    workers.remove(w);
                    threadNum.decrementAndGet();
                } finally {
                    mainLock.unlock();
                }
    
                if (threadNum.get() < corePoolSize) {
                    addWorker(null, false);
                }
            }
    
        }
    
        /**
         * 从队列中获取任务
         * @return
         */
        private Runnable getTask() {
            try {
                Runnable r = taskQueue.take();
                if (r != null) {
                    return r;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        private void afterExecute() {
            //System.out.println("操作后置增强");
        }
    
        private void beforeExecute() {
            //System.out.println("操作前置增强");
        }
    
        private void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
        /**
         * 获取总共完成的任务数
         * @return
         */
        public int getCompliteNum() {
            return this.compliteNum;
        }
    
    }
    

    后记

    工作中,手写线程池属实没什么用。jdk已经在底层实现了线程池的功能,我们只需要在此基础上封装即可。手写的意义在于能更细致的去精读里面的代码。获取你直接阅读,甚至是debug之后,你以为你已经掌握了。但是你用自己的语言,自己的理解再去实现一遍时。你会发现,还是有很多地方跟你想象中的不一样,有很多细节遗漏。因此即便到最后实现的线程池在一般流程下能运行,但是各种异常情况,各种容错都没有实现,这些异常的处理,已经边界条件的选择仍然需要精进。

    相关文章

      网友评论

        本文标题:掌握线程池7大核心参数,自己也可以手写线程池

        本文链接:https://www.haomeiwen.com/subject/byclxrtx.html