美文网首页三个JAVA臭皮匠程序员
并发编程之ThreadPoolExecutor(三)

并发编程之ThreadPoolExecutor(三)

作者: 后厂村老司机 | 来源:发表于2018-05-24 21:30 被阅读68次

    前言:

    ThreadPoolExecutor类是Executor框架的核心,负责创建线程池,执行线程池任务,下图为ThreadPoolExecutor整体架构!

    1:ThreadPoolExecutor类:

    这个类我们常用来创建线程池,我们来深八一下底层到底是怎么实现的。要想了解此类的实现我们首先需要了解一下ThreadPoolExecutor,而且实际上《阿里编程规约》里面要求我们用ThreadPoolExecutor而不是Executors类来创建线程池以做到更精确的配置。

    ThreadPoolExecutor类的成员变量:

        //高三位表示状态,低29位表示有效线程数量,所以线程池最多可容纳线程为2的29次方-1

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

        private static final int COUNT_BITS = Integer.SIZE - 3;

        //线程数量的最大比特位数

        private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

        //线程池5种状态

        private static final int RUNNING    = -1 << COUNT_BITS;

        private static final int SHUTDOWN  =  0 << COUNT_BITS;

        private static final int STOP      =  1 << COUNT_BITS;

        private static final int TIDYING    =  2 << COUNT_BITS;

        private static final int TERMINATED =  3 << COUNT_BITS;

        // 打包线程状态和线程数量,以及解包还原两个数值

        private static int runStateOf(int c)    { return c & ~CAPACITY; }

        private static int workerCountOf(int c)  { return c & CAPACITY; }

        private static int ctlOf(int rs, int wc) { return rs | wc; }

        //阻塞队列用于存放线程执行的任务

        private final BlockingQueue<Runnable> workQueue;

        //用于保存线程池里的线程,访问该Set需要mainLock锁

        private final HashSet workers = new HashSet();

        //用于创建线程的线程工厂,该工厂调用addWorker生产线程

        private volatile ThreadFactory threadFactory;

        //线程池保存的最小存活线程数量

        private volatile int corePoolSize;

        //线程池最大存活的线程数量

        private volatile int maximumPoolSize;

        //线程池饱和或者线程池关闭时的处理策略,默认为拒绝接受任务

        private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    两个关键字段:workerCount和runState,前者表示当前线程池里有效线程数量,后者表示当前线程池的状态。其他字段注解里已经说明。我们来重点关注下worker,即真正干活的线程长什么样:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

            private static final long serialVersionUID = 6138294804551838833L;

            //这里的Thread就是真正干活儿的工人了

            final Thread thread;                             

            Runnable firstTask;

            volatile long completedTasks;

            Worker(Runnable firstTask) {

                setState(-1);

                this.firstTask = firstTask;

                this.thread = getThreadFactory().newThread(this);

            }

           //调用Thread里面Run方法的方法

            public void run() {

                runWorker(this);

            }

            // 0代表非锁状态,1代表锁定状态,这个在熟悉不过了,AQS框架里的资源

            protected boolean isHeldExclusively() {

                return getState() != 0;

            }

            protected boolean tryAcquire(int unused) {

                if (compareAndSetState(0, 1)) {

                    setExclusiveOwnerThread(Thread.currentThread());

                    return true;

                }

                return false;

            }

            protected boolean tryRelease(int unused) {

                setExclusiveOwnerThread(null);

                setState(0);

                return true;

            }

            public void lock()        { acquire(1); }

            public boolean tryLock()  { return tryAcquire(1); }

            public void unlock()      { release(1); }

            public boolean isLocked() { return isHeldExclusively(); }

            void interruptIfStarted() {

                Thread t;

                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

                    try {

                        t.interrupt();

                    } catch (SecurityException ignore) {

                    }

                }

            }

        }

    上述代码就是Worker类的全貌了,他实际上是AQS类的儿子,里面自定义实现了lock和unlock方法,设置独占AQS的state资源。最重要的就是里面组合了Thread,该Worker的Thread有ThreadFactory工厂生产,并通过run方法实现线程的执行。

    认识了这些worker了我们就得考虑怎么把这些worker放到池子里:

    private boolean addWorker(Runnable firstTask, boolean core) {

            retry:

            for (;;) {

                int c = ctl.get();

                int rs = runStateOf(c);

                // Check if queue empty only if necessary.

                if (rs >= SHUTDOWN &&

                    ! (rs == SHUTDOWN &&

                      firstTask == null &&

                      ! workQueue.isEmpty()))

                    return false;

                for (;;) {

                    int wc = workerCountOf(c);

                    if (wc >= CAPACITY ||

                        wc >= (core ? corePoolSize : maximumPoolSize))

                        return false;

                    if (compareAndIncrementWorkerCount(c))

                        break retry;

                    c = ctl.get();  // Re-read ctl

                    if (runStateOf(c) != rs)

                        continue retry;

                    // else CAS failed due to workerCount change; retry inner loop

                }

            }

            //我是分割线,我的上面是判断当前线程池是否满足添加线程的条件,不满足当然加不了喽

            boolean workerStarted = false;

            boolean workerAdded = false;

            Worker w = null;

            try {

                w = new Worker(firstTask);

                final Thread t = w.thread;

                if (t != null) {

                    final ReentrantLock mainLock = this.mainLock;

                    mainLock.lock();

                    try {

                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {

                            if (t.isAlive())

                                throw new IllegalThreadStateException();

                            workers.add(w);

                            int s = workers.size();

                            if (s > largestPoolSize)

                                largestPoolSize = s;

                            workerAdded = true;

                        }

                    } finally {

                        mainLock.unlock();

                    }

                    if (workerAdded) {

                        t.start();

                        workerStarted = true;

                    }

                }

            } finally {

                if (! workerStarted)

                    addWorkerFailed(w);

            }

            return workerStarted;

        }

    private void addWorkerFailed(Worker w) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                if (w != null)

                    workers.remove(w);

                decrementWorkerCount();

                tryTerminate();

            } finally {

                mainLock.unlock();

            }

        }

    不要被上面冗长的代码吓坏了,分割线以上是判断当前线程池的状态是否满足添加新线程的条件。下面才是真正添加线程的代码,而且下面的代码也很好理解:首先调用Worker构造方法利用其内部的ThreadFactory创建一个线程并把任务塞给这个新线程;然后把新线程的模型worker加入到内部的worker的Set中、改变线程池的一些属性,由于这个过程不是线程安全的(workers实际是HashSet)所以采用了锁;最后调用这个线程的start方法真正启动该线程,线程启动肯定是执行Runnable的run方法啊,run 方法在哪?好好看看Worker类(实现了Runnable)。如果添加失败了没关系,还是锁定操作worker的set集合。

    好了,上面就是我们的线程池模型,现在工人也有了,工人也被我们添加到池子里去了,下面应该让工人干活儿了,工人干活儿的方法是:

    final void runWorker(Worker w) {

            Thread wt = Thread.currentThread();

            Runnable task = w.firstTask;

            w.firstTask = null;

            w.unlock(); // allow interrupts

            boolean completedAbruptly = true;

            try {

                while (task != null || (task = getTask()) != null) {

                    w.lock();

                    if ((runStateAtLeast(ctl.get(), STOP) ||

                        (Thread.interrupted() &&

                          runStateAtLeast(ctl.get(), STOP))) &&

                        !wt.isInterrupted())

                        wt.interrupt();

                    try {

                        beforeExecute(wt, task);

                        Throwable thrown = null;

                        try {

                            task.run();

                        } catch (RuntimeException x) {

                            thrown = x; throw x;

                        } catch (Error x) {

                            thrown = x; throw x;

                        } catch (Throwable x) {

                            thrown = x; throw new Error(x);

                        } finally {

                            afterExecute(task, thrown);

                        }

                    } finally {

                        task = null;

                        w.completedTasks++;

                        w.unlock();

                    }

                }

                completedAbruptly = false;

            } finally {

                processWorkerExit(w, completedAbruptly);

            }

        }

    DougLea真是烦人呐,又搞了这么长的代码,代码随长,还是很清晰的。线程start之后执行了Worker类中的run方法,而run方法内部实际上调用的就是runWorker方法。该方法的关键在于while循环,循环条件是firstTask或者从BlockingQueue中取的task不为空的时候执行,该方法给我们留了两个钩子,即beforeExecute和afterExecute,如果我们实现一个子类可以实现这两个方法给自己定制。真正执行的部分在于task.run()。

    上面代码我们关注getTask()方法:

    private Runnable getTask() {

            boolean timedOut = false; // Did the last poll() time out?

            for (;;) {

                int c = ctl.get();

                int rs = runStateOf(c);

                // Check if queue empty only if necessary.

                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                    decrementWorkerCount();

                    return null;

                }

                int wc = workerCountOf(c);

                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if ((wc > maximumPoolSize || (timed && timedOut))

                    && (wc > 1 || workQueue.isEmpty())) {

                    if (compareAndDecrementWorkerCount(c))

                        return null;

                    continue;

                }

                try {

                    Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

                    if (r != null)

                        return r;

                    timedOut = true;

                } catch (InterruptedException retry) {

                    timedOut = false;

                }

            }

        }

    这个方法的关键在于这句话Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();这句话就是阻塞队列取元素的方法,所以该方法是线程安全的。

    OK,上面就是ThreadPoolExecutor的具体实现,然而该类对外部暴露的api最常用的则是构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

                                  ThreadFactory threadFactory,

                                  RejectedExecutionHandler handler) {

            if (corePoolSize < 0 ||

                maximumPoolSize <= 0 ||

                maximumPoolSize < corePoolSize ||

                keepAliveTime < 0)

                throw new IllegalArgumentException();

            if (workQueue == null || threadFactory == null || handler == null)

                throw new NullPointerException();

            this.corePoolSize = corePoolSize;

            this.maximumPoolSize = maximumPoolSize;

            this.workQueue = workQueue;

            this.keepAliveTime = unit.toNanos(keepAliveTime);

            this.threadFactory = threadFactory;

            this.handler = handler;

        }

    构造方法分为很多中,每种可以实现不同的线程池,《阿里巴巴编程规约》里要求我们直接用构造方法定制一定数目的线程池。

    OK,以上我们解决了构造函数的问题,现在我们看一下线程池是怎么执行的:

    public void execute(Runnable command) {

            if (command == null)

                throw new NullPointerException();

            *如果池子里线程数量小于最小线程数,那么新起一个线程并把command当作第                一个任务,

            *如果任务成功加入队列中,我们仍需要检测是否有线程加入workers里了

            int c = ctl.get();

            if (workerCountOf(c) < corePoolSize) {

                if (addWorker(command, true))

                    return;

                c = ctl.get();

            }

            if (isRunning(c) && workQueue.offer(command)) {

                int recheck = ctl.get();

                if (! isRunning(recheck) && remove(command))

                    reject(command);

                else if (workerCountOf(recheck) == 0)

                    addWorker(null, false);

            }

            else if (!addWorker(command, false))

                reject(command);

        }

    以上代码最重要的部分就是addworker,实际上首先保证有足够线程,如果没有就创建线程执行任务(不是创建core个线程哦)如果有就把任务放到BlockingQueue中等待线程去拿任务执行。

    最后看看线程池的关闭:

    public void shutdown() {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                checkShutdownAccess();

                advanceRunState(SHUTDOWN);

                //中断线程池里没有执行任务并且等待任务的线程,调用的是t.interupt方法

                interruptIdleWorkers();

                //给定时线程池留的钩子

                onShutdown(); // hook for ScheduledThreadPoolExecutor

            } finally {

                mainLock.unlock();

            }

            tryTerminate();

        }

    public ListshutdownNow() { List tasks;

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                checkShutdownAccess();

                advanceRunState(STOP);

                //直接中断所有线程,调用的是interrupt方法

                interruptWorkers();

                //把没执行完的任务放到List里面返回

                tasks = drainQueue();

            } finally {

                mainLock.unlock();

            }

            tryTerminate();

            return tasks;

        }

    两个关闭线程池方法略有区别,shutdown方法只改变线程池状态为SHUTDOWN,这个时候线程池拒绝接受任务但是会执行完原有的任务。shutdownNow直接关闭线程池,即不接受任务也不执行原来的任务。停止线程调用的都是Thread.interruput方法!!!!

    本篇实际上漏掉了AbstractExecutorService里的一些submit方法,没关系,我们在下一篇ScheduledThreadPoolExecutor中补回来!

    相关文章

      网友评论

        本文标题:并发编程之ThreadPoolExecutor(三)

        本文链接:https://www.haomeiwen.com/subject/nixejftx.html