美文网首页
ThreadPoolExecutor 源码分析

ThreadPoolExecutor 源码分析

作者: 想起个帅气的头像 | 来源:发表于2020-12-06 23:26 被阅读0次

    前言

    本文重点分析了ThreadPoolExecutor两个方法execute() 和 submit() 的执行原理,并说明Future如何实现阻塞返回。

    继承关系图

    关键方法介绍

    构造方法

        /**
         * @param corePoolSize   核心线程数
         * @param maximumPoolSize  最大线程数
         * @param keepAliveTime 临时线程保留时间
         * @param unit  临时线程保留时间单位
         * @param workQueue 阻塞队列
         * @param threadFactory  线程工程
         * @param handler  拒绝策略
         */
     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    为了方便区分,本文会将超过核心线程数创建的线程叫临时线程,本质上这两类线程没有任何区别,到期回收哪个线程完全是跟当时线程池哪个线程先被空闲有关,跟创建时间的先后无关

    execute(Runnable command)

    默认参数

    先介绍主要方法实现前,先说明一些静态变量的含义和值。

    ctl 官方给出的注释是The main pool control state,这个值包含了两部分,workerCount和runState。

    int COUNT_BITS = Integer.SIZE - 3 = 29; 一共32位,高3位表示线程池的运行状态,低29位表示线程池中的线程数量。是一种高低位的实现。

    用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况

    int CAPACITY = (1 << COUNT_BITS) - 1 = 536870912;也就是从的线程容量是536870912个。

    RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 都是用高3位表示不同的含义。低29位都是0

    具体值参考下表:

       private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 RUNNING | 0 = -536870912 , 1110 0000 + 24位0 
        private static final int COUNT_BITS = Integer.SIZE - 3;   //29   高3位表示状态  低29表示线程数量
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  //536870912  0001 1111 + 24位1
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;  // -536870912  1110 0000 + 24位0
        private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0            0000 0000 + 24位0 
        private static final int STOP       =  1 << COUNT_BITS;  // 536870912   0010 0000 + 24位0
        private static final int TIDYING    =  2 << COUNT_BITS;  // 1073741824  0100 0000 + 24位0
        private static final int TERMINATED =  3 << COUNT_BITS; // 1610612736   0110 0000 + 24位0
    
        // Packing and unpacking ctl
        // 如果c是默认值-536870912, 
        // runStateOf = (-536870912 & ~29) = -536870912, 
        // workerCountOf = (-536870912 & 29) = 0
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }  
    
    源码分析
     /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //c = -536870912
            int c = ctl.get();
            // workerCountOf(c) = 0
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    首先,这个execute有三个主要的if判断:

          //判断当前线程池中的线程数量有没有到核心线程数,没有就创建新的worker来处理任务。
          if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
          //执行到此处,说明此时线程池的线程数已经超过了coolPoolSize。先判断线程池状态,且尝试将任务添加到阻塞队列里。
          if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
            // 最后意味着此时阻塞队列已满,尝试创建新的worker来处理,不能创建则执行拒绝策略。
            else if (!addWorker(command, false))
                reject(command);
    
    addWorker()

    很长的一个方法,注释就不贴了,两个参数分别是当前要执行的任务和core(表示要创建的是核心线程还是临时线程)。
    这里的worker是真正负责处理任务的对象,worker内部封装了所属线程和待执行的任务.

     private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
            // ...
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
           
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    

    接下来主要看addWorker方法的实现。

        private boolean addWorker(Runnable firstTask, boolean core) {
          //第一部分
          retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            //第二部分
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    第一部分:
    rs表示是线程池的状态,先校验线程池状态和队列数量。前文已经提过,RUNNING的值是负数,SHOTDOWN是0,其他值都是正数。

    之后是for循环,判断容量和是否超过了预设的线程数量。
    如果成功增加了workerCount的值就跳出循环,开始执行任务。
    如果失败,说明有并发情况,就重新获取ctl,判断rs状态是否变了,从而决定是重新执行一遍大或小循环。

    for循环结束后,说明当前可以增加worker对象。此时就真正创建对象开始执行任务。

    第二部分:
    在创建worker对象时,构造方法中也创建了一个Thread。并通过lock来保证原子性,校验状态之后将worker对象add到HashSet中。
    private final HashSet<Worker> workers = new HashSet<Worker>();

    添加后,释放锁并start线程。

    如果在addWorker过程中失败,且第一阶段顺利完成,就从hashSet中移除,并减少workerCount。

    /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    

    如果添加任务顺利,则在t.start();执行完成后,主要任务就完成了并返回true。此时线程会执行worker对象内的run方法。

    worker内 run()
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    

    runWorker真正执行,这个this只得是worker对象,task和线程都已经封装到worker内了。

    
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 如果线程池已经是STOP或TIDYING或TERMINATED,需要将线程也主动中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    这里说明一些最核心的逻辑
    执行过程:

    1. 在while中判断当前的task和队列中的task,如果当前task != null,说明是线程是伴随着任务一起创建的,直接调用task.run来执行。
    2. 第一圈执行完成后,task=null,第二次执行while时,需要从getTask中取task来执行。
    3. 当getTask() 返回null时,while结束,设置completedAbruptly = false;表明任务时正常结束。最后调用processWorkerExit来退出线程。

    这里提供了两个方法:beforeExecute 和 afterExecute,task.run()的切面,我们可以定义worker的子类,来实现扩展,比如加入一些监控等。

    getTask() 返回null就代表着线程可以正常结束,那么什么情况下会返回null?

    getTask()
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    getTask() 的主要任务是从阻塞队列中获取task。通过判断当前的wc 是否超过了核心线程数,来决定poll还是take来取任务。
    如果超过了,说明此时已经创建过了临时线程,临时线程的有效期就是等待从队列返回的时间,超过这个时间没有取到,则设置timeOut表示已经超时,在下一次for循环的if判断中,返回null,让这个临时线程自动结束。
    如果没超过,说明此时还处在核心线程的阶段,可以take长期等待。

    至此,run方法的执行过程就此完成。

    任务是如何添加到队列中的,还得回到execute方法。

          if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    

    如果已经达到核心线程数,就不能在继续addWorker,而是要offer到workQueue中,并再次检查线程池状态。

    如果offer失败,说明阻塞队列已满,此时需要继续创建新的worker来完成任务。

            else if (!addWorker(command, false))
                reject(command);
    

    这里的false代表 创建时和最大线程数进行比较,如果超过了最大线程数,则调用reject来执行拒绝策略。

    reject()
    /**
         * Invokes the rejected execution handler for the given command.
         * Package-protected for use by ScheduledThreadPoolExecutor.
         */
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    

    4种默认的拒绝策略

    AbortPolicy : 直接抛出异常(默认策略)
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
    
    DiscardPolicy : 什么也不处理
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    
    DiscardOldestPolicy : 把当前最早在队列的任务丢弃,并将再次执行此任务(可能会直接执行,也可能被加到队列中)
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
    
    CallerRunsPolicy : 由当前线程来直接执行run,不再交给线程池。
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
    

    submit 源码分析

    submit()

        Future<?> future = Executors.newCachedThreadPool().submit(new Thread());
        ...
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    

    submit方法可用于带返回值的任务执行。可以返回Future来获取线程的执行结果,具体的实现定义在AbstractExecutorService中。

    首先创建了一个FutureTask对象,传入了要执行的任务。把封装后的FutureTask交给execute来执行。

      /**
         * Returns a {@code RunnableFuture} for the given runnable and default
         * value.
         *
         * @param runnable 要执行的任务
         * @param 返回的默认值
         * @param <T> the type of the given value
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    

    FutureTask

    继承关系图和构造方法
        /**  
         * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        // FutureTask 可能的状态列表
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        /** 执行的任务 */
        private Callable<V> callable;
        /** get() 的返回值,即最终的执行结果 */
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        // 单项列表的node
        private volatile WaitNode waiters;
    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    FutureTask 既然将任务封装到了callable属性中,且它自身还是一个Runnable,那么真正执行一定在run方法中。而get() 是一个阻塞方法,当执行完成后,可以获取返回值,否则就等待。

    那重点看下run() 和 get() 的实现。

    get()
    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
     private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    state 有多种状态,用来标记当前任务的执行情况,如果已经是完成状态,通过report方法直接返回outcome即可。
    如果还未到达完成态,就说明当前任务还在执行,此时需要await等待,也就是awaitDone。

    awaitDone()
    /**
         * Awaits completion or aborts on interrupt or timeout.
         *
         * @param timed true if use timed waits
         * @param nanos time to wait, if timed
         * @return state upon completion
         */
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

    awaitDone的两个参数分别用于表示是否有等待时间,以及等待时间的纳秒数。
    如果有等待时间,deadline就是截止时间。
    下面则是主要逻辑:
    一般来说,这里的for循环会执行3圈,(不考虑已经执行完成和中断的情况)。

    1. 第一圈:因为WaitNode q 最初被赋值为null,在run执行完之前,state是NEW,所以for循环会执行q=null的逻辑,先创建一个WaitNode对象。
    2. 第二圈:因为q此时有值,但queued是false,此时for循环执行! queued的逻辑,如果设置成功,则queued = true。
    3. 第三圈:LockSupport.park(this); (如果有deadline,就判断是否超时了)此时线程进入阻塞状态等待唤醒。
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        //背景:
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
    

    重点就是这一句。
    这一句是做了两个事情:

    1. 构建waiters的Node单向链表
    2. 如果添加队列成功就返回true。
    这里为什么要构建单向链表?

    一般来说,一个task通过一个get()方法等待获取就OK了,是一个单任务。但如果,同一个FutureTask的get() 方法被多个线程调用时,多个线程(可能)会同时处于阻塞状态,这时就需要一个存储介质来存储这些等待线程,这里是通过单链表来实现
    构建单向链表的过程如下:

    1. 第一次调用get():
      当前waiters = null;q.next = waiters(null); waiters = q; 即waiters的头节点是q,q.next是null。
    2. 第二次调用get(); 如果当前的任务命名为p;
      当前waiters = q; p.next = waiter(q); waiters = p; 即构建了一个 p -> q的链表结构,waiters是头节点p。
    3. 第三次调用get(); 如果当前的任务命名为r;
      最后的效果是 r -> p -> q; 可以看出来是头插法。
    run()
     public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
        private void handlePossibleCancellationInterrupt(int s) {
            // It is possible for our interrupter to stall before getting a
            // chance to interrupt us.  Let's spin-wait patiently.
            if (s == INTERRUPTING)
                while (state == INTERRUPTING)
                    Thread.yield(); // wait out pending interrupt
            // We want to clear any interrupt we may have received from
            // cancel(true).  However, it is permissible to use interrupts
            // as an independent mechanism for a task to communicate with
            // its caller, and there is no way to clear only the
            // cancellation interrupt.
            //
            // Thread.interrupted();
    }
    

    run() 比较简单,如果当前FutureTask是NEW的状态,就调用callable.call(),将执行完成的result通过set方法设置到outcome中。
    且无论成功失败,都将runner线程置为null,并判断执行过程中是否被其他线程中断,如果因为中断而失败,则此线程一直交出时间片,直到状态从INTERRUPTING变成INTERRUPTED。

    如果成功执行且没有被中断过,则通过set方法进行返回值的设置。

    set()
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    先判断此时状态是NEW,则改成COMPLETING,设置outcome后,状态改成NORMAL(完成态),调用finishCompletion来唤醒等待中的线程。

    finishCompletion()
    /**
         * Removes and signals all waiting threads, invokes done(), and
         * nulls out callable.
         */
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
            // done是一个空方法,给子类重写用。
            done();
    
            callable = null;        // to reduce footprint
        }
    

    这个方法比较简单,可以看到就是在遍历waiters单链表,依次唤醒内部的阻塞线程。(阻塞的发起点是get方法)。

    总结

    execute()

    实现思想:
    1. task因为交由线程池来执行,线程池的线程直接调用task中的run,而不是执行task.start()。
    2. 如果当前线程池中的线程数 < corePoolSize ,就创建新的线程添加到线程池中(HashSet存储)。
    3. 如果当前的线程数 > corePoolSize 就先存放到阻塞队列里
    4. 如果阻塞队列已满,且 < maximumPoolSize,就创建新的线程添加到线程池中(HashSet存储),当keepAliveTime的时间没有处理任务,则销毁(也就是让run方法结束)。
    5. 如果已经超过maximumPoolSize,则根据拒绝策略执行。
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
    

    submit()

    实现思想:

    任务执行的思想还是execute,阻塞等待返回值的思想是通过Future完成。实现类是FutureTask。

    1. get()返回值时如果还未完成,将当前线程封装成WaiterNode,进行LockSupport.park,并将所有park的线程按照头插法构建一个单向链表。
    2. run() 执行完成后,将内部的outcome属性设置成当前FutureTask的返回值,并unpark单链表中的所有阻塞线程,这些线程的get()会直接返回outcome的值。

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor 源码分析

          本文链接:https://www.haomeiwen.com/subject/egcbwktx.html