美文网首页
ThreadPoolExecutor详解

ThreadPoolExecutor详解

作者: 永远的太阳0123 | 来源:发表于2018-12-14 17:28 被阅读0次

    1 ThreadPoolExecutor中的状态

    (1)RUNNING:可以接受新任务,可以处理阻塞队列中的任务。
    (2)SHUTDOWN:不接受新任务,可以处理阻塞队列中的任务,中断空闲Worker。
    (3)STOP:不接受新任务,不处理阻塞队列中的任务,中断所有Worker。
    (4)TIDYING:所有任务执行完毕,workerCount等于0,开始执行terminated方法。
    (5)TERMINATED:terminated方法执行完毕。

    2 ThreadPoolExecutor中的状态转换

    (1)RUNNING到SHUTDOWN:调用了shutdown方法。
    (2)(RUNNING或SHUTDOWN)到STOP:调用了shutdownNow方法。
    (3)SHUTDOWN到TIDYING:阻塞队列中和线程池中没有任务。
    (4)STOP到TIDYING:线程池中没有任务。
    (5)TIDYING到TERMINATED:terminated方法执行完毕。

    3 ThreadPoolExecutor中的部分字段

    (1)ctl的高3位代表线程池的状态,低29位代表workerCount。
    (2)COUNT_BITS=32-3=29。
    (3)CAPACITY=2^29-1=536870911,二进制为00011111111111111111111111111111。
    (4)RUNNING的二进制为11100000000000000000000000000000。
    (5)SHUTDOWN的二进制为00000000000000000000000000000000。
    (6)STOP的二进制为00100000000000000000000000000000。
    (7)TIDYING的二进制为01000000000000000000000000000000。
    (8)TERMINATED的二进制为01100000000000000000000000000000。
    (9)runStateOf方法可以获取线程池的状态。
    (10)workerCountOf方法可以获取workerCount。
    (11)ctlOf方法可以根据线程池的状态和workerCount获取ctl。
    (12)runStateLessThan方法可以比较线程池的两个状态。
    (13)runStateAtLeast方法可以比较线程池的两个状态。
    (14)isRunning方法可以判断线程池是否处于Running状态。

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    

    (15)workQueue:阻塞队列。
    (16)mainLock:访问workers集合、访问largestPoolSize等线程池的统计型变量、执行shutdown方法、执行shutdownNow方法都需要获取该锁。
    (17)workers:保存ThreadPoolExecutor.Worker对象的HashSet集合。只有获取mainLock,才能访问该集合。
    (18)termination:和mainLock绑定的Condition对象,执行awaitTermination方法和tryTerminate方法时使用该Condition对象。
    (19)threadFactory:线程工厂。
    (20)handler:拒绝策略。
    (21)keepAliveTime:空闲线程的存活时间。
    (22)allowCoreThreadTimeOut:是否允许结束空闲的核心线程。如果allowCoreThreadTimeOut等于true,则keepAliveTime必须大于0。
    (23)corePoolSize:核心线程数。
    (24)maximumPoolSize:最大线程数。
    (25)defaultHandler:默认的拒绝策略。

        private final BlockingQueue<Runnable> workQueue;
        private final ReentrantLock mainLock = new ReentrantLock();
        private final HashSet<Worker> workers = new HashSet<Worker>();
        private final Condition termination = mainLock.newCondition();
        private volatile ThreadFactory threadFactory;
        private volatile RejectedExecutionHandler handler;
        private volatile long keepAliveTime;
        private volatile boolean allowCoreThreadTimeOut;
        private volatile int corePoolSize;
        private volatile int maximumPoolSize;
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    

    4 ThreadPoolExecutor中的构造方法

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    4.1 Executors中的defaultThreadFactory方法

        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    

    4.1.1 Executors.DefaultThreadFactory内部类

    Executors.DefaultThreadFactory实现了ThreadFactory接口。

        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
    4.1.1.1 ThreadFactory接口

    ThreadFactory接口中只定义了newThread方法。

    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    

    5 ThreadPoolExecutor.Worker内部类

    ThreadPoolExecutor.Worker继承了AbstractQueuedSynchronizer,实现了Runnable接口。

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
            // Worker中的线程对象
            final Thread thread;
            // 线程的第一个任务
            Runnable firstTask;
            volatile long completedTasks;
    
            // 传入的firstTask可以等于null
            Worker(Runnable firstTask) {
                // 将state设为-1
                // 执行runWorker方法之前,state一直等于-1
                // 查看interruptIfStarted方法可以发现,如果state小于0,不允许中断线程对象
                setState(-1);
                this.firstTask = firstTask;
                // 使用线程工厂创建线程对象
                this.thread = getThreadFactory().newThread(this);
            }
    
            public void run() {
                runWorker(this);
            }
    
            // state等于0代表Worker锁未被占用,state等于1代表Worker锁已被占用
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    6 ThreadPoolExecutor中的execute方法

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 获取ctl的数值
            int c = ctl.get();
            // 如果workerCount小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
                // 尝试启动一个新线程,并且将这个线程的第一个任务设为command
                // 如果新线程启动成功,直接返回
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 运行到这里有两种可能,要么workerCount大于等于核心线程数,要么新线程启动失败
            // 如果线程池处于RUNNING状态,并且成功将当前任务command添加到阻塞队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 成功将当前任务command添加到阻塞队列中之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
                // 如果线程池已不处于RUNNING状态,并且成功从阻塞队列中移除了command
                if (! isRunning(recheck) && remove(command))
                    // 执行拒绝策略
                    reject(command);
                // 运行到这里有两种可能,要么线程池还处于RUNNING状态,要么线程池已不处于RUNNING状态但command已开始执行或执行完毕
                // 如果workerCount等于0
                else if (workerCountOf(recheck) == 0)
                    // 尝试启动一个新线程,并且将这个线程的第一个任务设为null
                    addWorker(null, false);
            }
            // 运行到这里有两种可能,要么线程池不处于RUNNING状态,要么阻塞队列已满
            // 尝试启动一个新线程,并且将这个线程的第一个任务设为command
            // 如果新线程启动失败,说明线程池不处于RUNNING状态或workerCount已达到最大线程数
            else if (!addWorker(command, false))
                // 执行拒绝策略
                reject(command);
        }
    

    6.1 ThreadPoolExecutor中的addWorker方法

        // firstTask:线程的第一个任务,可以传入null
        // core:线程数量的界限。true代表以corePoolSize作为界限,false代表以maximumPoolSize作为界限
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                // 如果线程池处于STOP或TIDYING或TERMINATED状态,直接返回false
                // 如果线程池处于SHUTDOWN状态,firstTask不等于null,直接返回false
                // 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中没有任务,直接返回false
                // 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中还有任务,可以创建Worker
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                for (;;) {
                    int wc = workerCountOf(c);
                    // 如果workerCount已达到线程数量的界限,直接返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 如果CAS操作成功,说明所有条件都已满足,可以开始创建Worker
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();
                    // 如果CAS操作失败,说明还有其它线程正在创建Worker
                    // 此时需要重新获取线程池的状态。如果状态不变,继续内层循环;如果状态改变,回到外层循环
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
            // 运行到这里,开始创建Worker
            // workerStarted代表是否已启动新创建的Worker中的线程对象
            boolean workerStarted = false;
            // workerAdded代表是否已将新创建的Worker添加到workers这个HashSet中
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 传入firstTask,创建Worker
                w = new Worker(firstTask);
                // 获取Worker中的线程对象
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
                        // 获取mainLock之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
                        // 如果(线程池处于RUNNING状态)或(线程池处于SHUTDOWN状态并且firstTask等于null)
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 如果已启动Worker中的线程对象
                            if (t.isAlive())
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                // 更新largestPoolSize
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 如果已将新创建的Worker添加到workers这个HashSet中
                    if (workerAdded) {
                        // 启动新创建的Worker中的线程对象
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果没有启动新创建的Worker中的线程对象
                if (! workerStarted)
                    // 回滚操作
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    6.1.1 ThreadPoolExecutor中的addWorkerFailed方法

        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                // workerCount减1
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    

    6.1.2 ThreadPoolExecutor.Worker中的run方法

    启动新创建的Worker中的线程对象,JVM会自动调用Worker中的run方法。

        public void run() {
            runWorker(this);
        }
    
    6.1.2.1 ThreadPoolExecutor中的runWorker方法
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 将Worker中的state设为0,允许中断线程对象
            w.unlock();
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    // 执行任务前,获取Worker锁
                    w.lock();
                    // 如果线程池处于STOP或TIDYING或TERMINATED状态,确保当前线程已被中断
                    // 如果线程池处于RUNNING或SHUTDOWN状态,确保当前线程未被中断
                    // interrupted方法会清除当前线程的中断状态
                    // 如果线程池处于RUNNING或SHUTDOWN状态,interrupted方法返回true,说明在此之前当前线程已被中断,现在当前线程的中断状态已被清除。其它线程可能调用shutdownNow方法,因此需要重新判断线程池是否处于RUNNING或SHUTDOWN状态
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        // 中断当前线程
                        wt.interrupt();
                    try {
                        // 空方法,子类中可以重写该方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 执行Runnable对象中的run方法不会抛出Checked Exception
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 空方法,子类中可以重写该方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        // 执行任务后,释放Worker锁
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // 运行到这里,需要结束当前线程
                // 运行到这里有两种可能,要么getTask方法返回null,要么执行任务时抛出了异常或错误
                // 第一种情况,completedAbruptly等于false,执行getTask方法时workerCount已减1
                // 第二种情况,completedAbruptly等于true,workerCount未减1
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    6.1.2.1.1 ThreadPoolExecutor中的getTask方法
        private Runnable getTask() {
            // timedOut代表上一次调用poll方法获取任务是否超时
            boolean timedOut = false;
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                // 如果(线程池处于STOP或TIDYING或TERMINATED状态)或(线程池处于SHUTDOWN状态并且阻塞队列中没有任务)
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // workerCount减1
                    decrementWorkerCount();
                    return null;
                }
                int wc = workerCountOf(c);
                // timed代表如果获取任务超时是否允许结束Worker
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 如果workerCount大于最大线程数,尝试CAS操作
                // 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(workerCount大于1),尝试CAS操作
                // 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(阻塞队列中没有任务),尝试CAS操作
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    // 如果CAS操作成功,workerCount减1
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                try {
                    // 如果获取任务超时允许结束Worker,调用poll方法获取任务
                    // 如果获取任务超时不允许结束Worker,调用take方法获取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    // 如果成功获取任务
                    if (r != null)
                        return r;
                    // 运行到这里,说明调用poll方法获取任务超时
                    timedOut = true;
                } catch (InterruptedException retry) {
                    // 调用poll方法或take方法获取任务时,当前线程可能被中断,此时需要捕获InterruptedException,重置timedOut,重新开始循环
                    timedOut = false;
                }
            }
        }
    
    6.1.2.1.2 ThreadPoolExecutor中的processWorkerExit方法
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 如果completedAbruptly等于true
            if (completedAbruptly)
                // workerCount减1
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                // 从workers这个HashSet中移除当前Worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            // 如果线程池处于RUNNING或SHUTDOWN状态
            if (runStateLessThan(c, STOP)) {
                // 如果completedAbruptly等于false,说明getTask方法返回null,需要判断是否尝试启动一个新线程
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // 如果允许结束空闲的核心线程,并且阻塞队列中还有任务
                    if (min == 0 && ! workQueue.isEmpty())
                        // 至少保留一个线程
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return;
                }
                // 尝试启动一个新线程,并且将这个线程的第一个任务设为null
                addWorker(null, false);
            }
        }
    

    6.2 ThreadPoolExecutor中的reject方法

        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    

    6.2.1 ThreadPoolExecutor中的四种拒绝策略

    (1)CallerRunsPolicy:如果线程池处于Running状态,直接在提交任务的线程中执行任务;如果线程池不处于Running状态,直接丢弃任务。

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public CallerRunsPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
        public boolean isShutdown() {
            return ! isRunning(ctl.get());
        }
    

    (2)AbortPolicy:抛出异常。ThreadPoolExecutor默认使用该拒绝策略。

        public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    (3)DiscardPolicy:直接丢弃任务。

        public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    

    (4)DiscardOldestPolicy:如果线程池处于Running状态,从阻塞队列中移除头部任务并重新调用execute方法。

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public DiscardOldestPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    

    6.3 ThreadPoolExecutor中的remove方法

        public boolean remove(Runnable task) {
            // 从阻塞队列中移除指定任务
            boolean removed = workQueue.remove(task);
            tryTerminate();
            return removed;
        }
    

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor详解

          本文链接:https://www.haomeiwen.com/subject/ymnghqtx.html