美文网首页工作生活
J.U.C之线程池05:源码解析-work执行

J.U.C之线程池05:源码解析-work执行

作者: 贪睡的企鹅 | 来源:发表于2019-07-02 23:14 被阅读0次

work 工作线程

通过上节我们将任务提交给work。并启动work线程。在我们看run方法之前我们看下work类的整体设计。

工作线程Worker的设计

private final class Worker extends AbstractQueuedSynchronizer
            implements Runnable {
  • work的本质是一个Runnable,而创建一个work线程是使用ThreadFactory创建一个Thread并将work做其属性new Tread(work),同时将这个thread设置的work属性中。多么巧妙的设计!!

  • work为什么需要AQS同步状态,因为同一个时间只能完成一个任务。因而需要在其开始工作和结束工作获取同步状态(加锁),释放同步状态(解锁)
    当它的state属性为0时表示unlock,state为1时表示lock。任务执行时必须在lock状态的保护下,防止出现同步问题。因此当Worker处于lock状态时,表示它正在运行,当它处于unlock状态时,表示它“空闲”。当它空闲超过keepAliveTime时,就有可能被回收。

 /**
     * work使用AQS同步锁,用来判断当前work能否接收新任务
     *
     * 同步状态0,表示空闲 可以接收新任务
     * 同步状态1,表示正在执行任务 无法接收新任务
     *
     * 获取同步状态将 同步状态设置为1 ,释放同步状态设置为0
     */
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {

        private static final long serialVersionUID = 6138294804551838833L;

        /** 工作线程*/
        final Thread thread;
        /** 初始化Worker,分配的第一个任务 */
        Runnable firstTask;
        /** 每个work执行的任务数量 */
        volatile long completedTasks;

        /**
         * 实例化Worker
         */
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 工作线程执行,调用外部TheadPoolExecutor.runWorker方法  */
        public void run() {
            runWorker(this);
        }


        /**
         * 判断当前Work是否空闲
         */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        /**
         * tryAcquire 为AQS 尝试获取独占同步状态模板方法实现。
         */
        protected boolean tryAcquire(int unused) {

            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * tryRelease为AQS 尝试释放独占同步状态模板方法实现。
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 获取独占同步状态
         */
        public void lock()        { acquire(1); }

        /**
         * 尝试获取同步状态
         */
        public boolean tryLock()  { return tryAcquire(1); }

        /**
         * 释放独占同步状态
         */
        public void unlock()      { release(1); }

        /**
         * 判断能够护球同步状态
         */
        public boolean isLocked() { return isHeldExclusively(); }

        /**
         * 中断work正在执行任务
         */
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

工作线程Worker的执行

work执行的核心是有条件无限循环,只要getTask不为null,线程会一直运行。不断获取任务执行。

**
     * work执行逻辑。
     * 内部存在一个for循环,不断循环获取任务执行。当线程池状态还在运行,work线程会一直运行不会推出循环
     * getTask()线程返回null时退出,一般可能当前work超时被销毁或线程池不在运行。
     * @param w
     */
    final void runWorker(Worker w) {
        /** 获取当前线程 **/
        Thread wt = Thread.currentThread();
        /** 获取执行任务**/
        Runnable task = w.firstTask;
        /** 将任务从work清理**/
        w.firstTask = null;
        /** 初始化同步状态为0(创建时为-1) **/
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            /**
             * 如果当前work中存在任务则执行,不存在则从WorkQueue获取任务
             * getTask()!=null 时work永远不停止
             *  **/
            while (task != null || (task = getTask()) != null) {
                /** 获取work独占同步状态 **/
                w.lock();

                               /** 如果当前线程池的状态为STOP,将work中工作线程标记为中断
                 * 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
                 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
                 是,再次设置中断标示,wt.interrupt()
                 *    否,不做操作,清除中断标示后进行后续步骤
                 *
                 * **/
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();


                try {
                    /** 模板方法给子类扩展 **/
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        /** 处理任务 **/
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        /** 模板方法给子类扩展 **/
                        afterExecute(task, thrown);
                    }
                } finally {
                    /** 重置任务 **/
                    task = null;
                    /** work执行的任务数量  **/
                    w.completedTasks++;
                    /** 释放work独占同步状态 **/
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

从WorkQueue获取任务

getTask()返回阻塞队列中等待的任务。当以下几种情况返回null 告知work需要销毁

1 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空

2 判断是否需要超时控制如果需要超时则等待超时时间内返回任务。超过时间返回null

 /**
     * 从WorkQueue获取任务
     * 同时用来判断work何时退出销毁
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /** 无限循环,
         *  当work超过指定时间没有获取时,设置timedOut = true进行二次遍历时销毁当前work **/
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /** 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则停止worker - 1,return null **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            /** 获取work数量 **/
            int wc = workerCountOf(c);

            /**  判断是否需要开启work淘汰机制 **/
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


            /**
             * 以下几种情况直接销毁当前work
             *
             * 超时没有获取任务timedOut=tue,for循环遍历第二次时
             * 当前任务超过maximumPoolSize
             * **/
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                /**
                 * 如果开启work淘汰机制超时获取任务,调用poll阻塞获取任务,存在超时,如果超时没有获取到任务
                 * 设置timedOut = true 进入第二次循环销毁
                 *
                 * 如果没开启work淘汰机制超时获取任务,调用take阻塞获取任务
                 * 【这里的阻塞都能被中断响应!!】
                 **/
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

work退出回收

processWorkerExit

    /**
     * 执行work销毁退出操作
     * work 要结束的worker
     * completedAbruptly 表示是否需要对work数量-1操作
     *  runWorker 正常执行时 completedAbruptly 为false
     *  runWorker 执行出现异常 completedAbruptly 为true
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        /** 从workers 集合中移除worker **/
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        /** 尝试将线程池状态设置为Terminate **/
        tryTerminate();

        int c = ctl.get();
        /**  **/
        if (runStateLessThan(c, STOP)) {
            /** 如果 work正常退出,需要判断当前线程数量 < 要维护的线程数量 如果是addWorker()添加一个非核心work **/
            if (!completedAbruptly) {
                /**
                 * 如果允许回收核心线程,且workQueue还存在需要处理任务 work线程需要大于1
                 * 如果不允许回收核心线程,则work线程需要大于corePoolSize
                 * **/
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            /** 如果 work 是异常退出  addWorker() 添加一个非核心work**/
            addWorker(null, false);
        }
    }

相关文章

网友评论

    本文标题:J.U.C之线程池05:源码解析-work执行

    本文链接:https://www.haomeiwen.com/subject/qevxhctx.html